2024-06-26

How to use Google Pub/Sub Locally with Golang

As much as I dislike Google Pub/Sub (because of its inefficiency, especially compared to a normal database forced to act as a queue, e.g. Tarantool can do 200k events/s and Clickhouse 900k events/s, although without automatic fan-out), here's how you can use it locally with Go. First you must create a docker compose file:


services:
  testpubsub:
    image: gcr.io/google.com/cloudsdktool/cloud-sdk:latest # 482.0.0
    command: /usr/bin/gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
# run with: docker compose up


Then just run this code, which was stolen from this article and modified a bit to also publish an event:


package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
)

const (
    projectID = "your-project-id"
    topicID   = "your-topic-id"
    subName   = "your-subscription-name"
)

type Message struct {
    Data string `json:"data"`
}

func main() {
    os.Setenv(`PUBSUB_EMULATOR_HOST`, `localhost:8085`)
    ctx := context.Background()

    // Create a Pub/Sub client
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }

    // Create a topic if it doesn't already exist
    topic := client.Topic(topicID)
    ok, err := topic.Exists(ctx)
    if err != nil {
        log.Fatalf("Failed to check if topic exists: %v", err)
    }
    if !ok {
        if _, err := client.CreateTopic(ctx, topicID); err != nil {
            log.Fatalf("Failed to create topic: %v", err)
        }
        log.Printf("Topic %s created.\n", topicID)
    }

    // Create a subscription to the topic if it doesn't already exist
    sub := client.Subscription(subName)
    ok, err = sub.Exists(ctx)
    if err != nil {
        log.Fatalf("Failed to check if subscription exists: %v", err)
    }
    if !ok {
        if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
            Topic: topic,
        }); err != nil {
            log.Fatalf("Failed to create subscription: %v", err)
        }
        log.Printf("Subscription %s created.\n", subName)
    }

    go func() {
        time.Sleep(2 * time.Second)
        // publish some event
        topic.Publish(ctx, &pubsub.Message{
            Data: []byte(`{"data":"hello world"}`),
        })
    }()

    // Start consuming messages from the subscription
    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        // Unmarshal the message data into a struct
        var m Message
        if err := json.Unmarshal(msg.Data, &m); err != nil {
            log.Printf("Failed to unmarshal message data: %v", err)
            msg.Nack()
            return
        }

        // Print the message data
        fmt.Printf("Received message: %s\n", m.Data)

        // Acknowledge the message
        msg.Ack()
    })
    if err != nil {
        log.Fatalf("Failed to receive messages: %v", err)
    }

    // Gracefully shutdown the Pub/Sub client
    if err := client.Close(); err != nil {
        log.Fatalf("Failed to close client: %v", err)
    }
}
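
To try it end to end, something like this should work (a rough sketch; the module name is arbitrary and the emulator container above must already be running):

go mod init pubsubtest   # any module name works
go get cloud.google.com/go/pubsub
go run main.go
# after ~2 seconds it should print something like: Received message: hello world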

That's it, you don't need internet access to test Google Pub/Sub, it just works with docker compose.

2024-06-13

Backup and Restore Cassandra Database for Development

Last time we dumped from Cassandra and restored it to Clickhouse for proper analytics. To do that we need cqlsh and the cassandra-driver:

pip install -U cqlsh
pip install cassandra-driver

# list all tables
tables=`cqlsh -k keyspace1 -e 'describe keyspace' | grep TABLE | cut -d '.' -f 2 | cut -d ' ' -f 1`

# run copy to file
mkdir backup
echo "$tables" | while read x; do time cqlsh -k keyspace1 -e "COPY $x TO STDOUT WITH DELIMITER='|' AND HEADER=TRUE AND NULL=NULL" | grep -v 'WARNING: cqlsh' | sed 's|\\\\|\\|g' > backup/$x.csv ; done

# run copy from file
echo "$tables" | while read x; do time cat backup/$x.csv | sed 's|\\\\|\\|g' | cqlsh -k keyspace1 -e "COPY $x FROM stdin WITH DELIMITER='|' AND HEADER=TRUE AND NULL=NULL" ; done


That's it, you can update the rows before restoring for your integration test. There's a quirk in Cassandra's CSV export where backslashes are escaped with another backslash; it cannot be fixed with the tr command (probably because tr can't translate to more than 1 character), so without the sed step above it would be restored with extra backslashes, and NULL gets imported as zero.
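
For example, to update some rows before restoring (a hypothetical edit; the table name and values here are made up):

sed -i 's|real-user@example.com|test-user@example.com|g' backup/users.csv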

2024-03-23

Clickhouse in Kubernetes / Minikube using Bitnami Helm Charts

Normally for my projects I use docker-compose plus a normal bind-mounted persistence directory to avoid data loss for the database (and to keep it fast and simple to maintain), because it's simpler to upgrade versions than a normal installation (looking at that popular database whose major version upgrades are annoying and require a lot of copying), and the network/disk/cpu overhead is also unnoticeable. Today we're gonna try to set up Clickhouse using minikube. Assuming you already installed minikube, all you need to do is install helm (the k8s package manager).

curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get-helm-3 > install_helm.sh && bash install_helm.sh
helm install my-release oci://registry-1.docker.io/bitnamicharts/clickhouse

# to check again the deploy
helm status my-release

# to check configurable options
helm show all oci://registry-1.docker.io/bitnamicharts/clickhouse

# to uninstall
helm uninstall my-release

By default it would create 2 shards with 3 replicas each (including the PV and PVC for all pods), and too bad it's still using ZooKeeper instead of clickhouse-keeper.

my-release-clickhouse-shard0-0   1/1  Running   0 
my-release-clickhouse-shard0-1   1/1  Running   0 
my-release-clickhouse-shard0-2   1/1  Running   0 
my-release-clickhouse-shard1-0   1/1  Running   0 
my-release-clickhouse-shard1-1   1/1  Running   0 
my-release-clickhouse-shard1-2   1/1  Running   0 
my-release-zookeeper-0           1/1  Running   0 
my-release-zookeeper-1           1/1  Running   0 
my-release-zookeeper-2           1/1  Running   0 


To access it locally you just need to port-forward and connect using the clickhouse-client program:

kubectl port-forward --namespace default svc/my-release-clickhouse 9001:9000 &
clickhouse-client --host localhost --port 9001 --user default --password $(kubectl get secret --namespace default my-release-clickhouse -o jsonpath="{.data.admin-password}" | base64 -d)

alternatively you can just exec into the pod and use clickhouse-client inside the pod:

kubectl exec -it my-release-clickhouse-shard1-2 -- sh
clickhouse-client # use password from above

You already can create distributed tables and do stuff with it. For more information about bitnami charts you can read it here. If you want to customize your setup (not 2 shards with 3 replicas; I always prefer no shard but multiple replicas, unless your data is so massive that it's hard not to shard across multiple machines), you can clone the chart, modify the configuration, then deploy it, something like this:

git clone --depth 1 git@github.com:bitnami/charts.git
vim charts/bitnami/clickhouse/values.yaml
# ^ change shard=1 keeper.enabled=true zookeeper.enabled=false
helm dependency build charts/bitnami/clickhouse
helm install ch1 charts/bitnami/clickhouse

# update
helm upgrade ch1 charts/bitnami/clickhouse
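
The values.yaml changes mentioned in the comment above would look roughly like this (a sketch from memory of the Bitnami chart's keys; double check against the chart's own values.yaml):

# charts/bitnami/clickhouse/values.yaml (excerpt)
shards: 1         # single shard
replicaCount: 3   # three replicas per shard
keeper:
  enabled: true   # use clickhouse-keeper
zookeeper:
  enabled: false  # drop the zookeeper dependency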

it would create something like this:

ch1-clickhouse-shard0-0         1/1  Running   0
ch1-clickhouse-shard0-1         1/1  Running   0
ch1-clickhouse-shard0-2         1/1  Running   0

That's it for now; not sure how to run this in a real kubernetes cluster (not minikube), especially making the PV/PVC spread across multiple nodes.

2024-02-20

Dump/export Cassandra/BigQuery tables and import to Clickhouse

Today we're gonna dump a Cassandra table and put it into Clickhouse. Cassandra is a wide-column database but often used as OLTP since it has really good distributed capabilities (customizable replication factor, multi-cluster/region, clustered/partitioned by default -- so good for multitenant applications), but if we need to do analytics queries or some complex query, it becomes super painful, even with ScyllaDB's materialized views (which are only good for recaps/summaries). To dump a Cassandra table, all you need to do is construct a query and use dsbulk, something like this:

./dsbulk unload -delim '|' -k "KEYSPACE1" \
   -query "SELECT col1,col2,col3 FROM table1" -c csv \
   -u 'USERNAME1' -p 'PASSWORD1' \
   -b secure-bundle.zip | tr '\\' '"' |
    gzip -9 > table1_dump_YYYYMMDD.csv.gz ;


The tr command above is used to unescape backslashes, since dsbulk exports CSV not in the proper format (\" instead of ""); after that you can just restore it by running something like this:

CREATE TABLE table1 (
    col1 String,
    col2 Int64,
    col3 UUID
) ENGINE = ReplacingMergeTree()
ORDER BY (col1, col2);

SET format_csv_delimiter = '|';
SET input_format_csv_skip_first_lines = 1;

INSERT INTO table1
FROM INFILE 'table1_dump_YYYYMMDD.csv.gz'
FORMAT CSV;


BigQuery

Similar to Clickhouse, BigQuery is one of the best analytical engines (because of unlimited compute and massively parallel storage), but it comes with cost: improper partitioning/clustering (and even with proper ones, since it's limited to only 1 column, unlike Clickhouse which can use more) on a large table will do a huge scan ($6.25 per TiB) and use a lot of compute slots; if combined with materialized views or periodic queries on cron, it would definitely kill your wallet. To dump from BigQuery, all you need to do is create a GCS (Google Cloud Storage) bucket, then run a query something like this:

EXPORT DATA
  OPTIONS (
    uri = 'gs://BUCKET1/table2_dump/1-*.parquet',
    format = 'PARQUET',
    overwrite = true
    --, compression = 'GZIP' -- import failed: ZLIB_INFLATE_FAILED
  )
AS (
  SELECT * FROM `dataset1.table2`
);

-- it's better to create snapshot table
-- if you do WHERE filter on above query, eg.
CREATE TABLE dataset1.table2_filtered_snapshot AS
  SELECT * FROM `dataset1.table2` WHERE col1 = 'yourFilter';

Not using compression because the import failed with it, not sure why. The parquet files will show up in your bucket; click "Remove public access prevention", then allow it to be publicly readable with the gcloud command:

gcloud storage buckets add-iam-policy-binding gs://BUCKET1 --member=allUsers --role=roles/storage.objectViewer
# remove-iam-policy-binding to undo this

 

Then just restore it:

CREATE TABLE table2 (
          Col1 String,
          Col2 DateTime,
          Col3 Int32
) ENGINE = ReplacingMergeTree()
ORDER BY (Col1, Col2, Col3);

SET parallel_distributed_insert_select = 1;

INSERT INTO table2
SELECT Col1, Col2, Col3
FROM s3Cluster(
    'default',
    'https://storage.googleapis.com/BUCKET1/table2_dump/1-*.parquet',
    '', -- s3 access id, remove or leave empty if public
    ''  -- s3 secret key, remove or leave empty if public
);
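
After the import you can run a quick sanity check (using the names from the sketch above); note that ReplacingMergeTree only deduplicates rows during background merges, so FINAL shows the deduplicated view:

SELECT count() FROM table2;
SELECT count() FROM table2 FINAL;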


Writing UDF for Clickhouse using Golang

Today we're going to create a UDF (User-Defined Function) in Golang that can be run inside a Clickhouse query; this function will parse a UUID v1 and return its timestamp, since Clickhouse doesn't have this function for now. It's inspired by the python version, using the TabSeparated delimiter (since it's the easiest to parse); a UDF in Clickhouse reads line by line (each row is a line, and each tab-separated text is a column/cell value):

package main
import (
    "bufio"
    "encoding/binary"
    "encoding/hex"
    "fmt"
    "os"
    "strings"
    "time"
)
func main() {
    scanner := bufio.NewScanner(os.Stdin)
    scanner.Split(bufio.ScanLines)
    for scanner.Scan() {
        id, _ := FromString(scanner.Text())
        fmt.Println(id.Time())
    }
}
func (me UUID) Nanoseconds() int64 {
    time_low := int64(binary.BigEndian.Uint32(me[0:4]))
    time_mid := int64(binary.BigEndian.Uint16(me[4:6]))
    time_hi := int64((binary.BigEndian.Uint16(me[6:8]) & 0x0fff))
    return int64((((time_low) + (time_mid << 32) + (time_hi << 48)) - epochStart) * 100)
}
func (me UUID) Time() time.Time {
    nsec := me.Nanoseconds()
    return time.Unix(nsec/1e9, nsec%1e9).UTC()
}
// code below Copyright (C) 2013 by Maxim Bublis <b@codemonkey.ru>
// see https://github.com/satori/go.uuid
// Difference in 100-nanosecond intervals between
// UUID epoch (October 15, 1582) and Unix epoch (January 1, 1970).
const epochStart = 122192928000000000
// UUID representation compliant with specification
// described in RFC 4122.
type UUID [16]byte
// FromString returns UUID parsed from string input.
// Following formats are supported:
// "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
// "{6ba7b810-9dad-11d1-80b4-00c04fd430c8}",
// "urn:uuid:6ba7b810-9dad-11d1-80b4-00c04fd430c8"
func FromString(input string) (u UUID, err error) {
    s := strings.Replace(input, "-", "", -1)
    if len(s) == 41 && s[:9] == "urn:uuid:" {
        s = s[9:]
    } else if len(s) == 34 && s[0] == '{' && s[33] == '}' {
        s = s[1:33]
    }
    if len(s) != 32 {
        err = fmt.Errorf("uuid: invalid UUID string: %s", input)
        return
    }
    b := []byte(s)
    _, err = hex.Decode(u[:], b)
    return
}
// Returns canonical string representation of UUID:
// xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.
func (u UUID) String() string {
    return fmt.Sprintf("%x-%x-%x-%x-%x",
        u[:4], u[4:6], u[6:8], u[8:10], u[10:])
}


Compile it and put it with the proper owner and permissions in /var/lib/clickhouse/user_scripts/uuid2timestr, and create /etc/clickhouse-server/uuid2timestr_function.xml (it must have the proper _function.xml suffix) containing:

<functions>
    <function>
        <type>executable</type>
        <name>uuid2timestr</name>
        <return_type>String</return_type>
        <argument>
            <type>String</type>
        </argument>
        <format>TabSeparated</format>
        <command>uuid2timestr</command>
        <lifetime>0</lifetime>
    </function>
</functions>
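
The compile-and-install step mentioned above might look something like this (a sketch; the clickhouse user/group and paths assume a standard apt install):

go build -o uuid2timestr main.go
sudo cp uuid2timestr /var/lib/clickhouse/user_scripts/
sudo chown clickhouse:clickhouse /var/lib/clickhouse/user_scripts/uuid2timestr
sudo chmod 755 /var/lib/clickhouse/user_scripts/uuid2timestr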


After that you can restart Clickhouse (sudo systemctl restart clickhouse-server or sudo clickhouse restart), depending on how you installed it (apt or binary setup).

Usage


To make sure it's loaded, you can just find this line in the log:

<Trace> ExternalUserDefinedExecutableFunctionsLoader: Loading config file '/etc/clickhouse-server/uuid2timestr_function.xml'

then just run a query using that function:

SELECT uuid2timestr('51038948-97ea-11ee-b7e0-52de156a77d8')

┌─uuid2timestr('51038948-97ea-11ee-b7e0-52de156a77d8')─┐
│ 2023-12-11 05:58:33.2391752 +0000 UTC                │
└──────────────────────────────────────────────────────┘
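
You can also apply it to a column; for example, against a hypothetical table uuids with a String column id containing UUID v1 values:

SELECT id, uuid2timestr(id) AS ts
FROM uuids
LIMIT 10;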

2024-01-31

OLLAMA with AMD GPU (ROCm)

Today we're gonna test ollama (just like in the previous article) but with an AMD GPU; to do this you'll need to run Docker, for example using this docker compose file:

version: "3.7"

services:
  ollama:
    container_name: ollama
    image: ollama/ollama:0.1.22-rocm
    environment:
      HSA_OVERRIDE_GFX_VERSION: 10.3.0 # only if you are using 6600XT
    volumes:
      - /usr/share/ollama/.ollama:/root/.ollama # reuse existing model
      #- ./etc__resolv.conf:/etc/resolv.conf # if your dns sucks
    devices:
      - /dev/dri
      - /dev/kfd
    restart: unless-stopped

  ollama-webui:
    image: ghcr.io/ollama-webui/ollama-webui:main
    container_name: ollama-webui
    ports:
      - "3122:8080"
    volumes:
      - ./ollama-webui:/app/backend/data
    environment:
      - 'OLLAMA_API_BASE_URL=http://ollama:11434/api'
    restart: unless-stopped
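
Bring the stack up first (standard compose usage, assuming the file above is saved as docker-compose.yml):

docker compose up -d
docker compose logs -f ollama   # optionally tail the logs to confirm the GPU got detected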
 

To get a shell inside it you just need to execute docker exec -it `docker ps | grep ollama/ollama | cut -f 1 -d ' '` bash

Make sure that you already have ROCm installed:

$ dpkg -l | grep rocm | cut -d ' ' -f 3
rocm-cmake
rocm-core
rocm-device-libs
rocm-hip-libraries
rocm-hip-runtime
rocm-hip-runtime-dev
rocm-hip-sdk
rocm-language-runtime
rocm-llvm
rocm-ocl-icd
rocm-opencl
rocm-opencl-runtime
rocm-smi-lib
rocminfo
$ cat /etc/apt/sources.list.d/* | grep -i 'rocm'
deb [arch=amd64 signed-by=/etc/apt/keyrings/rocm.gpg] https://repo.radeon.com/amdgpu/6.0/ubuntu jammy main
deb [arch=amd64 signed-by=/etc/apt/keyrings/rocm.gpg] https://repo.radeon.com/rocm/apt/6.0 jammy main
deb [arch=amd64 signed-by=/etc/apt/keyrings/rocm.gpg] https://repo.radeon.com/rocm/apt/6.0 jammy main


For example, in the previous article the same statement as below took around 60s, but using the GPU it only took 20-30s (2x-3x faster):

time ollama run codellama 'show me inplace mergesort using golang'

Or from outside: 

time docker exec -it `docker ps | grep ollama/ollama | cut -f 1 -d ' '` ollama run codellama 'show me inplace mergesort using golang'

 

Here is an example of how to implement an in-place merge sort algorithm in Go:
```
package main

import (
        "fmt"
)

func main() {
        // Test data
        arr := []int{5, 2, 8, 3, 1, 6, 4}

        // Sort the array using an in-place merge sort algorithm
        inPlaceMergeSort(arr)

        fmt.Println("Sorted array:", arr)
}

func inPlaceMergeSort(arr []int) {
        if len(arr) <= 1 {
                return
        }

        // Split the array into two halves
        mid := len(arr) / 2
        left := arr[:mid]
        right := arr[mid:]

        // Recursively sort each half
        inPlaceMergeSort(left)
        inPlaceMergeSort(right)

        // Merge the two sorted halves into a single array
        mergeInPlace(arr, left, right)
}

func mergeInPlace(arr []int, left, right []int) {
        i := 0
        j := 0
        k := 0

        for i < len(left) && j < len(right) {
                if left[i] <= right[j] {
                        arr[k] = left[i]
                        i++
                } else {
                        arr[k] = right[j]
                        j++
                }
                k++
        }

        // Copy the remaining elements from either left or right array to arr
        for i < len(left) {
                arr[k] = left[i]
                i++
                k++
        }

        for j < len(right) {
                arr[k] = right[j]
                j++
                k++
        }
}
```
This implementation of in-place merge sort uses a divide and conquer approach to sort the array. It first splits the array into two halves, sorts
each half recursively using itself, and then merges the two sorted halves into a single array. The `mergeInPlace` function is used to merge the two
sorted halves in-place, without requiring any additional memory.


real    0m30.528s
CPU: 0.02s      Real: 21.07s    RAM: 25088KB

NOTE: the answer from codellama above is wrong, since it's not an in-place merge sort; and even as a normal merge sort, merging from slices that share the same underlying array means the writes overwrite elements that still need to be read, causing wrong results.
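
A minimal way to fix the generated merge (not what codellama produced, just a sketch) is to copy the left half into a temporary buffer so the writes no longer clobber unread elements:

// merge arr[:mid] and arr[mid:], assuming both halves are already sorted
func mergeWithBuffer(arr []int, mid int) {
    left := append([]int(nil), arr[:mid]...) // copy the left half so it no longer aliases arr
    right := arr[mid:]
    i, j, k := 0, 0, 0
    for i < len(left) && j < len(right) {
        if left[i] <= right[j] {
            arr[k] = left[i]
            i++
        } else {
            arr[k] = right[j]
            j++
        }
        k++
    }
    for i < len(left) { // flush whatever is left from the copied half
        arr[k] = left[i]
        i++
        k++
    }
    // remaining right elements are already in their final positions
}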

You can also visit http://localhost:3122 for the web UI.