2021-11-22

Kafka vs RedPanda Benchmark (also Tarantool and Clickhouse as queue)

Using the default settings from their docker-compose examples, today we're gonna benchmark some of the popular MQ/PubSub software. I've never used an MQ extensively before (only NATS, Google PubSub, ActiveMQ, and Amazon SQS); usually a standard database that stores events is sufficient (the consumer pulls, tailing from the last primary-key counter, and if we need to fan out we just use multiple goroutines and multiple channels), because my projects have never been latency-sensitive applications.

Some issues: 
  1. the benchmark has locking (atomic counters, sync.Map, etc.), so the consumer might not utilize all CPU cores.
  2. Confluent's Kafka docker image always errors on startup because /var/lib/kafka/data is not writable, so I bind to /var/lib/kafka instead. Clickhouse also always failed to start when bound to /var/lib/clickhouse/data, so I don't bind a volume for Clickhouse.
  3. RedPanda failed to start because of fs.aio-max-nr, even when it was already ~1 million (originally only 64K), so I set it to 4194304
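For reference, raising that limit is a standard Linux sysctl (the value 4194304 is just what worked here; persist it under /etc/sysctl.d/ if you want it to survive reboots):

```shell
# one-off, requires root:
sysctl -w fs.aio-max-nr=4194304
# verify:
sysctl fs.aio-max-nr
```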
Benchmarking 1000 goroutines publishing 2000 messages each, with 100 goroutines consuming in parallel.

REDPANDA version: v21.10.1 (rev e7b6714)

=== redpanda single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2387
MaxLatency (ms):  2125
AvgLatency (ms):  432
Total (s) 3.457646367s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2408
MaxLatency (ms):  2663
AvgLatency (ms):  490
Total (s) 3.459949739s

=== redpanda multi:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  4187
MaxLatency (ms):  12146
AvgLatency (ms):  9701
Total (s) 13.610533861s 

# ^ weird, maybe startup was not yet complete?
# retried after re-initializing docker-compose; the 1st run is always slow
# but the 2nd run is always fast:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2413
MaxLatency (ms):  2704
AvgLatency (ms):  467
Total (s) 3.545496041s


KAFKA version: 7.0.0-ccs (Commit:c6d7e3013b411760)
equivalent to Kafka 3.0.0

=== kafka single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6634
MaxLatency (ms):  12052
AvgLatency (ms):  8579
Total (s) 13.722706977s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6380
MaxLatency (ms):  11856
AvgLatency (ms):  8636
Total (s) 13.625928209s

=== kafka multi:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6596
MaxLatency (ms):  11932
AvgLatency (ms):  8523
Total (s) 13.659630863s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6535
MaxLatency (ms):  11903
AvgLatency (ms):  8588
Total (s) 13.677644818s

These benchmarks use the default settings that exist in the docker examples I found, except SMP (I set it to the same number of cores as the benchmark server, to make it fair with Kafka, whose JVM by default can utilize all cores -- apparently this had an insignificant impact). The current conclusion is that RedPanda is way faster than Kafka, both in publishing speed (around ~1μs per message, 477K-837K msg/s) and consuming latency (432ms to 2.7s), while Kafka publishes at around ~3μs per message (301K-313K msg/s) with 8.5s to 12s consuming latency. The RAM statistics though: RedPanda uses 12GB for each node (10% of the server's RAM), while Kafka only uses 355MB, 375MB, and 788MB for its nodes, plus 120MB for ZooKeeper. The repo to reproduce this benchmark is here, in the 2021mq directory.

Btw if you're looking for a Kafka/RedPanda GUI, try Kowl; it's way more beautiful than ActiveMQ's default Web UI.

Bonus rounds: using one of the fastest OLTP databases (Tarantool) and one of the fastest OLAP databases (Clickhouse) as a queue, by leveraging a sequence (auto increment) or an internal function to generate a sequence. The difference is that there's only one consumer group (we have to manually fan out using goroutines), and no JSON encode/decode since it's a structured database:


TARANTOOL version: 2.8.2

=== tarantool single (memtx):

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  11238
MaxLatency (ms):  1071
AvgLatency (ms):  101
Total (s) 11.244551225s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  9596
MaxLatency (ms):  816
AvgLatency (ms):  61
Total (s) 9.957516119s

=== tarantool single (vinyl):

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  11383
MaxLatency (ms):  1076
AvgLatency (ms):  157
Total (s) 11.388865281s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  9104
MaxLatency (ms):  102
AvgLatency (ms):  13
Total (s) 9.196549551s


CLICKHOUSE version: 21.11.4.14

=== clickhouse single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2052
MaxLatency (ms):  2078
AvgLatency (ms):  1461
Total (s) 3.570767491s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2057
MaxLatency (ms):  2008
AvgLatency (ms):  1445
Total (s) 3.536277427s

The result recap table (ms = millisecond, us = microsecond, ns = nanosecond):

| only best of 2 runs | RedPanda single | RedPanda multi | Kafka single | Kafka multi | Tarantool memtx | Tarantool vinyl | Clickhouse single |
|---|---|---|---|---|---|---|---|
| Publish (ms) | 2,387 | 2,413 | 6,380 | 6,535 | 9,596 | 9,104 | 2,052 |
| Sub Max Latency (ms) | 2,125 | 2,704 | 11,856 | 11,903 | 816 | 102 | 2,008 |
| Sub Avg Latency (ms) | 490 | 467 | 8,636 | 8,523 | 61 | 13 | 1,445 |
| Pub Throughput (msg/s) | 837,872 | 828,844 | 313,480 | 306,044 | 208,420 | 219,684 | 974,659 |
| est. Pub Latency (ns) | 1,194 | 1,207 | 3,190 | 3,268 | 4,798 | 4,552 | 1,026 |
| est. Sub Throughput (msg/s) | 4,081,633 | 4,282,655 | 231,589 | 234,659 | 32,786,885 | 153,846,154 | 1,384,083 |

Conclusion: Tarantool is probably the only single-node database that can compete with Kafka for the queue use case (we can have multi-master replicas, but that's not recommended; it's better to use a master-slave config where the slave is used for failover). Other databases, especially RDBMSes that persist to disk, can pretty surely only do ~50K tps. Clickhouse can be multi-master, and last time I checked it could do ~600K inserts per second (while this time it's around 1M inserts per second); I simulated the atomic counter on Clickhouse using TimeStamp64Milli, and queries are limited to 100 per second, but that's good enough for the pub-sub use case. The benefit of using a database as MQ/PubSub is that you can do very flexible queries (SQL support), get mostly better tooling (especially Clickhouse), or update records for new consumers; the cons are that you must notify/fan out yourself (for example using a NATS broadcast that only pushes a signal for workers to pull), and track the ack/retries and each worker's read offset yourself (pull).

2021-11-17

Alternative Strategy for Dependency Injection (lambda-returning vs function-pointer)

There are some common strategies for injecting a dependency (one or a set of functions) using an interface, something like this:

type Foo interface{ Bla() string }
type RealAyaya struct {}
func (a *RealAyaya) Bla() string { return `real` }
type MockAyaya struct {} // generated from gomock or others
func (a *MockAyaya) Bla() string { return `mock` }
// real usage:
var deps Foo = &RealAyaya{}
deps.Bla()
// test usage:
deps = &MockAyaya{}
deps.Bla()

and there's another one (dependency parameter on function returning a lambda):

type Bla func() string
type DepsIface interface { ... }
func NewBla(deps DepsIface) Bla {
  return func() string {
    // do something with deps
  }
}
// real usage:
bla := NewBla(realDeps)
res := bla()
// test usage:
bla := NewBla(mockedOrFakeDeps)
res := bla()

and there's another way, combining both fake and real implementations like this, or alternatively using proxy/cache+codegen if it's for a 3rd-party dependency.
and there's yet another way (pluggable at the per-function level):

type Bla func() string
type BlaCaller struct {
  BlaFunc Bla
}
// real usage:
bla := BlaCaller{ BlaFunc: deps.SomeMethod }
res := bla.BlaFunc()
// test usage:
bla := BlaCaller{ BlaFunc: func() string { return `fake` } }
res := bla.BlaFunc()

Analysis


The first one is the most popular way; the 2nd one is one that I saw recently (it's also used in an OpenAPI/Swagger codegen, I forgot which library). The bad part is that we have to sanitize the trace manually, because it would show something like NewBla.func1 in the traces, and we have to use a generated mock or implement everything when we want to test. The last style is what I came up with when writing some task where the specs were still unclear on whether I should:
1. query from local database
2. hit another service
3. or just a fake data (in the tests)
I can easily swap out any function without having to depend on a whole struct or interface, and it's still easy to debug (set breakpoints) and jump around between methods, compared to the generated-mock or interface versions.
Probably the bad part is that we have to inject every function one by one for each function we want to call (which is nearly the same effort as the 2nd one). But if that's the case, when your function requires 10+ other functions to be injected, maybe it's time to refactor?

The real use case would be something like this:

type LoanExpirationFunc func(userId string) time.Time
type InProcess1 struct {
  UserId string
  // add more input here
  LoanExpirationFunc LoanExpirationFunc
  // add more injectable functions, eg. 3rd-party hit or db read/save
}
type OutProcess1 struct {}
func Process1(in *InProcess1) (out *OutProcess1) {
  // ... eg. validation
  x := in.LoanExpirationFunc(in.UserId)
  // ... do something with x
  return
}

func defaultLoanExpirationFunc(userId string) time.Time {
  // eg. query from database
}

type thirdParty struct {} // to put dependencies
func NewThirdParty() (*thirdParty) { return &thirdParty{} }
func (t *thirdParty) extLoanExpirationFunc(userId string) time.Time {
  // eg. hit another service
}

// init input:
func main() {
  http.HandleFunc("/case1", func(w http.ResponseWriter, r *http.Request) {
    in := InProcess1{LoanExpirationFunc: defaultLoanExpirationFunc}
    in.ParseFromRequest(r)
    out := Process1(&in)
    out.WriteToResponse(w)
  })
  tp := NewThirdParty()
  http.HandleFunc("/case2", func(w http.ResponseWriter, r *http.Request) {
    in := InProcess1{LoanExpirationFunc: tp.extLoanExpirationFunc}
    in.ParseFromRequest(r)
    out := Process1(&in)
    out.WriteToResponse(w)
  })
}

// on test:
func TestProcess1(t *testing.T) {
  t.Run(`test one year from now`, func(t *testing.T) {
    in := InProcess1{LoanExpirationFunc: func(string) time.Time { return time.Now().AddDate(1, 0, 0) }}
    out := Process1(&in)
    assert.Equal(t, out, ...)
  })
}

I haven't used this strategy extensively on a new project yet (since I just thought about it today and yesterday when creating a horrid integration test), but I'll update this post when I find annoyances with this strategy.
 
UPDATE 2022: after using this strategy extensively for a while, it's better than interfaces (especially when using IntelliJ); my tip: it's better if the function-pointer field and the injected function have the same name.

2021-11-14

Databases with Automatic Rebalance Benchmark (TIDB vs YugabyteDB vs CockroachDB)

Automatic rebalance/repair/self-healing (we can remove or add a new node, and it will redistribute the data and rebalance itself; data are replicated to more than 1 node). The previous benchmark didn't really care about this awesome feature (no more downtime cutoff to kill the master instance, promote a slave as the new master, then switch every client to connect to the new master -- if not using any proxy).

Some databases that I found that support this feature:

  1. Aerospike (in-mem kv database, community edition max 4 billion object) https://aerospike.com/products/product-matrix/
  2. Couchbase (document database, there's community edition) https://www.couchbase.com/products/editions
  3. MongoDB (document database, without SQL syntax)
  4. Cassandra (columnar database, CQL a subset of SQL)
  5. ScyllaDB (columnar database, cassandra-compatible, oss version max 5 node) https://www.scylladb.com/pricing/
  6. CockroachDB (postgresql compatible, there's core edition) https://www.cockroachlabs.com/compare/
  7. TiDB (mysql compatible)
  8. YugaByteDB (postgresql/cassandra/redis compatible)
  9. RavenDB (community edition max ram 6GB) https://ravendb.net/buy
  10. SingleStore/MemSQL (mysql compatible, free edition max 4 node) https://www.singlestore.com/free-software/
Reproducibility

The repository is here: https://github.com/kokizzu/hugedbbench in the 2021 folder. We're going to test local single-node (if possible) and multi-server deployments using docker. Why docker? Because I don't want to ruin my computer/server with the trash files they create in system directories (if any). Some databases are not included if they don't support SQL or if a license key is required to start. Why benchmark only 2 columns? Because it fits my projects' most common use case, where there's 1 PK (bigint or string) and 1 unique key (mostly string), and the rest are mostly indexed or non-indexed columns. Why am I even doing this? I just want to select the best techstack for my next side project (and because the past companies I've worked with seem to love moving database server locations around a lot).
The specs of the server used in this benchmark: 32-core, 128GB RAM, 500GB NVMe disk.

CockroachDB

CockroachDB is part of the NewSQL movement and supports PostgreSQL syntax; to deploy a single node we can use docker compose. The cluster-monitor UI on port 8080 is quite ok :3 better than nothing.

Here's the result for 100 inserts x 1000 goroutines:

Version used: v21.1.11
CockroachDB InsertOne 10.034616078s
CockroachDB Count 42.326487ms
CockroachDB UpdateOne 12.804722812s
CockroachDB Count 78.221432ms
CockroachDB SelectOne 2.281355728s
CockroachDB Total 25.2420225s
$ sudo du -h --max-depth 0 2021/cockroachdb/cockroach1
442M    2021/cockroachdb/cockroach1

CockroachDB InsertOne 7.125466063s
CockroachDB Count 39.753102ms
CockroachDB UpdateOne 10.221870484s
CockroachDB Count 70.624908ms
CockroachDB SelectOne 2.196985441s
CockroachDB Total 19.655920219s
432M    2021/cockroachdb/cockroach1

# multiple cockroach docker but connect only into one
# seems high availability (>1 RF) turned on by default
# but you have to init the cluster manually after docker-compose up
CockroachDB InsertOne 13.979257573s
CockroachDB Count 46.824883ms
CockroachDB UpdateOne 1m22.941738013s
CockroachDB Count 42.374814ms
CockroachDB SelectOne 2.676679427s
CockroachDB Total 1m39.687566436s
433M    2021/cockroachdb/cockroach1
292M    2021/cockroachdb/cockroach2
222M    2021/cockroachdb/cockroach3

TiDB

TiDB is part of the NewSQL movement and supports MySQL syntax; the recommended way to deploy is the tiup command, but we're going to use docker so it's fair with the other database products. The official docker compose uses 3 placement drivers and 3 KV servers, so I tried that first. The cluster monitor is on port 10080, but it was blocked by Chrome, so I moved it to 10081; it's very plaintexty compared to the other products.

Version used: 5.7.25-TiDB-v5.0.1
TiDB InsertOne 14.063953386s
TiDB Count 32.523526ms
TiDB UpdateOne 11.329688001s
TiDB Count 49.320725ms
TiDB SelectOne 2.110410282s
TiDB Total 27.601866351s
$ sudo du -h --max-depth 0 2021/tidb/t*/
24G     2021/tidb/tikv0/
24G     2021/tidb/tikv1/
24G     2021/tidb/tikv2/
123M    2021/tidb/tipd0/
123M    2021/tidb/tipd1/
123M    2021/tidb/tipd2/

TiDB InsertOne 13.434256392s
TiDB Count 44.192782ms
TiDB UpdateOne 12.575839233s
TiDB Count 63.126285ms
TiDB SelectOne 2.00257672s
TiDB Total 28.134319527s
24G     2021/tidb/tikv0/
24G     2021/tidb/tikv1/
24G     2021/tidb/tikv2/
123M    2021/tidb/tipd0/
62M     2021/tidb/tipd1/
62M     2021/tidb/tipd2/

# reducing to single server mode (1 pd, 1 kv, 1 db), first run:
TiDB InsertOne 3.216365486s
TiDB Count 34.30629ms
TiDB UpdateOne 3.913131711s
TiDB Count 62.202395ms
TiDB SelectOne 1.991229179s
TiDB Total 9.233077269s
24G     2021/tidb/tikv0/
62M     2021/tidb/tipd0/

YugaByteDB

YugaByteDB is part of the NewSQL movement and supports PostgreSQL syntax; to deploy a single node we can use docker compose too. The cluster monitor on port :7000 is quite ok. The tmp directory is mounted because if it isn't, it would get stuck starting the 2nd time unless the temporary files are manually deleted. limits.conf applied.

Version used: 2.9.1.0
YugaByteDB InsertOne 11.402609701s
YugaByteDB Count 159.357304ms
YugaByteDB UpdateOne 19.232827282s
YugaByteDB Count 214.389496ms
YugaByteDB SelectOne 2.778803557s
YugaByteDB Total 33.834838111s
$ sudo du -h --max-depth 0 2021/yugabytedb/yb*1
25M     2021/yugabytedb/ybmaster1
519M    2021/yugabytedb/ybtserver1

YugaByteDB InsertOne 13.536083917s
YugaByteDB Count 202.381009ms
YugaByteDB UpdateOne 20.78337085s
YugaByteDB Count 190.119437ms
YugaByteDB SelectOne 2.849347721s
YugaByteDB Total 37.607747856s
25M     2021/yugabytedb/ybmaster1
519M    2021/yugabytedb/ybtserver1

# multiple ybtserver but only connect to one
# replication factor 1, first run:
YugaByteDB InsertOne 15.260747636s
YugaByteDB Count 66.599257ms
YugaByteDB UpdateOne 26.246382158s
YugaByteDB Count 63.119089ms
YugaByteDB SelectOne 3.213271599s
YugaByteDB Total 44.90095282s
25M     2021/yugabytedb/ybmaster1
242M    2021/yugabytedb/ybtserver1
156M    2021/yugabytedb/ybtserver2
132M    2021/yugabytedb/ybtserver3

# after changing replication factor to 2, first run:
YugaByteDB InsertOne 38.614091068s
YugaByteDB Count 76.615212ms
YugaByteDB UpdateOne 56.796680169s
YugaByteDB Count 84.35411ms
YugaByteDB SelectOne 3.14747611s
YugaByteDB Total 1m38.756226195s
26M     2021/yugabytedb/ybmaster1
343M    2021/yugabytedb/ybtserver1
349M    2021/yugabytedb/ybtserver2
349M    2021/yugabytedb/ybtserver3

# after changing replication factor to 3, first run:
YugaByteDB InsertOne 45.289805293s
YugaByteDB Count 97.112383ms
YugaByteDB UpdateOne 54.665380464s
YugaByteDB Count 64.206741ms
YugaByteDB SelectOne 3.125693618s
YugaByteDB Total 1m43.290014042s
26M     2021/yugabytedb/ybmaster1/
513M    2021/yugabytedb/ybtserver1/
512M    2021/yugabytedb/ybtserver2/
512M    2021/yugabytedb/ybtserver3/

Conclusion


Here's the recap of the 100 records x 1000 goroutines insert/update/select durations, only for single instance:

| Only best of 2 runs | CockroachDB (single) | TiDB (single) | YugaByteDB (single) |
|---|---|---|---|
| InsertOne (s) | 7.1 | (best) 2.9 | (worst) 11.4 |
| UpdateOne (s) | 10.2 | (best) 3.9 | (worst) 19.2 |
| SelectOne (s) | 2.1 | (best) 1.9 | (worst) 2.7 |
| Total (s) | 19.6 | (best) 9.2 | (worst) 33.8 |
| Disk Usage (MB) | (best) 432 | (worst) 24,062 | 544 |

So, at best, it roughly takes on average 29 μs to insert, 39 μs to update, and 19 μs to select one record.
Comparing only multi (RF=2+):

| Only best of 2 runs | CockroachDB (multi) | TiDB (multi) | YugaByteDB (multi) | YugaByteDB (multi 2) | YugaByteDB (multi 3) |
|---|---|---|---|---|---|
| InsertOne (s) | 12.5 | (best) 3.1 | 14.3 | 34.9 | (worst) 45.2 |
| UpdateOne (s) | (worst) 60.1 | (best) 4.1 | 19.6 | 52.7 | 54.6 |
| SelectOne (s) | (worst) 3.1 | (best) 2.1 | 2.8 | (worst) 3.1 | 2.9 |
| Total (s) | 77.2 | (best) 9.5 | 40.8 | 91.7 | (worst) 103.2 |
| Disk Usage (MB) | 1,015 | (worst) 72,247 | (best) 521 | 1,046 | 1,563 |

So, at best, it roughly takes on average 31 μs to insert, 41 μs to update, and 21 μs to select one record.
Comparing only multi with a replication factor that gives true HA:

| Only best of 2 runs | CockroachDB (multi) | TiDB (multi) | YugaByteDB (multi 3) |
|---|---|---|---|
| InsertOne (s) | 13.9 | (best) 3.1 | (worst) 45.2 |
| UpdateOne (s) | (worst) 82.9 | (best) 4.1 | 54.6 |
| SelectOne (s) | 2.6 | (best) 2.1 | 2.9 |
| Total (s) | (worst) 99.6 | (best) 9.5 | (worst) 103.2 |
| Disk Usage (MB) | (best) 1,015 | (worst) 72,247 | 1,563 |

It seems TiDB has the most balanced performance, at the expense of needing pre-allocated disk space, while CockroachDB has the worst performance on the multi-instance update task, and YugabyteDB has the worst performance on the multi-instance insert task.

What happens if we do the benchmark once more, remove one storage node (docker stop), then redo the benchmark (only for RF=2+)?

The YugabyteDB test didn't even enter the insert stage after 5 minutes '__'), maybe because truncate is slow? So I changed the benchmark scenario (only for YugabyteDB) so that 1 node is killed 2 seconds after the insertion phase starts, but YugabyteDB still gave an error "ERROR: Timed out: Write RPC (request call id 3873) to 172.21.0.5:9100 timed out after 60.000s (SQLSTATE XX000)"; it couldn't complete. EDIT: YugabyteDB staff on Slack suggested it should use RF=3 so it would still survive when one node dies.

| Only 1 run | CockroachDB (multi, kill 1) | TiDB (multi, kill 1 tikv) | TiDB (multi, kill 1 tipd) | TiDB (multi, kill 1 tikv 1 tipd) | YugaByteDB (multi 3, kill 1) |
|---|---|---|---|---|---|
| InsertOne (s) | (worst) 35.9 | 19.2 | 14.3 | (best) 9.2 | 34.8 |
| UpdateOne (s) | 18.6 | 11.6 | 16.0 | (best) 9.9 | (worst) 68.8 |
| SelectOne (s) | 4.0 | 1.9 | (best) 1.8 | 2.3 | 3.1 |
| Total (s) | 58.8 | 32.9 | 34.0 | (best) 21.5 | (worst) 106.9 |
| Disk Usage (MB) | (best) 1,076 | (worst) 72,364 | 72,308 | 72,308 | 2,659 |

TiDB seems to be the winner also for the case when a node dies, at the expense of needing 7 initial nodes (1 tidb [should be at least 2 for HA], 3 tipd, 3 tikv -- but it can probably be squeezed down to 1 tidb, 1 tipd, 2 tikv, since apparently the default replication factor is 3), where CockroachDB only needs 3, and YugabyteDB needs 4 (1 ybmaster, 3 ybtserver). Not sure though what would happen if 1 tidb/ybmaster instance died. The recap spreadsheet is here.

Next time we're gonna test how simple it is to add and remove a node (and securely, if possible so that only a limited set of servers can join, without having to set up a firewall/DMZ to restrict unprivileged servers), then re-benchmark with more complex common use cases (like UPSERT, range queries, WHERE-IN, JOIN, and secondary indexes). If automatic rebalance is not a requirement, I would still use Tarantool (since 2020.09) and Clickhouse (since 2021.04), but now I've found one more new favorite automatic-rebalance database other than Aerospike (since 2016.11), :3 myahaha! So this is probably the reason lots of companies are moving to TiDB.

Btw do not comment on this blog (there's too much comment spam and no notification when a new comment is added); just use a GitHub issue or Reddit instead.

UPDATE #1: redid the benchmark for all databases after updating limits.conf; TiDB improved by a lot, while CockroachDB remains the same except for the update benchmark.
 
UPDATE #2: TiKV can be set to use only a little initial disk space by setting "reserve-space"; it only uses 4.9GB per TiKV after the test