
2023-10-07

NATS: at-most-once Queue / simpler networking

As you might already know from a past article, I'm a huge fan of NATS; NATS is one of the fastest at-most-once delivery, non-persistent message brokers. Unlike RabbitMQ/LavinMQ/Kafka/RedPanda, NATS is not a persistent queue; the persistent version is called NATS JetStream (but it's better to use RabbitMQ/LavinMQ/Kafka/RedPanda since they are more popular and have more integrations than NATS JetStream, or to use a cloud provider's queue like Google PubSub/Amazon SQS).

Features of NATS:

1. embeddable, you can embed NATS directly in a Go application, so you don't have to run a dedicated instance:

// server: github.com/nats-io/nats-server/v2/server
ns, err := server.NewServer(opts) // opts is a *server.Options
if err != nil {
    log.Fatal(err)
}
go ns.Start()

// wait until the embedded server accepts connections
if !ns.ReadyForConnections(4 * time.Second) {
    log.Fatal("not ready for connections")
}
nc, err := nats.Connect(ns.ClientURL()) // nats: github.com/nats-io/nats.go


2. high performance, per an old benchmark (not apples-to-apples, because the others mostly have persistence (at-least-once delivery) by default)
3. wildcard topics: a topic can contain wildcards, so you can subscribe to topics like foo.*.bar (tokens 0-9a-zA-Z, separated with a dot . for subtopics); see the sketch after this list
4. auto-reconnect (use nats.MaxReconnects(-1) to retry forever), unlike some AMQP client libraries where you have to handle reconnection manually
5. built-in top-like monitoring: nats-top -s serverHostname -m port
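
For items 3 and 4, a minimal sketch (assuming a local NATS server on the default port; the topic name is made up):

package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    // retry reconnecting forever instead of giving up (item 4)
    nc, err := nats.Connect("nats://127.0.0.1:4222", nats.MaxReconnects(-1))
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // wildcard subscription (item 3): "*" matches exactly one token,
    // ">" matches one or more trailing tokens
    _, err = nc.Subscribe("foo.*.bar", func(m *nats.Msg) {
        log.Printf("got %s: %s", m.Subject, m.Data)
    })
    if err != nil {
        log.Fatal(err)
    }
    select {} // block forever
}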

Gotchas

Since NATS does not support persistence, if there's no subscriber the message will be lost (unlike with other message queues), so it behaves like Redis PubSub, not like a Redis queue; the difference is that Redis PubSub is fan-out/broadcast only, while NATS can do both broadcast and non-persistent queueing.

Use Case of NATS

1. publish/subscribe, for example to give a signal to other services. This assumes broadcast, since there's no queue.
A common pattern for this: service A has an API to list items changed after a certain updatedAt, and A wants to signal another service B that there are new/updated items, so A just broadcasts this message to NATS; any subscriber of that topic can then fetch from A when it gets the signal, and periodically as a fallback.

// on publisher
err := nc.Publish(strTopic, bytMsg)

// on subscriber (async callback)
_, err := nc.Subscribe(strTopic, func(m *nats.Msg) {
    _ = m.Subject
    _ = m.Data
})

// sync version
sub, err := nc.SubscribeSync(strTopic)
for {
  msg, err := sub.NextMsg(5 * time.Second)
  if err != nil { // nats.ErrTimeout if no message within 5s
    break
  }
  _ = msg.Subject
  _ = msg.Data
}


2. request/reply, for example to load-balance requests/autoscale, so you can deploy workers anywhere and they will catch up.
We can use QueueSubscribe with the same queueName to make sure only one worker handles each message. If you have 2 different queueNames, the same message will be processed by 2 different workers, one subscribed to each queueName.

// on requester
msg, err := nc.Request(strTopic, bytMsg, 5*time.Second)
_ = msg.Data // []byte("reply") from the worker/responder

// on worker/responder
_, err := nc.QueueSubscribe(strTopic, queueName, func(m *nats.Msg) {
    _ = m.Subject
    _ = m.Data
    _ = m.Respond([]byte("reply")) // send the reply back to the requester
})

You can see more examples here, an example of how to add an OTel trace on NATS here, and how to create mTLS here.

2022-04-01

Georeplicable Architecture Tech Stacks

Since I got a use case for geo-replication (multiple datacenters around the world), I need to build metrics, billing, and log aggregation that survive even when multiple datacenters are down. So we need an architecture where each cluster of databases in a datacenter can give correct answers on its own. The layers are:

1. Cache Layer

2. Source of Immediate Truth (OLTP) Layer

3. Source of Event Truth Layer (either CDC or subscribable events or both)

4. Metrics and Analytics (OLAP) Layer

For a single datacenter I might choose Tarantool or Aerospike (too bad that if you want to store more than your RAM you must use the paid version) or Redis (master-slave) for the cache layer, especially if the DNS is smart enough to route the client's request to the nearest datacenter.

Tarantool can also be the source of immediate truth, but it would have to run master-slave replication to be fault tolerant, and it seems manual intervention is needed on failover to promote a slave node to master and reroute the other slaves to recognize the new master.

So the other choice is either TiDB (for consistency-sensitive use cases with a lower write rate and more complex queries) or ScyllaDB (for partition-tolerant use cases with a higher write rate). Both are good in terms of availability. TiDB's TiFlash is also good for analytics use cases.

For the source of event truth, we can use RedPanda for pubsub/MQ use cases, plus Debezium (which requires Kafka/RedPanda) for change data capture from ScyllaDB, or TiCDC for TiDB, to stream changes into RedPanda.
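
Since RedPanda speaks the Kafka wire protocol, any Kafka client can publish to it; here is a minimal sketch using segmentio/kafka-go (the broker address and topic name are made-up placeholders):

package main

import (
    "context"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

func main() {
    // RedPanda listens on the usual Kafka port; "events" is a hypothetical topic
    w := &kafka.Writer{
        Addr:     kafka.TCP("localhost:9092"),
        Topic:    "events",
        Balancer: &kafka.LeastBytes{},
    }
    defer w.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    err := w.WriteMessages(ctx, kafka.Message{
        Key:   []byte("order-123"),
        Value: []byte(`{"status":"paid"}`),
    })
    if err != nil {
        log.Fatal(err)
    }
}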

Lastly, for analytics, we can use Clickhouse, which can also store all structured logs so they can be easily queried (or that can be Loki instead). For metrics we might aggregate from RedPanda using MaterializeIO (too bad the clustered version is paid).
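
A minimal sketch of pushing structured logs into Clickhouse with the clickhouse-go driver (the table layout and all names are made up for illustration):

package main

import (
    "database/sql"
    "log"

    _ "github.com/ClickHouse/clickhouse-go/v2" // registers the "clickhouse" driver
)

func main() {
    db, err := sql.Open("clickhouse", "clickhouse://127.0.0.1:9000/default")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // hypothetical structured-log table; MergeTree ordered for fast range scans
    _, err = db.Exec(`CREATE TABLE IF NOT EXISTS logs (
        ts      DateTime64(3),
        service LowCardinality(String),
        level   LowCardinality(String),
        msg     String
    ) ENGINE = MergeTree ORDER BY (service, ts)`)
    if err != nil {
        log.Fatal(err)
    }

    // in practice you would batch inserts; Clickhouse prefers large batches
    _, err = db.Exec(`INSERT INTO logs VALUES (now64(3), 'billing', 'INFO', 'invoice created')`)
    if err != nil {
        log.Fatal(err)
    }
}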

So.. what are the combinations possible?

1. Tarantool (manual counters for metrics) + Clickhouse (manually publish logs and analytics); this one is good only for a single location/datacenter, unless all clients can hit the proper server location (like game servers, or smart CDNs)

2. same as #1 but with RedPanda if you have multiple services; all logs and events get published through RedPanda

3. Aerospike/Tarantool/Redis (manual counter for metrics) + TiDB + TiFlash (analytics) + Loki for logs

4. Aerospike/Tarantool/Redis (manual counter for metrics) + TiDB + TiCDC + Clickhouse (for logs and analytics)

5. Aerospike/Tarantool/Redis (cache only) + TiDB + TiFlash (analytics) + TiCDC + MaterializeIO (for metrics) + Loki (logs)

6. Aerospike/Tarantool/Redis (cache only) + TiDB + TiCDC + Clickhouse (analytics and logs) + MaterializeIO (for metrics)

7. Aerospike/Tarantool/Redis (cache only) + ScyllaDB + Debezium + RedPanda + Clickhouse (analytics and logs) + MaterializeIO (for metrics) 

for #5-#7 you can remove the Aerospike/Tarantool part if there's no need for a cache (consistency matters, and you have a very large cluster that can handle peaks).

Wait, why didn't I include a full-text search use case? '__') OK, we can use Debezium to publish to ElasticSearch (might be overkill), or manually publish to TypeSense or MeiliSearch.

That's only the data part of the cluster; what about the computation and presentation layers (backend/frontend)?

For the backend, of course I would use Golang or C#; for the frontend, either Svelte (web only) or Flutter (mobile and web), or Unity3D (games).

For object storage (locally I would use MinIO) and CDN (CloudFlare, for example), you can see my previous post.

So why did I choose those?

1. Tarantool: one of the fastest in-memory databases, around 200K wps/qps, but with the cons I already mentioned above; performance is similar to Redis, but this one supports SQL.

2. Aerospike: also one of the fastest in-memory databases, also around 200K wps/qps last time I checked; it can also do master-master replication (if I'm not mistaken, limited to 4 nodes in the free version) and you can set the replication factor, but see the other cons I mentioned above.

3. TiDB: a NewSQL database with automatic rebalancing (one node dies, and it still works fast). It can do around 30-50K rps on a single node last time I benchmarked it; their benchmarks mostly show 30K-40K rps for writes, 120K rps for mixed read-write, and 330K rps for read-only on multi-node setups. Need more space? Add more TiKV instances. Need more compute? Add more TiDB instances. Need faster analytics queries? Add a TiFlash instance. But the bad part is the many moving parts: you have to deploy TiDB (query engine), TiKV (storage), and PD (placement driver), plus TiCDC and TiFlash if you need those too. Also, I'm not sure how it would perform in multi-DC use cases. The managed alternative? AlloyDB-PostgreSQL (GCP) or TiDB-Cloud (AWS)

4. ScyllaDB: a faster version of Cassandra; most benchmarks show it doing 120K-400K inserts and queries per second. It's one of the best databases for multi-DC: per keyspace you can set in which datacenters it should replicate and with what replication factor, consistency is controlled client-side, and since 2 years ago it can manage materialized views, so we don't have to create and maintain multiple tables manually for each query pattern. The managed alternative? ScyllaDB-Cloud

5. RedPanda: a faster version of Kafka; last time I checked, one instance can receive 800K events per second and publish 4 million events per second.

6. Clickhouse: one of the best analytics databases; it can do 1M batched ingestions per second on a single node and run complex queries really fast, under a second (depending on how big your data is; the kind of query that would take minutes on a normal RDBMS). The con is that one node can only handle 100 concurrent queries.

7. MaterializeIO: like ksqlDB but written in Rust; I haven't checked the performance, but they claim it performs as fast as Redis.

What's the other alternative? Yugabyte looks OK, especially the YCQL part that works like Cassandra/ScyllaDB, and Yugabyte seems to combine Redis, Postgres, and Cassandra in one deployment. But I like TiDB more, because last time I checked Yugabyte I needed to configure something to make it writable when one node died.

Proxy and load balancer? Caddy, Traefik, FabioLB (with Consul), or NATS (e.g. 3 load balancers/custom API gateways deployed in front that serialize requests into NATS inside the DMZ; the workers/handlers receive them and return responses that are deserialized back by the load balancer/API gateway. That way the load balancer doesn't need to know exactly how many workers/handlers there are, services can also communicate synchronously through NATS without knowing each other's IP addresses, and the worker/handler part can be scaled independently).
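
A minimal sketch of the gateway side of that NATS pattern (the port and the svc.checkout topic are made up; the worker side is just the QueueSubscribe example from the NATS post above):

package main

import (
    "io"
    "log"
    "net/http"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL, nats.MaxReconnects(-1))
    if err != nil {
        log.Fatal(err)
    }
    http.HandleFunc("/checkout", func(w http.ResponseWriter, r *http.Request) {
        body, _ := io.ReadAll(r.Body)
        // forward the serialized request to whichever worker picks it up;
        // the gateway never needs to know how many workers exist
        reply, err := nc.Request("svc.checkout", body, 5*time.Second)
        if err != nil {
            http.Error(w, err.Error(), http.StatusGatewayTimeout)
            return
        }
        w.Write(reply.Data)
    })
    log.Fatal(http.ListenAndServe(":8080", nil))
}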

Deployment? Nomad, or just rsync to Jelastic, or DockerCompose for dependencies (don't forget to bind the volume or your data will be gone), or Kubernetes.

Tracker? maybe Pirsch

Why is there no __INSERT_DB_TYPE_HERE__? A graph database? Because I rarely do recursive/graph queries, and especially because I don't know which one is best: Neo4J? DGraph? NebulaGraph? TigerGraph? AnzoGraph? TerminusDB? Age? JanusGraph? HugeGraph?

Have some other cool/high-performance tech stack suggestions? Chat with me at http://t.me/kokizzu (give a proper intro tho, or I'll think you are a crypto scam spammer XD)

2021-11-22

Kafka vs RedPanda Benchmark (also Tarantool and Clickhouse as queue)

Using the default settings from their docker-compose examples, today we're gonna benchmark some of the most popular MQ/PubSub software. I've never used MQs extensively before (only NATS, Google PubSub, ActiveMQ, and Amazon SQS); usually a standard database that stores events is sufficient (the consumer pulls, tailing from the last primary-key counter, and if you need fan-out, just use multiple goroutines and channels), because my projects have never been latency-sensitive applications.
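
A minimal sketch of that pull/tailing pattern (the events table, its columns, and the MySQL-style ? placeholders are assumptions for illustration, not code from any of my repos):

package main

import (
    "database/sql"
    "time"
)

// consumeLoop tails a hypothetical "events" table via its auto-increment
// primary key; lastID is the consumer's read offset, persisted elsewhere so
// a restart resumes where it left off
func consumeLoop(db *sql.DB, lastID int64, handle func(id int64, payload []byte)) error {
    for {
        rows, err := db.Query(
            `SELECT id, payload FROM events WHERE id > ? ORDER BY id LIMIT 1000`, lastID)
        if err != nil {
            return err
        }
        n := 0
        for rows.Next() {
            var id int64
            var payload []byte
            if err := rows.Scan(&id, &payload); err != nil {
                rows.Close()
                return err
            }
            handle(id, payload) // fan out to goroutines/channels here if needed
            lastID = id
            n++
        }
        rows.Close()
        if n == 0 {
            time.Sleep(time.Second) // no new rows yet, poll again later
        }
    }
}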

Some issues:
  1. the benchmark has locking (atomic counters, sync.Map, etc), so the consumers might not utilize all CPU cores.
  2. Confluent's Kafka docker always errors on startup because /var/lib/kafka/data is not writable, so I bind /var/lib/kafka instead. Clickhouse also always failed to start when binding /var/lib/clickhouse/data, so I don't bind a volume for Clickhouse.
  3. RedPanda failed to start because of fs.aio-max-nr even when it was already ~1 million (originally only 64K), so I set it to 4194304.
Benchmarking 1000 goroutines publishing 2000 messages each, with 100 goroutines consuming in parallel.
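
For context, a rough sketch of how such a harness can measure the produce side with atomic counters (all names here, including the publish helper, are hypothetical, not the actual benchmark code from the repo linked below):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// publish is a hypothetical helper wrapping whichever broker client is benchmarked
func publish(msg string) error { return nil }

func main() {
    var failProduce int64
    start := time.Now()
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ { // 1000 publisher goroutines
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 2000; j++ { // 2000 messages each
                // embed the send time so consumers can compute latency
                msg := fmt.Sprintf("%d", time.Now().UnixMilli())
                if err := publish(msg); err != nil {
                    atomic.AddInt64(&failProduce, 1)
                }
            }
        }()
    }
    wg.Wait()
    fmt.Println("FailProduce: ", atomic.LoadInt64(&failProduce))
    fmt.Println("Produced (ms): ", time.Since(start).Milliseconds())
    // the 100 consumer goroutines would subtract the embedded timestamp from
    // time.Now().UnixMilli() per message, tracking max/avg the same way
}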

REDPANDA version: v21.10.1 (rev e7b6714)

=== redpanda single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2387
MaxLatency (ms):  2125
AvgLatency (ms):  432
Total (s) 3.457646367s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2408
MaxLatency (ms):  2663
AvgLatency (ms):  490
Total (s) 3.459949739s

=== redpanda multi:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  4187
MaxLatency (ms):  12146
AvgLatency (ms):  9701
Total (s) 13.610533861s 

# ^ weird, maybe startup not yet complete?
# retried reinit docker-compose, 1st time always slow
# but 2nd time always fast:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2413
MaxLatency (ms):  2704
AvgLatency (ms):  467
Total (s) 3.545496041s


KAFKA version: 7.0.0-ccs (Commit:c6d7e3013b411760)
equal to kafka 3.0.0

=== kafka single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6634
MaxLatency (ms):  12052
AvgLatency (ms):  8579
Total (s) 13.722706977s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6380
MaxLatency (ms):  11856
AvgLatency (ms):  8636
Total (s) 13.625928209s

=== kafka multi:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6596
MaxLatency (ms):  11932
AvgLatency (ms):  8523
Total (s) 13.659630863s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6535
MaxLatency (ms):  11903
AvgLatency (ms):  8588
Total (s) 13.677644818s

These benchmarks use the default settings from the docker examples I found, except SMP (I set it to the same number of cores as the benchmark server, to make it fair against Kafka, whose JVM by default can utilize all cores -- apparently this has an insignificant impact). The current conclusion is that RedPanda is way faster than Kafka, both in publishing speed (around ~1μs per message, 477K-837K msg/s) and consuming latency (432ms to 2.7s per message), while Kafka does around ~3μs per message (301K-313K msg/s) and 8.5s to 12s per message. The RAM statistics tho: RedPanda uses 12GB per node (10% of the server's RAM), while Kafka only uses 355MB, 375MB, and 788MB for its nodes, plus 120MB for zookeeper. The repo to reproduce this benchmark is here, in the 2021mq directory.

Btw, if you're looking for a Kafka/RedPanda GUI, try Kowl; it's way more beautiful than ActiveMQ's default Web UI.

Bonus rounds, using one of the fastest OLTP databases (Tarantool) and one of the fastest OLAP databases (Clickhouse) as queues, by leveraging a sequence (auto increment) or an internal function to generate a sequence. The differences are that there's only one consumer group (you have to manually fan out using goroutines), and there's no JSON encode/decode since it's a structured database:


TARANTOOL version: 2.8.2

=== tarantool single (memtx):

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  11238
MaxLatency (ms):  1071
AvgLatency (ms):  101
Total (s) 11.244551225s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  9596
MaxLatency (ms):  816
AvgLatency (ms):  61
Total (s) 9.957516119s

=== tarantool single (vinyl):

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  11383
MaxLatency (ms):  1076
AvgLatency (ms):  157
Total (s) 11.388865281s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  9104
MaxLatency (ms):  102
AvgLatency (ms):  13
Total (s) 9.196549551s


CLICKHOUSE version: 21.11.4.14

=== clickhouse single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2052
MaxLatency (ms):  2078
AvgLatency (ms):  1461
Total (s) 3.570767491s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2057
MaxLatency (ms):  2008
AvgLatency (ms):  1445
Total (s) 3.536277427s

The result recap table (ms = millisecond, us = microsecond, ns = nanosecond):

only best of 2 runs         | RedPanda single | RedPanda multi | Kafka single | Kafka multi | Tarantool memtx | Tarantool vinyl | Clickhouse single
Publish (ms)                | 2,387           | 2,413          | 6,380        | 6,535       | 9,596           | 9,104           | 2,052
Sub Max Latency (ms)        | 2,125           | 2,704          | 11,856       | 11,903      | 816             | 102             | 2,008
Sub Avg Latency (ms)        | 490             | 467            | 8,636        | 8,523       | 61              | 13              | 1,445
Pub Throughput (msg/s)      | 837,872         | 828,844        | 313,480      | 306,044     | 208,420         | 219,684         | 974,659
est. Pub Latency (ns)       | 1,194           | 1,207          | 3,190        | 3,268       | 4,798           | 4,552           | 1,026
est. Sub Throughput (msg/s) | 4,081,633       | 4,282,655      | 231,589      | 234,659     | 32,786,885      | 153,846,154     | 1,384,083

Conclusion: Tarantool is probably the only single-node database that can compete with Kafka for the queue use case (we can have multi-master replicas, but that's not recommended; it's better to use a master-slave config where the slave is used as a failover). Other databases, especially RDBMSes that persist to disk, pretty surely can only do ~50K tps. Clickhouse can be multi-master, and last time I checked it could do ~600K inserts per second (while this time it's around 1M inserts per second). I simulate the atomic counter on Clickhouse using TimeStamp64Milli; queries are limited to 100 per second, but that's quite good enough for the pub-sub use case. The benefit of using a database as MQ/PubSub is that you can do very flexible queries (SQL support), get mostly better tooling (especially Clickhouse), or update records for new consumers; the cons are that you must do the notify/fan-out yourself (for example using a NATS broadcast that only pushes a signal for the workers to pull, as sketched below) and track the acks/retries and the read offsets of the workers yourself (pull).
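
A minimal sketch of that notify/fan-out idea (table and topic names are made up; it pairs with the tailing-consumer sketch near the top of this post):

package main

import (
    "database/sql"

    "github.com/nats-io/nats.go"
)

// publishEvent appends a row to a hypothetical "events" table, then
// broadcasts an empty NATS signal; workers subscribed to "events.new" wake
// up and pull rows newer than their own read offset, tracking acks/retries
// themselves
func publishEvent(db *sql.DB, nc *nats.Conn, payload []byte) error {
    if _, err := db.Exec(`INSERT INTO events(payload) VALUES (?)`, payload); err != nil {
        return err
    }
    // signal only, no body: the data itself stays in the database
    return nc.Publish("events.new", nil)
}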