Showing posts with label google pubsub. Show all posts

2024-06-26

How to use Google Pub/Sub Locally with Golang

As much as I dislike Google Pub/Sub because of its inefficiency, especially compared to a regular database forced to act as a queue (e.g. Tarantool can do 200k events/s and ClickHouse can do 900k events/s, though without automatic fan-out), here's how you can use it with Go. First, create a docker compose file:


services:
  testpubsub:
    image: gcr.io/google.com/cloudsdktool/cloud-sdk:latest # 482.0.0
    command: /usr/bin/gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
# run with: docker compose up


Then just run this code, which I took from this article and modified a bit to also publish an event:


package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
)

const (
    projectID = "your-project-id"
    topicID   = "your-topic-id"
    subName   = "your-subscription-name"
)

type Message struct {
    Data string `json:"data"`
}

func main() {
    os.Setenv(`PUBSUB_EMULATOR_HOST`, `localhost:8085`)
    ctx := context.Background()

    // Create a Pub/Sub client
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }

    // Create a topic if it doesn't already exist
    topic := client.Topic(topicID)
    ok, err := topic.Exists(ctx)
    if err != nil {
        log.Fatalf("Failed to check if topic exists: %v", err)
    }
    if !ok {
        if _, err := client.CreateTopic(ctx, topicID); err != nil {
            log.Fatalf("Failed to create topic: %v", err)
        }
        log.Printf("Topic %s created.\n", topicID)
    }

    // Create a subscription to the topic if it doesn't already exist
    sub := client.Subscription(subName)
    ok, err = sub.Exists(ctx)
    if err != nil {
        log.Fatalf("Failed to check if subscription exists: %v", err)
    }
    if !ok {
        if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
            Topic: topic,
        }); err != nil {
            log.Fatalf("Failed to create subscription: %v", err)
        }
        log.Printf("Subscription %s created.\n", subName)
    }

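    go func() {
        time.Sleep(2 * time.Second)
        // Publish a test event and wait for the server to confirm it
        res := topic.Publish(ctx, &pubsub.Message{
            Data: []byte(`{"data":"hello world"}`),
        })
        if _, err := res.Get(ctx); err != nil {
            log.Printf("Failed to publish message: %v", err)
        }
    }()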

    // Start consuming messages from the subscription
    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        // Unmarshal the message data into a struct
        var m Message
        if err := json.Unmarshal(msg.Data, &m); err != nil {
            log.Printf("Failed to unmarshal message data: %v", err)
            msg.Nack()
            return
        }

        // Print the message data
        fmt.Printf("Received message: %s\n", m.Data)

        // Acknowledge the message
        msg.Ack()
    })
    if err != nil {
        log.Fatalf("Failed to receive messages: %v", err)
    }

    // Close the Pub/Sub client; only reached after Receive returns (i.e. once ctx is cancelled)
    if err := client.Close(); err != nil {
        log.Fatalf("Failed to close client: %v", err)
    }
}

That's it. You don't need internet access to test Google Pub/Sub; it just works with docker compose.

2023-10-07

NATS: at-most-once Queue / simpler networking

As you might already know from a past article, I'm a huge fan of NATS. NATS is one of the fastest at-most-once delivery, non-persistent message brokers. Unlike rabbitmq/lavinmq/kafka/redpanda, NATS is not a persistent queue; the persistent version is called NATS-Jetstream (but it's better to use rabbitmq/lavinmq/kafka/redpanda since they are more popular and have more integrations than NATS-Jetstream, or use a cloud provider's offering like GooglePubSub/AmazonSQS).

Features of NATS:

1. embeddable: you can embed NATS directly in a Go application, so you don't have to run a dedicated instance

ns, err := server.NewServer(opts)
if err != nil {
      log.Fatal(err)
}
go ns.Start()

if !ns.ReadyForConnections(4 * time.Second) {
      log.Fatal("not ready for connections")
}
nc, err := nats.Connect(ns.ClientURL())
if err != nil {
      log.Fatal(err)
}


2. high performance, see this old benchmark (not apples-to-apples, because most of the others have persistence (at-least-once delivery) by default)
3. wildcard topics: a topic can contain wildcards, so you can subscribe to a topic like foo.*.bar (topic names use 0-9a-zA-Z, with a dot . separating subtopics)
4. auto-reconnect (use MaxReconnect: -1 to retry forever), unlike some AMQP client libraries where you have to handle reconnection manually
5. built-in top-like monitoring: nats-top -s serverHostname -m port
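To make the wildcard rule from point 3 concrete, here is a minimal sketch of how token-based matching works. It is a standalone illustration, not the NATS server's implementation: the function name `subjectMatches` is made up, and it assumes well-formed subjects (no empty tokens). Besides `*` (exactly one token), it also handles NATS's `>` wildcard, which matches one or more trailing tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern.
// Tokens are separated by "."; "*" matches exactly one token,
// ">" (only valid as the last token) matches one or more remaining tokens.
// Illustrative sketch only; the real matching happens inside the NATS server.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be last and needs at least one token left to consume
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject ran out of tokens
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("foo.*.bar", "foo.x.bar"))   // true
	fmt.Println(subjectMatches("foo.*.bar", "foo.x.y.bar")) // false
	fmt.Println(subjectMatches("foo.>", "foo.x.y.bar"))     // true
	fmt.Println(subjectMatches("foo.>", "foo"))             // false
}
```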

Gotchas

Since NATS does not support persistence, if there's no subscriber the message will be lost (unlike other message queues). So it behaves like Redis PubSub, not like a Redis queue; the difference is that Redis PubSub is fan-out/broadcast only, while NATS can do both broadcast and a non-persistent queue.

Use Case of NATS

1. publish/subscribe, for example to send a signal to other services. This assumes broadcast, since there's no queue.
A common pattern: there is an API that lists items changed after a certain updatedAt, and service A wants to signal service B that there are new/updated items. A just broadcasts a message to NATS, and any subscriber of that topic fetches from A when it gets the signal, and also periodically as a fallback.

// on publisher
err := nc.Publish(strTopic, bytMsg)

// on subscriber
_, err := nc.Subscribe(strTopic, func(m *nats.Msg) {
    _ = m.Subject
    _ = m.Data
})

// sync version
sub, err := nc.SubscribeSync(strTopic)
for {
  msg, err := sub.NextMsg(5 * time.Second)
  if err != nil { // nats.ErrTimeout if no message
    break
  }
  _ = msg.Subject
  _ = msg.Data
}

 

2. request/reply, for example to load-balance requests or autoscale, so you can deploy workers anywhere and they will pick up the work.
We can use QueueSubscribe with the same queueName to make sure only 1 worker handles each message. If you have 2 different queueNames, the same message will be processed by 2 different workers, one from each queueName.

// on requester
msg, err := nc.Request(strTopic, bytMsg, 5*time.Second)
_ = string(msg.Data) == "reply" // from worker/responder

// on worker/responder
_, err := nc.QueueSubscribe(strTopic, queueName, func(m *nats.Msg) {
    _ = m.Subject
    _ = m.Data
    m.Respond([]byte("reply")) // send back to requester
})
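The queueName rule is easier to see in a toy model. The sketch below is an in-memory illustration of the delivery rule only: it is not NATS, it is not safe for concurrent use, and `toyBroker` and its methods are made-up names. It picks workers round-robin to keep the output deterministic; with the real server you should not rely on which member of a queue group gets a given message.

```go
package main

import "fmt"

// toyBroker models one topic with queue groups. Hypothetical type for
// illustration; it only demonstrates "one delivery per queueName".
type toyBroker struct {
	groups map[string][]func(msg string) // queueName -> workers
	order  []string                      // queue names in registration order
	next   map[string]int                // round-robin cursor per queueName
}

func newToyBroker() *toyBroker {
	return &toyBroker{groups: map[string][]func(string){}, next: map[string]int{}}
}

// queueSubscribe registers worker as a member of queueName.
func (b *toyBroker) queueSubscribe(queueName string, worker func(msg string)) {
	if _, ok := b.groups[queueName]; !ok {
		b.order = append(b.order, queueName)
	}
	b.groups[queueName] = append(b.groups[queueName], worker)
}

// publish delivers msg to exactly one worker in each queue group.
func (b *toyBroker) publish(msg string) {
	for _, q := range b.order {
		workers := b.groups[q]
		i := b.next[q] % len(workers)
		b.next[q]++
		workers[i](msg)
	}
}

func main() {
	b := newToyBroker()
	b.queueSubscribe("resize", func(m string) { fmt.Println("resize worker 1 got", m) })
	b.queueSubscribe("resize", func(m string) { fmt.Println("resize worker 2 got", m) })
	b.queueSubscribe("audit", func(m string) { fmt.Println("audit worker got", m) })

	b.publish("job-1") // one resize worker and the audit worker each get it
	b.publish("job-2") // the other resize worker and the audit worker get it
}
```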

You can see more examples here, an example of how to add OTel tracing to NATS here, and how to set up mTLS here.