2024-06-26

How to use Google Pub/Sub Locally with Golang

As much as I dislike Google Pub/Sub (because of its inefficiency, especially compared to a normal database forced to act as a queue, e.g. Tarantool can do 200k events/s and Clickhouse can do 900k events/s, albeit without automatic fan-out), here's how you can use it with Go. First you must create a docker compose file:


services:
  testpubsub:
    image: gcr.io/google.com/cloudsdktool/cloud-sdk:latest # 482.0.0
    command: /usr/bin/gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
# run with: docker compose up


Then just run this code, which was stolen from this article and modified a bit to also publish an event:


package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
)

const (
    projectID = "your-project-id"
    topicID   = "your-topic-id"
    subName   = "your-subscription-name"
)

type Message struct {
    Data string `json:"data"`
}

func main() {
    os.Setenv(`PUBSUB_EMULATOR_HOST`, `localhost:8085`)
    ctx := context.Background()

    // Create a Pub/Sub client
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }

    // Create a topic if it doesn't already exist
    topic := client.Topic(topicID)
    ok, err := topic.Exists(ctx)
    if err != nil {
        log.Fatalf("Failed to check if topic exists: %v", err)
    }
    if !ok {
        if _, err := client.CreateTopic(ctx, topicID); err != nil {
            log.Fatalf("Failed to create topic: %v", err)
        }
        log.Printf("Topic %s created.\n", topicID)
    }

    // Create a subscription to the topic if it doesn't already exist
    sub := client.Subscription(subName)
    ok, err = sub.Exists(ctx)
    if err != nil {
        log.Fatalf("Failed to check if subscription exists: %v", err)
    }
    if !ok {
        if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
            Topic: topic,
        }); err != nil {
            log.Fatalf("Failed to create subscription: %v", err)
        }
        log.Printf("Subscription %s created.\n", subName)
    }

    go func() {
        time.Sleep(2 * time.Second)
        // publish some event and wait until the server acknowledges it
        res := topic.Publish(ctx, &pubsub.Message{
            Data: []byte(`{"data":"hello world"}`),
        })
        if _, err := res.Get(ctx); err != nil {
            log.Printf("Failed to publish message: %v", err)
        }
    }()

    // Start consuming messages from the subscription.
    // Receive blocks until ctx is cancelled or a non-retryable error occurs.
    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        // Unmarshal the message data into a struct
        var m Message
        if err := json.Unmarshal(msg.Data, &m); err != nil {
            log.Printf("Failed to unmarshal message data: %v", err)
            msg.Nack()
            return
        }

        // Print the message data
        fmt.Printf("Received message: %s\n", m.Data)

        // Acknowledge the message
        msg.Ack()
    })
    if err != nil {
        log.Fatalf("Failed to receive messages: %v", err)
    }

    // Gracefully shutdown the Pub/Sub client
    if err := client.Close(); err != nil {
        log.Fatalf("Failed to close client: %v", err)
    }
}

That's it, you don't need internet access to test Google Pub/Sub; it just works with docker compose.
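
For CI you can wrap the same flow in a regular Go test. Below is a minimal sketch, assuming the emulator from the compose file above is running on localhost:8085 and starts fresh (the test-project/test-topic/test-sub names are made up for illustration):

package pubsubtest

import (
    "context"
    "os"
    "testing"
    "time"

    "cloud.google.com/go/pubsub"
)

func TestPublishReceive(t *testing.T) {
    // point the client at the local emulator instead of the real service
    os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8085")
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    client, err := pubsub.NewClient(ctx, "test-project")
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()

    // assumes a fresh emulator: the topic and subscription don't exist yet
    topic, err := client.CreateTopic(ctx, "test-topic")
    if err != nil {
        t.Fatal(err)
    }
    sub, err := client.CreateSubscription(ctx, "test-sub",
        pubsub.SubscriptionConfig{Topic: topic})
    if err != nil {
        t.Fatal(err)
    }

    // publish one message and wait for the emulator to acknowledge it
    if _, err := topic.Publish(ctx, &pubsub.Message{Data: []byte("ping")}).Get(ctx); err != nil {
        t.Fatal(err)
    }

    // receive exactly one message, then cancel to stop Receive
    rctx, stop := context.WithCancel(ctx)
    var got string
    err = sub.Receive(rctx, func(_ context.Context, m *pubsub.Message) {
        got = string(m.Data)
        m.Ack()
        stop()
    })
    if err != nil {
        t.Fatal(err)
    }
    if got != "ping" {
        t.Fatalf("unexpected payload: %q", got)
    }
}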

2024-06-13

Backup and Restore Cassandra Database for Development

Last time we dumped from Cassandra and restored it to Clickhouse for proper analytics. To do that we need cqlsh and cassandra-driver:

pip install -U cqlsh
pip install cassandra-driver

# list all tables
tables=`cqlsh -k keyspace1 -e 'describe keyspace' | grep TABLE | cut -d '.' -f 2 | cut -d ' ' -f 1`

# run copy to file
mkdir backup
echo "$tables" | while read x; do time cqlsh -k keyspace1 -e "COPY $x TO STDOUT WITH DELIMITER='|' AND HEADER=TRUE AND NULL=NULL" | grep -v 'WARNING: cqlsh' | sed 's|\\\\|\\|g' > backup/$x.csv ; done

# run copy from file
echo "$tables" | while read x; do time cat backup/$x.csv | sed 's|\\\\|\\|g' | cqlsh -k keyspace1 -e "COPY $x FROM stdin WITH DELIMITER='|' AND HEADER=TRUE AND NULL=NULL" ; done


That's it, you can update the rows before restoring for your integration test (see the sketch below). There's a weird quirk in Cassandra's CSV where backslashes get escaped with more backslashes and cannot be removed even with the tr command (probably because tr can't handle more than 1 character), so without the sed above they get restored as even more backslashes, and NULL gets imported as zero.
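
For example, since the dump is just pipe-delimited text, you can rewrite columns with a small Go program before running the COPY FROM above. A minimal sketch, assuming a hypothetical backup/users.csv whose 3rd column holds an email you want replaced in the fixtures (Cassandra's backslash escaping may not round-trip perfectly through encoding/csv, so for heavily escaped data sed/awk might be safer):

package main

import (
    "encoding/csv"
    "log"
    "os"
)

func main() {
    // read the dumped table (hypothetical file and column, adjust to your schema)
    in, err := os.Open("backup/users.csv")
    if err != nil {
        log.Fatal(err)
    }
    defer in.Close()

    r := csv.NewReader(in)
    r.Comma = '|'
    r.LazyQuotes = true
    rows, err := r.ReadAll()
    if err != nil {
        log.Fatal(err)
    }

    // example tweak: overwrite the 3rd column on every data row, keep the header
    for i, row := range rows {
        if i == 0 || len(row) < 3 {
            continue
        }
        row[2] = "test@example.com"
    }

    // write the modified dump next to the original, ready for COPY FROM
    out, err := os.Create("backup/users.fixture.csv")
    if err != nil {
        log.Fatal(err)
    }
    defer out.Close()

    w := csv.NewWriter(out)
    w.Comma = '|'
    if err := w.WriteAll(rows); err != nil {
        log.Fatal(err)
    }
}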

2024-03-23

Clickhouse in Kubernetes / Minikube using Bitnami Helm Charts

Normally for my projects I use docker-compose plus a normal bind-mounted persistence directory to avoid data loss for the database (and to keep it fast and simple to maintain), because it's simpler to upgrade versions than with a normal installation (looking at that popular database that's annoying to upgrade across major versions, requiring a lot of copying), and the network/disk/cpu overhead is also unnoticeable. Today we're gonna try to set up Clickhouse using minikube. Assuming you already installed minikube, all you need to do is install helm (the k8s package manager):

curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get-helm-3 > install_helm.sh && bash install_helm.sh
helm install my-release oci://registry-1.docker.io/bitnamicharts/clickhouse

# to check again the deploy
helm status my-release

# to check configurable options
helm show all oci://registry-1.docker.io/bitnamicharts/clickhouse

# to uninstall
helm uninstall my-release

by default it would create 2 shards with 3 replicas each (including the PVs and PVCs for all pods), and too bad it's still using ZooKeeper instead of clickhouse-keeper:

my-release-clickhouse-shard0-0   1/1  Running   0 
my-release-clickhouse-shard0-1   1/1  Running   0 
my-release-clickhouse-shard0-2   1/1  Running   0 
my-release-clickhouse-shard1-0   1/1  Running   0 
my-release-clickhouse-shard1-1   1/1  Running   0 
my-release-clickhouse-shard1-2   1/1  Running   0 
my-release-zookeeper-0           1/1  Running   0 
my-release-zookeeper-1           1/1  Running   0 
my-release-zookeeper-2           1/1  Running   0 


to access it locally you just need to port-forward and connect using the clickhouse-client program:

kubectl port-forward --namespace default svc/my-release-clickhouse 9001:9000 &
clickhouse-client --host localhost --port 9001 --user default --password $(kubectl get secret --namespace default my-release-clickhouse -o jsonpath="{.data.admin-password}" | base64 -d)

alternatively you can just exec into the pod and use clickhouse-client inside the pod:

kubectl exec -it my-release-clickhouse-shard1-2 -- sh
clickhouse-client # use password from above

you can already create a distributed table and do stuff with it (see the sketch near the end of this post). For more information about bitnami charts you can read it here. If you want to customize your setup (not 2 shards with 3 replicas; I always prefer no shard but multiple replicas, unless your data is so massive that it's hard not to shard across multiple machines), you can clone the chart, modify the configuration, then deploy it, something like this:

git clone --depth 1 git@github.com:bitnami/charts.git
vim charts/bitnami/clickhouse/values.yaml
# ^ change shards=1 keeper.enabled=true zookeeper.enabled=false
helm dependency build charts/bitnami/clickhouse
helm install ch1 charts/bitnami/clickhouse

# update
helm upgrade ch1 charts/bitnami/clickhouse

it would create something like this:

ch1-clickhouse-shard0-0         1/1  Running   0
ch1-clickhouse-shard0-1         1/1  Running   0
ch1-clickhouse-shard0-2         1/1  Running   0
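
About the distributed table mentioned earlier, here's a minimal sketch using the clickhouse-go v2 driver, assuming the service is port-forwarded to localhost:9001 as above, that the chart keeps its default cluster name "default", and that the {shard}/{replica} macros are configured by the chart (the events table and its columns are made up for illustration):

package main

import (
    "context"
    "log"

    "github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
    // connect through the port-forward created above
    conn, err := clickhouse.Open(&clickhouse.Options{
        Addr: []string{"localhost:9001"},
        Auth: clickhouse.Auth{
            Database: "default",
            Username: "default",
            Password: "REPLACE_WITH_ADMIN_PASSWORD", // from the kubectl get secret command above
        },
    })
    if err != nil {
        log.Fatalf("connect: %v", err)
    }
    ctx := context.Background()

    // a replicated local table on every node, plus a Distributed table on top of it
    queries := []string{
        `CREATE TABLE IF NOT EXISTS events_local ON CLUSTER 'default'
            (ts DateTime, msg String)
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events_local', '{replica}')
            ORDER BY ts`,
        `CREATE TABLE IF NOT EXISTS events ON CLUSTER 'default'
            AS events_local
            ENGINE = Distributed('default', currentDatabase(), events_local, rand())`,
    }
    for _, q := range queries {
        if err := conn.Exec(ctx, q); err != nil {
            log.Fatalf("exec: %v", err)
        }
    }
    log.Println("distributed table ready")
}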

That's it for now; I'm not sure yet how to run this in a real kubernetes cluster (not minikube), especially how to get the PVs/PVCs placed across multiple nodes.