Showing posts with label nats. Show all posts

2023-10-07

NATS: at-most-once Queue / simpler networking

As you might already know from the past article, I'm a huge fan of NATS. NATS is one of the fastest non-persistent message brokers, with at-most-once delivery. Unlike rabbitmq/lavinmq/kafka/redpanda, NATS is not a persistent queue; the persistent version is called NATS-Jetstream (but it's better to use rabbitmq/lavinmq/kafka/redpanda since they are more popular and have more integrations than NATS-Jetstream, or use a cloud provider's offering like GooglePubSub/AmazonSQS).

Features of NATS:

1. embeddable: you can embed NATS directly in a Go application, so you don't have to run a dedicated instance

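opts := &server.Options{} // github.com/nats-io/nats-server/v2/server; zero value uses defaults
ns, err := server.NewServer(opts)
if err != nil {
      log.Fatal(err)
}
go ns.Start()

if !ns.ReadyForConnections(4 * time.Second) {
      log.Fatal("not ready for connections")
}
nc, err := nats.Connect(ns.ClientURL()) // github.com/nats-io/nats.go
if err != nil {
      log.Fatal(err)
}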


2. high performance, see the old benchmark (not apples-to-apples, because most of the others have persistence (at-least-once delivery) by default)
3. wildcard topics: a topic can contain a wildcard, so you can subscribe to a topic like foo.*.bar (tokens use 0-9a-zA-Z, separated by a dot . for each subtopic)
4. auto-reconnect (use MaxReconnect: -1 to retry forever), unlike some amqp client libraries where you have to handle reconnection manually
5. built-in top-like monitoring: nats-top -s serverHostname -m port
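
To make the wildcard feature more concrete, here is a minimal sketch of how subject matching behaves. This is not NATS's own code; matchSubject is a hypothetical helper written only for illustration. It also covers the ">" wildcard (matches one or more trailing tokens), which is part of NATS but not mentioned in the list above.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern using NATS-style
// wildcards: "*" matches exactly one token, ">" (only as the last token)
// matches one or more remaining tokens.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" && i == len(p)-1 {
			return len(s) > i // at least one token must remain
		}
		if i >= len(s) {
			return false // pattern is longer than the subject
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(matchSubject("foo.*.bar", "foo.x.bar"))   // true
	fmt.Println(matchSubject("foo.*.bar", "foo.x.y.bar")) // false
	fmt.Println(matchSubject("foo.>", "foo.x.y.bar"))     // true
	fmt.Println(matchSubject("foo.>", "foo"))             // false
}
```

So a subscriber on foo.*.bar receives foo.x.bar but not foo.x.y.bar, because * never spans more than one token.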

Gotchas

Since NATS does not support persistence, if there's no subscriber the message will be lost (unlike other message queues). So it behaves like Redis PubSub, not like a Redis queue; the only difference is that Redis PubSub is fan-out/broadcast only, while NATS can do both broadcast and non-persistent queueing.

Use Case of NATS

1. publish/subscribe, for example to signal other services. This assumes broadcast, since there's no queue.
A common pattern: there is an API to list items changed after a certain updatedAt, and service A wants to signal service B that there are new/updated items. A just needs to broadcast this message to NATS, and any subscriber of that topic fetches from A when it gets the signal, and also periodically as a fallback.

// on publisher
err := nc.Publish(strTopic, bytMsg)

// on subscriber
_, err := nc.Subscribe(strTopic, func(m *nats.Msg) {
    _ = m.Subject
    _ = m.Data
})

// sync version
sub, err := nc.SubscribeSync(strTopic)
for {
  msg, err := sub.NextMsg(5 * time.Second)
  if err != nil { // nats.ErrTimeout if no message
    break
  }
  _ = msg.Subject
  _ = msg.Data
}

 

2. request/reply, for example to load-balance requests or autoscale, so you can deploy workers anywhere and they will pick up the requests
We can use QueueSubscribe with the same queueName to make sure only 1 worker handles each message. If you have 2 different queueNames, the same message will be processed by 2 different workers, one from each queueName.

// on requester
msg, err := nc.Request(strTopic, bytMsg, 5*time.Second)
_ = string(msg.Data) == "reply" // from worker/responder

// on worker/responder
_, err := nc.QueueSubscribe(strTopic, queueName, func(m *nats.Msg) {
    _ = m.Subject
    _ = m.Data
    m.Respond([]byte("reply")) // send back to requester
})
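
To illustrate the delivery rules above (a message with no subscriber is dropped, and each queueName gets one copy per message), here is a toy in-memory model. toyBroker is a hypothetical stand-in, not how NATS is implemented; in particular it rotates through group members in order, while the real server's choice of member may differ.

```go
package main

import "fmt"

// toyBroker models delivery semantics only; it is a hypothetical teaching aid.
type toyBroker struct {
	plain  []func(string)            // Subscribe: every handler gets every message
	groups map[string][]func(string) // QueueSubscribe: one handler per group per message
	next   map[string]int            // rotation cursor per group (an assumption of this sketch)
}

func newToyBroker() *toyBroker {
	return &toyBroker{groups: map[string][]func(string){}, next: map[string]int{}}
}

func (b *toyBroker) Subscribe(h func(string)) { b.plain = append(b.plain, h) }

func (b *toyBroker) QueueSubscribe(queue string, h func(string)) {
	b.groups[queue] = append(b.groups[queue], h)
}

// Publish returns the number of deliveries; 0 means the message was dropped.
func (b *toyBroker) Publish(msg string) int {
	n := 0
	for _, h := range b.plain {
		h(msg)
		n++
	}
	for q, members := range b.groups {
		i := b.next[q] % len(members)
		b.next[q]++
		members[i](msg) // exactly one member of each group handles the message
		n++
	}
	return n
}

func main() {
	b := newToyBroker()
	fmt.Println("deliveries with no subscriber:", b.Publish("lost"))

	counts := map[string]int{}
	for _, name := range []string{"a1", "a2"} {
		name := name
		b.QueueSubscribe("groupA", func(string) { counts[name]++ })
	}
	b.QueueSubscribe("groupB", func(string) { counts["b1"]++ })
	for i := 0; i < 4; i++ {
		b.Publish("job")
	}
	fmt.Println("groupA total:", counts["a1"]+counts["a2"], "groupB total:", counts["b1"])
}
```

With 4 published jobs, groupA's two workers handle 4 between them and groupB's single worker also handles 4, which is why workers that should share the load must use the same queueName.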

You can see more examples here, an example of how to add otel trace on NATS here, and how to create mtls here.

2022-04-04

Automatic Load Balancer Registration/Deregistration with NATS or FabioLB

Today we're gonna test 2 alternatives for automatic load balancing. Previously I always used Caddy or NginX with manual reverse proxy configuration (because most of my projects are single server, where the bottleneck is always the database, not the backend/compute part), but today we're gonna test 2 possible strategies for high-availability load balancing (without kubernetes of course): the first uses NATS, the second uses a standard load balancer, in this case FabioLB.

To use NATS, we're gonna use this strategy:
The first thing we deploy is our custom reverse proxy, which should be able to convert any query string, form body with any kind of content-type, and any header if needed. We can use any serialization format (json, msgpack, protobuf, etc), but in this case we're just gonna use a normal string. We call this service "apiproxy". The apiproxy sends the serialized payload (from map/object) into NATS using the request-reply mechanism. The other service is our backend "worker"/handler, which could be anything, but in this case it is the real handler that contains our business logic. It subscribes and returns a reply to the apiproxy, which deserializes it and sends it back to the client in any serialization format and protocol (gRPC/Websocket/HTTP-REST/JSONP/etc). Here's the benchmark result of normal Fiber without any proxy, and of apiproxy-nats-worker with single nats vs multi nats instance:

# no proxy
go run main.go apiserver
hey -n 1000000 -c 255 http://127.0.0.1:3000
  Average:      0.0011 secs
  Requests/sec: 232449.1716

# single nats
go run main.go apiproxy
go run main.go # worker
hey -n 1000000 -c 255 http://127.0.0.1:3000
  Average:      0.0025 secs
  Requests/sec: 100461.5866

# 2 worker
  Average:      0.0033 secs
  Requests/sec: 76130.4079

# 4 worker
  Average:      0.0051 secs
  Requests/sec: 50140.6288

# limit the apiserver CPU
GOMAXPROCS=2 go run main.go apiserver
  Average:      0.0014 secs
  Requests/sec: 184234.0106

# apiproxy 2 core
# 1 worker 2 core each
  Average:      0.0025 secs
  Requests/sec: 103007.4516

# 2 worker 2 core each
  Average:      0.0029 secs
  Requests/sec: 87522.6801

# 4 worker 2 core each
  Average:      0.0037 secs
  Requests/sec: 67714.5851

# seems that the bottleneck is spawning the producer's NATS
# spawning 8 connections using round-robin

# 1 worker 2 core each
  Average:      0.0021 secs
  Requests/sec: 121883.4324

# 4 worker 2 core each
  Average:      0.0030 secs
  Requests/sec: 84289.4330

# seems also the apiproxy is hogging all the CPU cores
# limiting to 8 core for apiproxy
# now synchronous handler changed into async/callback version
GOMAXPROCS=8 go run main.go apiserver

# 1 worker 2 core each
  Average:      0.0017 secs
  Requests/sec: 148298.8623

# 2 worker 2 core each
  Average:      0.0017 secs
  Requests/sec: 143958.4056

# 4 worker 2 core each
  Average:      0.0029 secs
  Requests/sec: 88447.5352

# limiting the NATS to 4 core using go run on the source
# 1 worker 2 core each
  Average:      0.0013 secs
  Requests/sec: 194787.6327

# 2 worker 2 core each
  Average:      0.0014 secs
  Requests/sec: 176702.0119

# 4 worker 2 core each
  Average:      0.0022 secs
  Requests/sec: 116926.5218

# same nats core count, increase worker core count
# 1 worker 4 core each
  Average:      0.0013 secs
  Requests/sec: 196075.4366

# 2 worker 4 core each
  Average:      0.0014 secs
  Requests/sec: 174912.7629

# 4 worker 4 core each
  Average:      0.0021 secs
  Requests/sec: 121911.4473 --> see update below


It would be better if this were tested on multiple servers, but it seems the bottleneck is the NATS connection when there are many subscribers; they do not scale linearly (16-66% overhead for a single API proxy). IT'S A BUG ON MY SIDE, SEE UPDATE BELOW. Next we're gonna try FabioLB with Consul. Consul is used as the service registry (it's a synchronous-consistent "database" like Zookeeper or Etcd). To install all of it, use these commands:

# setup:
curl -fsSL https://apt.releases.hashicorp.com/gpg | sudo apt-key add -
sudo apt-add-repository "deb [arch=amd64] https://apt.releases.hashicorp.com $(lsb_release -cs) main"
sudo apt install consul
go install github.com/fabiolb/fabio@latest

# start:
sudo consul agent -dev --data-dir=/tmp/consul
fabio
go run main.go -addr 172.17.0.1:5000 -name svc-a -prefix /foo -consul 127.0.0.1:8500

# benchmark:
# without fabio
  Average:      0.0013 secs
  Requests/sec: 197047.9124

# with fabio 1 backend
  Average:      0.0038 secs
  Requests/sec: 65764.9021

# with fabio 2 backend
go run main.go -addr 172.17.0.1:5001 -name svc-a -prefix /foo -consul 127.0.0.1:8500

# the bottleneck might be the cores, so we limit the cores to 2 for each worker
# with fabio 1 backend 2 core each
  Average:      0.0045 secs
  Requests/sec: 56339.5518

# with fabio 2 backend 2 core each
  Average:      0.0042 secs
  Requests/sec: 60296.9714

# what if we limit also the fabio
GOMAXPROCS=8 fabio

# with fabio 8 core, 1 backend 2 core each
  Average:      0.0042 secs
  Requests/sec: 59969.5206

# with fabio 8 core, 2 backend 2 core each
  Average:      0.0041 secs
  Requests/sec: 62169.2256

# with fabio 8 core, 4 backend 2 core each
  Average:      0.0039 secs
  Requests/sec: 64703.8253

All CPU cores were utilized at around 50% on a 32-core server with 128GB RAM. I can't find which part is the bottleneck for now, but for sure the two strategies have around 16% vs 67% overhead compared to no proxy (which makes sense, because adding more layers adds more transport and more things to copy/transfer and transform/serialize-deserialize). The code used in this benchmark is here, in the 2022mid directory, and the code for fabio-consul registration was copied from ebay's github repository.
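
For reference, the 16% and 67% figures follow from the requests/sec numbers in the benchmark output above: the no-proxy baseline versus the best single-worker NATS run, and the no-Fabio baseline versus Fabio with 1 backend. overheadPct is just a hypothetical helper name for the arithmetic.

```go
package main

import "fmt"

// overheadPct returns how much throughput is lost relative to the baseline, in percent.
func overheadPct(baselineRPS, proxiedRPS float64) float64 {
	return (baselineRPS - proxiedRPS) / baselineRPS * 100
}

func main() {
	// Numbers copied from the benchmark output in this post.
	fmt.Printf("NATS request-reply: %.1f%%\n", overheadPct(232449.1716, 196075.4366))
	fmt.Printf("Fabio + Consul:     %.1f%%\n", overheadPct(197047.9124, 65764.9021))
}
```

This gives roughly 15.6% and 66.6%, which round to the 16% and 67% quoted.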

Why do we even need to do this? If we're using the api gateway pattern (one of the patterns used at my past company, but with Kubernetes on the worker part), we can deploy independently and communicate between services through the gateway (proxy) without knowing the IP address or domain name of the service itself. As long as a request has the proper route and payload, it can be handled wherever the service is deployed. What if you want to do a canary or blue-green deployment? You can just register a handler in NATS or Consul with a different route name (especially for communication between services, not for routes exposed to the public), and wait for all traffic to move there before killing the previous deployment.

So what should you choose? Both strategies require 3 moving parts (apiproxy-nats-worker, fabio-consul-worker), but the NATS strategy is simpler to develop and can give better performance (especially if you make the apiproxy as flexible as possible). It does need better serialization, since serialization was not measured in this benchmark. If you need better serialization performance you must use codegen, which may require you to deploy twice (once for the apiproxy, once for the worker, unless you split the raw response meta with jsonparser or use a map only for the apiproxy). The FabioLB strategy has more features, and you can also use Consul for service discovery (contacting other services directly by name without having to go through FabioLB).

The NATS strategy has a security benefit: the NATS cluster can sit inside a DMZ, and the workers can be on different subnets without being able to connect to each other, and it would still work. If you use Consul to connect directly to another service, the services need a route or connection to reach each other.

The bad part about NATS is that you should not use it for file upload, or it would hog a lot of resources. Uploads should be handled by the apiproxy directly, and then a reference to the uploaded file should be forwarded as the payload to NATS. You can check NATS traffic statistics using nats-top.

What's next? Maybe we can try traefik, which combines a service registry and a load balancer in one binary; it can also use Consul.

UPDATE: by changing the code from Subscribe (broadcast/fan-out) to QueueSubscribe (load balance), it has similar performance with 1/2/4 subscribers, so we can use NATS for high availability/fault tolerance in the api gateway pattern at the cost of 16% overhead.

TL;DR

no LB: 232K rps
-> LB with NATS request-reply: 196K rps (16% overhead)
no LB: 197K rps
-> LB with Fabio+Consul: 65K rps (67% overhead)