programming: the action or process of writing computer programs. | rants: speak or shout at length in a wild, [im]passioned way.
2024-06-26
How to use Google Pub/Sub Locally with Golang
# Docker Compose definition that runs the Google Cloud Pub/Sub emulator locally.
services:
  testpubsub:
    image: gcr.io/google.com/cloudsdktool/cloud-sdk:latest # 482.0.0
    # Bind to 0.0.0.0 inside the container so the published port is reachable from the host.
    command: /usr/bin/gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
# run with: docker compose up
Then just run this code, which I stole from this article and modified a bit to also publish an event:
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"cloud.google.com/go/pubsub"
)
// Identifiers of the Pub/Sub resources used by this example.
// They are placeholders; replace them with your own values.
const (
// projectID is the project passed to pubsub.NewClient.
projectID = "your-project-id"
// topicID names the topic that main creates (if missing) and publishes to.
topicID = "your-topic-id"
// subName names the subscription that main creates (if missing) and receives from.
subName = "your-subscription-name"
)
// Message is the JSON payload that main decodes from each received
// Pub/Sub message.
type Message struct {
// Data is read from the "data" key of the JSON object.
Data string `json:"data"`
}
// main runs a self-contained Pub/Sub round trip against the local emulator:
// it creates the topic and subscription if they do not exist yet, publishes
// one JSON event after a short delay, and prints every message it receives
// until the process is interrupted (Ctrl+C), at which point it shuts down
// the publisher and the client cleanly.
func main() {
	// Point the client library at the emulator started by docker compose.
	if err := os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8085"); err != nil {
		log.Fatalf("Failed to set PUBSUB_EMULATOR_HOST: %v", err)
	}

	// Cancel ctx on interrupt so that Receive returns and the shutdown code
	// at the bottom of main is actually reached.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Create a Pub/Sub client
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}

	// Create the topic if it doesn't already exist
	topic := client.Topic(topicID)
	ok, err := topic.Exists(ctx)
	if err != nil {
		log.Fatalf("Failed to check if topic exists: %v", err)
	}
	if !ok {
		if _, err := client.CreateTopic(ctx, topicID); err != nil {
			log.Fatalf("Failed to create topic: %v", err)
		}
		log.Printf("Topic %s created.\n", topicID)
	}

	// Create the subscription to the topic if it doesn't already exist
	sub := client.Subscription(subName)
	ok, err = sub.Exists(ctx)
	if err != nil {
		log.Fatalf("Failed to check if subscription exists: %v", err)
	}
	if !ok {
		if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
			Topic: topic,
		}); err != nil {
			log.Fatalf("Failed to create subscription: %v", err)
		}
		log.Printf("Subscription %s created.\n", subName)
	}

	// Publish one event in the background, after giving Receive a moment to
	// start. published is closed when the goroutine exits so main can wait
	// for it before tearing the client down.
	published := make(chan struct{})
	go func() {
		defer close(published)
		select {
		case <-time.After(2 * time.Second):
		case <-ctx.Done():
			return
		}
		res := topic.Publish(ctx, &pubsub.Message{
			Data: []byte(`{"data":"hello world"}`),
		})
		// Publish is asynchronous; Get blocks until the server has accepted
		// the message (or ctx is cancelled) and reports the real outcome.
		id, err := res.Get(ctx)
		if err != nil {
			log.Printf("Failed to publish message: %v", err)
			return
		}
		log.Printf("Published message %s", id)
	}()

	// Start consuming messages from the subscription. Receive blocks until
	// ctx is cancelled or a non-retryable error occurs.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// Unmarshal the message data into a struct
		var m Message
		if err := json.Unmarshal(msg.Data, &m); err != nil {
			log.Printf("Failed to unmarshal message data: %v", err)
			msg.Nack()
			return
		}
		// Print the message data
		fmt.Printf("Received message: %s\n", m.Data)
		// Acknowledge the message
		msg.Ack()
	})
	if err != nil {
		log.Fatalf("Failed to receive messages: %v", err)
	}

	// Gracefully shut down: wait for the publisher goroutine, flush and stop
	// the topic's background publishing, then close the client.
	<-published
	topic.Stop()
	if err := client.Close(); err != nil {
		log.Fatalf("Failed to close client: %v", err)
	}
}
That's it. You don't need internet access to test Google Pub/Sub; it just works with docker compose.
2024-06-13
Backup and Restore Cassandra Database for Development
Last time we dumped data from Cassandra and restored it to ClickHouse for proper analytics. To do that we need cqlsh and cassandradump:
# Install the tooling: cqlsh and the Python Cassandra driver.
pip install -U cqlsh
pip install cassandra-driver

# list all tables
tables=$(cqlsh -k keyspace1 -e 'describe keyspace' | grep TABLE | cut -d '.' -f 2 | cut -d ' ' -f 1)

# run copy to file
mkdir -p backup
# Quote "$tables" so each table name stays on its own line; an unquoted echo
# joins them with spaces and `read` would then see a single bogus table name.
printf '%s\n' "$tables" | while IFS= read -r x; do
  [ -n "$x" ] || continue
  # stdin is redirected from /dev/null so cqlsh cannot consume the remaining table names.
  # sed collapses the doubled backslashes that cqlsh writes into the CSV.
  time cqlsh -k keyspace1 -e "COPY $x TO STDOUT WITH DELIMITER='|' AND HEADER=TRUE AND NULL=NULL" < /dev/null | grep -v 'WARNING: cqlsh' | sed 's|\\\\|\\|g' > "backup/$x.csv"
done

# run copy from file
printf '%s\n' "$tables" | while IFS= read -r x; do
  [ -n "$x" ] || continue
  time sed 's|\\\\|\\|g' "backup/$x.csv" | cqlsh -k keyspace1 -e "COPY $x FROM stdin WITH DELIMITER='|' AND HEADER=TRUE AND NULL=NULL"
done
That's it; you can update the rows before restoring them for your integration test. There's a weird quirk in Cassandra's CSV output where a backslash is escaped with another backslash, and it cannot be removed even with the tr command (probably because tr can't handle more than one character at a time); it gets restored with even more backslashes, and NULL gets imported as zero.