Showing posts with label clickhouse. Show all posts

2024-03-23

Clickhouse in Kubernetes / Minikube using Bitnami Helm Charts

For my projects I normally use docker-compose with a plain bind-mounted persistence directory, which avoids data loss for the database and keeps things fast and simple to maintain. Upgrading versions is simpler than with a native installation (looking at that popular database whose major-version upgrades are annoying and require a lot of copying), and the network/disk/CPU overhead is unnoticeable. Today we're going to set up Clickhouse using minikube. Assuming you have already installed minikube, all you need to do is install helm (the k8s package manager).

curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get-helm-3 > install_helm.sh && bash install_helm.sh
helm install my-release oci://registry-1.docker.io/bitnamicharts/clickhouse

# to check again the deploy
helm status my-release

# to check configurable options
helm show all oci://registry-1.docker.io/bitnamicharts/clickhouse

# to uninstall
helm uninstall my-release

By default it creates 2 shards with 3 replicas each (including the PV and PVC for every pod), and too bad it still uses zookeeper instead of clickhouse-keeper.

my-release-clickhouse-shard0-0   1/1  Running   0 
my-release-clickhouse-shard0-1   1/1  Running   0 
my-release-clickhouse-shard0-2   1/1  Running   0 
my-release-clickhouse-shard1-0   1/1  Running   0 
my-release-clickhouse-shard1-1   1/1  Running   0 
my-release-clickhouse-shard1-2   1/1  Running   0 
my-release-zookeeper-0           1/1  Running   0 
my-release-zookeeper-1           1/1  Running   0 
my-release-zookeeper-2           1/1  Running   0 
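
As a small illustration of how the default of 2 shards with 3 replicas maps to the six Clickhouse pods above, here is a minimal Go sketch. The function name podNames is made up for this post, and the naming pattern is only inferred from the pod list shown here, not taken from the chart's source:

```go
package main

import "fmt"

// podNames lists the pod names we expect for a release, following the
// "<release>-clickhouse-shard<S>-<R>" pattern seen in the pod list above.
// The pattern is an assumption inferred from that output.
func podNames(release string, shards, replicas int) []string {
	names := make([]string, 0, shards*replicas)
	for s := 0; s < shards; s++ {
		for r := 0; r < replicas; r++ {
			names = append(names, fmt.Sprintf("%s-clickhouse-shard%d-%d", release, s, r))
		}
	}
	return names
}

func main() {
	// 2 shards x 3 replicas = 6 pods
	for _, name := range podNames("my-release", 2, 3) {
		fmt.Println(name)
	}
}
```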


To access it locally you just need to port-forward and connect using the clickhouse-client program:

kubectl port-forward --namespace default svc/my-release-clickhouse 9001:9000 &
clickhouse-client --host localhost --port 9001 --user default --password $(kubectl get secret --namespace default my-release-clickhouse -o jsonpath="{.data.admin-password}" | base64 -d)

alternatively you can just exec into the pod and use clickhouse-client inside the pod:

kubectl exec -it my-release-clickhouse-shard1-2 -- sh
clickhouse-client # use password from above

You can already create a distributed table and do stuff with it. For more information about bitnami charts you can read it here. If you want to customize your setup (not 2 shards with 3 replicas; I always prefer no sharding but multiple replicas, unless your data is so massive that it's hard not to shard across multiple machines), you can clone the chart, modify the configuration, then deploy it, something like this:

git clone --depth 1 git@github.com:bitnami/charts.git
vim charts/bitnami/clickhouse/values.yaml
# ^ change shard=1 keeper.enabled=true zookeeper.enabled=false
helm dependency build charts/bitnami/clickhouse
helm install ch1 charts/bitnami/clickhouse

# update
helm upgrade ch1 charts/bitnami/clickhouse

it would create something like this:

ch1-clickhouse-shard0-0         1/1  Running   0
ch1-clickhouse-shard0-1         1/1  Running   0
ch1-clickhouse-shard0-2         1/1  Running   0

That's it for now. I'm not sure how to run this in a real Kubernetes cluster (not minikube), especially how to spread the PV/PVC across multiple nodes.

2024-02-20

Dump/export Cassandra/BigQuery tables and import to Clickhouse

Today we're going to dump a Cassandra table and load it into Clickhouse. Cassandra is a wide-column database, but it is used for OLTP since it has really good distributed capabilities (customizable replication factor, multi-cluster/region, clustered/partitioned by default, so it's good for multitenant applications). But when we need analytics or other complex queries it really sucks, even with ScyllaDB's materialized views (which are only good for recaps/summaries). To dump a Cassandra table, all you need to do is construct a query and use dsbulk, something like this:

./dsbulk unload -delim '|' -k "KEYSPACE1" \
   -query "SELECT col1,col2,col3 FROM table1" -c csv \
   -u 'USERNAME1' -p 'PASSWORD1' \
   -b secure-bundle.zip | tr '\\' '"' |
    gzip -9 > table1_dump_YYYYMMDD.csv.gz ;


The tr command above is used to unescape the backslashes, since dsbulk's CSV export isn't in the proper format (\" instead of ""). After that you can just restore it by running something like this:

CREATE TABLE table1 (
    col1 String,
    col2 Int64,
    col3 UUID
) ENGINE = ReplacingMergeTree()
ORDER BY (col1, col2);

SET format_csv_delimiter = '|';
SET input_format_csv_skip_first_lines = 1;

INSERT INTO table1
FROM INFILE 'table1_dump_YYYYMMDD.csv.gz'
FORMAT CSV;
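
The tr step in the dump pipeline replaces every backslash, including ones that are not followed by a quote. If your data contains real backslashes, a narrower filter might be safer. This is only a sketch under that assumption: the fixQuotes name is made up, and it rewrites only the \" sequence into "", leaving other backslashes alone:

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// fixQuotes rewrites backslash-escaped quotes (\") into doubled quotes (""),
// which is the escaping CSV parsers normally expect.
// Backslashes that are not followed by a quote are left untouched.
func fixQuotes(line string) string {
	return strings.ReplaceAll(line, `\"`, `""`)
}

func main() {
	// Filter stdin to stdout line by line, so it can sit in the pipe
	// where tr was used. Very long lines would need a larger buffer.
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		fmt.Println(fixQuotes(scanner.Text()))
	}
}
```

It would go in the same place as tr in the pipe, between dsbulk and gzip.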


BigQuery

 Similar to Clickhouse, BigQuery is one of the best analytical engines (because of its unlimited compute and massively parallel storage), but it comes at a cost. With a large table, improper partitioning/clustering will cause a huge scan ($6.25 per TiB) and use a lot of compute slots (this can happen even with a proper setup, because partitioning is limited to a single column, unlike Clickhouse which can use more). Combined with materialized views or periodic queries on cron, it will definitely kill your wallet. To dump from BigQuery, all you need to do is create a GCS (Google Cloud Storage) bucket, then run a query, something like this:

EXPORT DATA
  OPTIONS (
    uri = 'gs://BUCKET1/table2_dump/1-*.parquet',
    format = 'PARQUET',
    overwrite = true
    --, compression = 'GZIP' -- import failed: ZLIB_INFLATE_FAILED
  )
AS (
  SELECT * FROM `dataset1.table2`
);

-- it's better to create snapshot table
-- if you do WHERE filter on above query, eg.
CREATE TABLE dataset1.table2_filtered_snapshot AS
  SELECT * FROM `dataset1.table2` WHERE col1 = 'yourFilter';
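
To get a rough feel for the scan cost mentioned earlier ($6.25 per TiB), here is a back-of-the-envelope sketch in Go. The function names and the 30-day month are assumptions for illustration; real billing has more details than this:

```go
package main

import "fmt"

const (
	usdPerTiB = 6.25    // on-demand scan price quoted in this post
	tib       = 1 << 40 // bytes in one TiB
)

// scanCostUSD estimates the cost of a single query scanning the given bytes.
func scanCostUSD(bytesScanned float64) float64 {
	return bytesScanned / tib * usdPerTiB
}

// monthlyCostUSD estimates the cost of a periodic query that scans the same
// amount on every run, assuming a 30-day month.
func monthlyCostUSD(bytesScanned float64, runsPerDay int) float64 {
	return scanCostUSD(bytesScanned) * float64(runsPerDay) * 30
}

func main() {
	// one full scan of a 2 TiB table
	fmt.Printf("single scan: $%.2f\n", scanCostUSD(2*tib))
	// the same scan from an hourly cron
	fmt.Printf("hourly cron: $%.2f per month\n", monthlyCostUSD(2*tib, 24))
}
```

So an hourly query that scans a 2 TiB table in full lands at around $9000 per month under these assumptions.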

Compression is not used because the import failed with it, not sure why. The parquet files will show up in your bucket; click on "Remove public access prevention", then allow it to be publicly available with the gcloud command:

gcloud storage buckets add-iam-policy-binding gs://BUCKET1 --member=allUsers --role=roles/storage.objectViewer
# remove-iam-policy-binding to undo this

 

 Then just restore it:

CREATE TABLE table2 (
          Col1 String,
          Col2 DateTime,
          Col3 Int32
) ENGINE = ReplacingMergeTree()
ORDER BY (Col1, Col2, Col3);

SET parallel_distributed_insert_select = 1;

INSERT INTO table2
SELECT Col1, Col2, Col3
FROM s3Cluster(
    'default',
    'https://storage.googleapis.com/BUCKET1/table2_dump/1-*.parquet',
    '', -- s3 access id, remove or leave empty if public
    '' -- s3 secret key, remove or leave empty if public
);


Writing UDF for Clickhouse using Golang

Today we're going to create a UDF (User-defined Function) in Golang that can be run inside a Clickhouse query. This function will parse a UUID v1 and return its timestamp, since Clickhouse doesn't have this function for now. It's inspired by the python version and uses the TabSeparated format (since it's the easiest to parse): a UDF in Clickhouse reads its input line by line (each line is a row, and each tab-separated text is a column/cell value):

package main
import (
    "bufio"
    "encoding/binary"
    "encoding/hex"
    "fmt"
    "os"
    "strings"
    "time"
)
func main() {
    scanner := bufio.NewScanner(os.Stdin)
    scanner.Split(bufio.ScanLines)
    for scanner.Scan() {
        id, _ := FromString(scanner.Text())
        fmt.Println(id.Time())
    }
}
func (me UUID) Nanoseconds() int64 {
    time_low := int64(binary.BigEndian.Uint32(me[0:4]))
    time_mid := int64(binary.BigEndian.Uint16(me[4:6]))
    time_hi := int64((binary.BigEndian.Uint16(me[6:8]) & 0x0fff))
    return int64((((time_low) + (time_mid << 32) + (time_hi << 48)) - epochStart) * 100)
}
func (me UUID) Time() time.Time {
    nsec := me.Nanoseconds()
    return time.Unix(nsec/1e9, nsec%1e9).UTC()
}
// code below Copyright (C) 2013 by Maxim Bublis <b@codemonkey.ru>
// see https://github.com/satori/go.uuid
// Difference in 100-nanosecond intervals between
// UUID epoch (October 15, 1582) and Unix epoch (January 1, 1970).
const epochStart = 122192928000000000
// UUID representation compliant with specification
// described in RFC 4122.
type UUID [16]byte
// FromString returns UUID parsed from string input.
// Following formats are supported:
// "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
// "{6ba7b810-9dad-11d1-80b4-00c04fd430c8}",
// "urn:uuid:6ba7b810-9dad-11d1-80b4-00c04fd430c8"
func FromString(input string) (u UUID, err error) {
    s := strings.Replace(input, "-", "", -1)
    if len(s) == 41 && s[:9] == "urn:uuid:" {
        s = s[9:]
    } else if len(s) == 34 && s[0] == '{' && s[33] == '}' {
        s = s[1:33]
    }
    if len(s) != 32 {
        err = fmt.Errorf("uuid: invalid UUID string: %s", input)
        return
    }
    b := []byte(s)
    _, err = hex.Decode(u[:], b)
    return
}
// Returns canonical string representation of UUID:
// xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.
func (u UUID) String() string {
    return fmt.Sprintf("%x-%x-%x-%x-%x",
        u[:4], u[4:6], u[6:8], u[8:10], u[10:])
}


Compile it and put the binary, with the proper owner and permissions, at /var/lib/clickhouse/user_scripts/uuid2timestr, then create /etc/clickhouse-server/uuid2timestr_function.xml (the file name must have the proper suffix) containing:

<functions>
    <function>
        <type>executable</type>
        <name>uuid2timestr</name>
        <return_type>String</return_type>
        <argument>
            <type>String</type>
        </argument>
        <format>TabSeparated</format>
        <command>uuid2timestr</command>
        <lifetime>0</lifetime>
    </function>
</functions>


After that you can restart Clickhouse (sudo systemctl restart clickhouse-server or sudo clickhouse restart), depending on how you installed it (apt or binary setup).

Usage


To make sure it's loaded, you can look for this line in the log:

<Trace> ExternalUserDefinedExecutableFunctionsLoader: Loading config file '/etc/clickhouse-server/uuid2timestr_function.xml

then just run a query using that function:

SELECT uuid2timestr('51038948-97ea-11ee-b7e0-52de156a77d8')

┌─uuid2timestr('51038948-97ea-11ee-b7e0-52de156a77d8')─┐
│ 2023-12-11 05:58:33.2391752 +0000 UTC                │
└──────────────────────────────────────────────────────┘
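
To double check the result outside Clickhouse, the same timestamp can be derived straight from the textual UUID. This is just a sketch and not the UDF itself: uuidV1Time is a made-up name, it accepts only the plain dashed format, and it additionally rejects non-v1 UUIDs (which the UDF above does not do):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// 100-nanosecond intervals between the UUID epoch (1582-10-15)
// and the Unix epoch (1970-01-01), same constant as in the UDF.
const gregorianToUnix = 122192928000000000

// uuidV1Time extracts the embedded timestamp from a dashed UUID v1 string.
func uuidV1Time(s string) (time.Time, error) {
	parts := strings.Split(s, "-")
	if len(parts) != 5 {
		return time.Time{}, fmt.Errorf("invalid UUID: %q", s)
	}
	low, err := strconv.ParseUint(parts[0], 16, 32) // time_low
	if err != nil {
		return time.Time{}, err
	}
	mid, err := strconv.ParseUint(parts[1], 16, 16) // time_mid
	if err != nil {
		return time.Time{}, err
	}
	hi, err := strconv.ParseUint(parts[2], 16, 16) // version + time_hi
	if err != nil {
		return time.Time{}, err
	}
	if hi>>12 != 1 {
		return time.Time{}, fmt.Errorf("not a version 1 UUID: %q", s)
	}
	// 60-bit count of 100ns ticks since 1582-10-15
	ticks := int64(hi&0x0fff)<<48 | int64(mid)<<32 | int64(low)
	return time.Unix(0, (ticks-gregorianToUnix)*100).UTC(), nil
}

func main() {
	t, err := uuidV1Time("51038948-97ea-11ee-b7e0-52de156a77d8")
	if err != nil {
		panic(err)
	}
	fmt.Println(t) // 2023-12-11 05:58:33.2391752 +0000 UTC
}
```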

2022-05-07

Getting started with Trino

Trino is a distributed query engine that allows you to JOIN across multiple datasources (databases like mysql, postgresql, bigquery, cassandra, mongodb, redis, prometheus, elasticsearch, csv files, google sheets, s3, etc). It's like Clickhouse but without the high-tech (merge-tree) storage ability, so it cannot do blazing fast analytics queries by itself, but it can be as fast as the database it's connected to, eg. if it's connected to Clickhouse, it can be as fast as Clickhouse. It is a fork of Presto, which was originally developed at Facebook (Trino was previously named PrestoSQL). The list of database connectors can be seen here. To use Trino, you can use the dockerized version or install it manually:

# Docker
docker run -d -p 8080:8080 --name trino1 trinodb/trino
# web UI only for monitoring, use random username 

docker exec -it trino1 trino

# Ubuntu 22.04
java --version
python3 --version
# download and extract from https://trino.io/download.html
mkdir ./trino-server-379/etc
cd trino-server-379/etc # the config files below belong in etc/
SRCURL=https://raw.githubusercontent.com/trinodb/trino-the-definitive-guide/master/single-installation/etc
wget -c $SRCURL/jvm.config
wget -c $SRCURL/log.properties
wget -c $SRCURL/node.properties
echo '
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8081
query.max-memory=5GB
query.max-memory-per-node=1GB
discovery.uri=http://127.0.0.1:8081
' > config.properties
echo '

node.data-dir=/tmp/
' >> node.properties
mkdir catalog
echo '
connector.name=cassandra
cassandra.contact-points=127.0.0.1
# more here https://trino.io/docs/current/connector/cassandra.html
' > catalog/localscylla.properties 
cd ..
python3 ./bin/launcher.py run # to run in background: start

# CLI/Client
EXELOC=/usr/bin/trino
curl -o $EXELOC https://repo1.maven.org/maven2/io/trino/trino-cli/379/trino-cli-379-executable.jar
chmod a+x $EXELOC
trino --server http://localhost:8081

These are the commands that can be used in trino (other than standard SQL):

SHOW CATALOGS;
SHOW SCHEMAS FROM/IN __CATALOG__; # eg. localscylla
SHOW TABLES FROM/IN __CATALOG__.__SCHEMA__;
DESCRIBE __CATALOG__.__SCHEMA__.__TABLE__;
EXPLAIN SELECT * FROM __CATALOG__.__SCHEMA__.__TABLE__;

That's it, you can add more database connections by creating more etc/catalog/*.properties files with the proper configuration (username, password, port, etc).

2022-04-01

Georeplicable Architecture Tech Stacks

Since I have a geo-replicable use case (multiple datacenters around the world), I need to create a metrics, billing, and log aggregator that survives even when multiple datacenters are down. So we need an architecture in which each datacenter's database cluster can give a correct answer on its own. The layers are:

1. Cache Layer

2. Source of Immediate Truth (OLTP) Layer

3. Source of Event Truth Layer (either CDC or subscribable events or both)

4. Metrics and Analytics (OLAP) Layer

For a single datacenter I might choose Tarantool, Aerospike (too bad that if you want to store more than your RAM you must use the paid version), or Redis (master-slave) for the cache layer, especially if the DNS is smart enough to route the client's request to the nearest datacenter.

Tarantool can also be the source of immediate truth, but the problem is that it needs master-slave replication to be fault tolerant, and it seems manual intervention might be needed on failover to promote a slave node to master and to make the other slaves recognize the new master?

So the other choice is either TiDB (for consistent use cases, lower write rates, more complex queries) or ScyllaDB (for partition-tolerant use cases, higher write rates). Both are good in terms of availability. TiDB's TiFlash is also good for analytics use cases.

For the source of event truth, we can use RedPanda for pubsub/MQ use cases, and Debezium (which requires Kafka/RedPanda) for change data capture from ScyllaDB, or TiCDC for TiDB to stream to RedPanda.

Lastly, for analytics we can use Clickhouse, which can also store all structured logs so they can be easily queried (or that can be Loki). Metrics might be aggregated from RedPanda using MaterializeIO (too bad the cluster version is paid).

So.. what are the combinations possible?

1. Tarantool (manual counter for metrics) + Clickhouse (manual publish logs and analytics), this one is good only for a single location/datacenter, unless all clients can hit the proper server location (like game servers, or smart CDNs)

2. same as #1 but with RedPanda if you have multiple services, all logs and events published thru RedPanda

3. Aerospike/Tarantool/Redis (manual counter for metrics) + TiDB + TiFlash (analytics) + Loki for logs

4. Aerospike/Tarantool/Redis (manual counter for metrics) + TiDB + TiCDC + ClickHouse (for logs and analytics)

5. Aerospike/Tarantool/Redis (cache only) + TiDB + TiFlash (analytics) + TiCDC + MaterializeIO (for metrics) + Loki (logs)

6. Aerospike/Tarantool/Redis (cache only) + TiDB + TiCDC + Clickhouse (analytics and logs) + MaterializeIO (for metrics)

7. Aerospike/Tarantool/Redis (cache only) + ScyllaDB + Debezium + RedPanda + Clickhouse (analytics and logs) + MaterializeIO (for metrics) 

For #5-#7 you can remove the Aerospike/Tarantool part if there's no need to cache (consistency matters, and you have a very large cluster that can handle peaks).

Wait, why don't you include the full-text search use case? '__') ok, we can use Debezium that publishes to ElasticSearch (might be overkill), or manually publish to TypeSense or MeiliSearch.

That's only for data part of the cluster, what about computation and presentation layer (backend/frontend)?

Backend of course I will use Golang or C#, frontend either Svelte (web only) or Flutter (mobile and web), Unity3D (game).

For object storage (if locally I would use MinIO) and CDN (CloudFlare for example), you can see my previous post.

So why did you choose those?

1. Tarantool, one of the fastest in-mem databases, like 200K wps/qps, but with the cons I already mentioned above. The performance is similar to Redis, but this one supports SQL.

2. Aerospike, also one of the fastest in-mem databases, also like 200K wps/qps last time I checked. It can also do master-master replication and, if I'm not mistaken, is limited to 4 nodes for the free version. You can set the replication factor, but see the other cons I mentioned above.

3. TiDB, a NewSQL database with automatic rebalancing (if one node dies, it still works fast). It can do around 30-50K on a single node last time I benchmarked; their benchmarks mostly show 30K-40K rps for writes, 120K rps for mixed read-write, and 330K rps for read-only in multi-node benchmarks. Need more space? Add more TiKV instances. Need more compute? Add more TiDB instances. Need faster analytics queries? Add TiFlash instances. But the bad part is that there are so many moving parts: you have to deploy TiDB (query engine), TiKV (storage), and PD (placement driver), plus TiCDC and TiFlash if you need those too. I'm also not sure how it would perform in multi-DC use cases. What's the managed alternative? AlloyDB-PostgreSQL (GCP) or TiDB-Cloud (AWS)

4. ScyllaDB, a faster version of Cassandra; most benchmarks show 120K-400K inserts and queries per second. It's one of the best databases for multi-DC: for each keyspace you can set which datacenters it replicates to and with what replication factor, and consistency is controlled on the client side. It has also supported multiple views (materialized views) since 2 years ago, so we don't have to create and maintain multiple tables manually for each query pattern. What's the managed alternative? ScyllaDB-Cloud

5. RedPanda, a faster version of Kafka; last time I checked, one instance can receive 800K events per second and publish 4 million events per second.

6. Clickhouse, one of the best analytics databases. It can do 1M batched inserts per second on a single node, and can run complex queries really fast, under a second (depending on how big your data is, but for the kind of query that would take minutes in a normal RDBMS). The con is that one node can only handle 100 concurrent queries.

7. MaterializeIO, like ksqldb but written in Rust; I haven't checked the performance, but they claim it performs as fast as Redis.

What are the other alternatives? Yugabyte looks ok, especially the YCQL part that works like Cassandra/ScyllaDB. Yugabyte seems to combine Redis, Postgres, and Cassandra in one deployment, but I like TiDB more because, last time I checked, I needed to configure something to keep it writable when one node died.

Proxy and load balancer? Caddy, Traefik, FabioLB (with Consul), or NATS (eg. 3 load balancers/custom API gateways deployed in front serialize each request to NATS inside the DMZ; the worker/handler receives it and returns a response that is deserialized back by the load balancer/API gateway. That way the load balancer doesn't need to know exactly how many workers/handlers there are, services can also communicate synchronously thru NATS without knowing each other's IP addresses, and the worker/handler part can be scaled independently)

Deployment? Nomad, just rsync it to Jelastic, DockerCompose for dependencies (don't forget to bind the volume or your data will be gone), Kubernetes

Tracker? maybe Pirsch

Why is there no __INSERT_DB_TYPE_HERE__? Graph database? Because I rarely do recursive/graph queries, and I especially don't know which one is best. Neo4J? DGraph? NebulaGraph? TigerGraph? AnzoGraph? TerminusDB? Age? JanusGraph? HugeGraph?

Have some other cool/high-performance tech stack suggestion? Chat with me at http://t.me/kokizzu (give proper intro tho, or I would think you are a crypto scam spammer XD)

2021-11-22

Kafka vs RedPanda Benchmark (also Tarantool and Clickhouse as queue)

Using the default settings from their docker-compose examples, today we're going to benchmark some popular MQ/PubSub software. I have never used an MQ extensively before (only NATS, Google PubSub, ActiveMQ, and Amazon SQS); usually a standard database that stores events is sufficient (the consumer pulls, tailing from the last primary key counter, and if it needs to fan out it just uses multiple goroutines and multiple channels), because my projects have never been latency-sensitive applications.
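
The "database as event store" pattern mentioned above can be sketched roughly like this in Go. It's a simplified illustration and not the benchmark code: the Event type, fetchAfter (standing in for a query like SELECT ... WHERE id > lastID ORDER BY id LIMIT n), and fanOut are all made-up names, and the "table" is just an in-memory slice sorted by ID:

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical row with an auto-increment primary key.
type Event struct {
	ID      int64
	Payload string
}

// fetchAfter stands in for pulling rows with id > lastID in ID order.
func fetchAfter(table []Event, lastID int64, limit int) []Event {
	var out []Event
	for _, e := range table {
		if e.ID > lastID {
			out = append(out, e)
			if len(out) == limit {
				break
			}
		}
	}
	return out
}

// fanOut tails the table in batches and copies every event to each
// subscriber goroutine through its own channel. It returns the IDs
// each subscriber received, in order.
func fanOut(table []Event, subscribers, batch int) [][]int64 {
	chans := make([]chan Event, subscribers)
	received := make([][]int64, subscribers)
	var wg sync.WaitGroup
	for i := range chans {
		chans[i] = make(chan Event, batch)
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for e := range chans[i] {
				received[i] = append(received[i], e.ID)
			}
		}(i)
	}
	lastID := int64(0) // the consumer only has to remember this offset
	for {
		rows := fetchAfter(table, lastID, batch)
		if len(rows) == 0 {
			break // a real consumer would sleep and poll again
		}
		for _, e := range rows {
			for _, ch := range chans {
				ch <- e
			}
			lastID = e.ID
		}
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return received
}

func main() {
	table := []Event{{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}, {5, "e"}}
	for i, ids := range fanOut(table, 3, 2) {
		fmt.Println("subscriber", i, "got", ids)
	}
}
```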

Some issues: 
  1. the benchmark has locking (atomic counters, sync.Map, etc), so the consumer might not utilize all CPU cores.
  2. confluent's kafka docker image always errors on startup because /var/lib/kafka/data is not writable, so I bind /var/lib/kafka instead. Clickhouse also always fails to start when binding /var/lib/clickhouse/data, so I don't bind a volume for Clickhouse.
  3. RedPanda failed to start because of fs.aio-max-nr, even when it was already ~1 million (originally only 64K), so I set it to 4194304

Benchmarking 1000 goroutines publishing 2000 messages each, with 100 goroutines consuming in parallel.

REDPANDA version: v21.10.1 (rev e7b6714)

=== redpanda single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2387
MaxLatency (ms):  2125
AvgLatency (ms):  432
Total (s) 3.457646367s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2408
MaxLatency (ms):  2663
AvgLatency (ms):  490
Total (s) 3.459949739s

=== redpanda multi:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  4187
MaxLatency (ms):  12146
AvgLatency (ms):  9701
Total (s) 13.610533861s 

# ^ weird, maybe startup not yet complete?
# retried reinit docker-compose, 1st time always slow
# but 2nd time always fast:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2413
MaxLatency (ms):  2704
AvgLatency (ms):  467
Total (s) 3.545496041s


KAFKA version: 7.0.0-ccs (Commit:c6d7e3013b411760)
equal to kafka 3.0.0

=== kafka single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6634
MaxLatency (ms):  12052
AvgLatency (ms):  8579
Total (s) 13.722706977s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6380
MaxLatency (ms):  11856
AvgLatency (ms):  8636
Total (s) 13.625928209s

=== kafka multi:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6596
MaxLatency (ms):  11932
AvgLatency (ms):  8523
Total (s) 13.659630863s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  6535
MaxLatency (ms):  11903
AvgLatency (ms):  8588
Total (s) 13.677644818s

These benchmarks use the default settings from the docker examples I found, except for SMP (I set it to the number of cores of the benchmark server, to make it fair with Kafka, whose JVM can utilize all cores by default; apparently this has insignificant impact). The current conclusion is that RedPanda is way faster than Kafka, both in publishing speed (around ~1μs per message, 477K-837K msg/s, versus around ~3μs per message, 301K-313K msg/s for Kafka) and in consuming latency (432ms to 2.7s per message, versus 8.5s to 12s for Kafka). As for RAM usage, RedPanda uses 12GB for each node (10% of the server's RAM), while Kafka only uses 355MB, 375MB, and 788MB for its nodes, and 120MB for zookeeper. The repo to reproduce this benchmark is here, in the 2021mq directory.

Btw if you're looking for a Kafka/RedPanda GUI, try KOwl, it's way more beautiful than ActiveMQ's default Web UI.

Bonus round: using one of the fastest OLTP databases, Tarantool, and one of the fastest OLAP databases, Clickhouse, as a queue, by leveraging a sequence (auto increment) or an internal function to generate one. The difference is that there's only one consumer group (you have to fan out manually using goroutines), and no JSON encoding and decoding since these are structured databases:


TARANTOOL version: 2.8.2

=== tarantool single (memtx):

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  11238
MaxLatency (ms):  1071
AvgLatency (ms):  101
Total (s) 11.244551225s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  9596
MaxLatency (ms):  816
AvgLatency (ms):  61
Total (s) 9.957516119s

=== tarantool single (vinyl):

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  11383
MaxLatency (ms):  1076
AvgLatency (ms):  157
Total (s) 11.388865281s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  9104
MaxLatency (ms):  102
AvgLatency (ms):  13
Total (s) 9.196549551s


CLICKHOUSE version: 21.11.4.14

=== clickhouse single:

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2052
MaxLatency (ms):  2078
AvgLatency (ms):  1461
Total (s) 3.570767491s

FailProduce:  0
FailConsume:  0
DoubleConsume:  0
Produced (ms):  2057
MaxLatency (ms):  2008
AvgLatency (ms):  1445
Total (s) 3.536277427s

The result recap table (ms = millisecond, us = microsecond, ns = nanosecond):

only best of 2 runs | RedPanda single | RedPanda multi | Kafka single | Kafka multi | Tarantool memtx | Tarantool vinyl | Clickhouse single
Publish (ms) | 2,387 | 2,413 | 6,380 | 6,535 | 9,596 | 9,104 | 2,052
Sub Max Latency (ms) | 2,125 | 2,704 | 11,856 | 11,903 | 816 | 102 | 2,008
Sub Avg Latency (ms) | 490 | 467 | 8,636 | 8,523 | 61 | 13 | 1,445
Pub Throughput (msg/s) | 837,872 | 828,844 | 313,480 | 306,044 | 208,420 | 219,684 | 974,659
est. Pub Latency (ns) | 1,194 | 1,207 | 3,190 | 3,268 | 4,798 | 4,552 | 1,026
est. Sub Throughput (msg/s) | 4,081,633 | 4,282,655 | 231,589 | 234,659 | 32,786,885 | 153,846,154 | 1,384,083
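
The throughput and estimated latency rows in the recap are just arithmetic on the raw timings: 1000 goroutines x 2000 messages = 2 million messages, divided by the elapsed time. A minimal Go sketch of that calculation (the function names are made up for this post):

```go
package main

import (
	"fmt"
	"math"
)

// 1000 goroutines publishing 2000 messages each
const totalMessages = 1000 * 2000

// throughput returns messages per second for a run that took elapsedMs.
func throughput(elapsedMs float64) int64 {
	return int64(math.Round(totalMessages * 1000 / elapsedMs))
}

// perMessageNs returns the estimated time spent per message in nanoseconds.
func perMessageNs(elapsedMs float64) int64 {
	return int64(math.Round(elapsedMs * 1e6 / totalMessages))
}

func main() {
	// RedPanda single: Produced (ms) = 2387, Sub Avg Latency (ms) = 490
	fmt.Println("pub msg/s:", throughput(2387))    // 837872
	fmt.Println("pub ns/msg:", perMessageNs(2387)) // 1194
	fmt.Println("est. sub msg/s:", throughput(490)) // 4081633
}
```

Note that the estimated sub throughput divides by the average latency, so treat it as a rough indicator only.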

Conclusion: Tarantool is probably the only single-node database that can compete with Kafka for the queue use case (we can have multi-master replicas but it's not recommended; it's better to use a master-slave config where the slave is used for failover). Other databases, especially RDBMSes that persist to disk, I'm pretty sure can only do ~50K tps. Clickhouse can be multi-master, and last time I checked it could do ~600K inserts per second (this time it's around 1M inserts per second). I simulated the atomic counter on Clickhouse using TimeStamp64Milli; it's limited to 100 queries per second, but that's good enough for the pub-sub use case. The benefit of using a database as MQ/PubSub is that you can do very flexible queries (SQL support), mostly get better tooling (especially with Clickhouse), and can update a record for new consumers. The cons are that you must notify/fan out yourself (for example using NATS broadcast, only pushing a signal for the workers to pull), and track the acks/retries and the read offsets of the workers yourself (pull).

2021-05-31

Easy Tarantool and ClickHouse Replication Setup

These two are currently my favorite databases because of their speed and features: Tarantool (200K tps) for OLTP, and ClickHouse (600K insert/s, 100 qps) for OLAP. Today we will learn how to set up multi-master replication on each database:

# docker-compose.yml # tarantool
version: '3.3'
services:
  tt1: # master
    image: tarantool/tarantool:2.7.2 # x.x.2+ = stable
    volumes:
      - ./scripts/:/opt/scripts
    command: tarantool /opt/scripts/app.lua
    environment:
      - TARANTOOL_USER_NAME=tester
      - TARANTOOL_USER_PASSWORD=tester
    ports:
      - 13301:3301
  tt2: # slave1
    image: tarantool/tarantool:2.7.2
    volumes:
      - ./scripts/:/opt/scripts
    command: tarantool /opt/scripts/app.lua
    environment:
      - TARANTOOL_USER_NAME=tester
      - TARANTOOL_USER_PASSWORD=tester
    ports:
      - 23301:3301
  tt3: # slave2
    image: tarantool/tarantool:2.7.2
    volumes:
      - ./scripts/:/opt/scripts
    command: tarantool /opt/scripts/app.lua
    environment:
      - TARANTOOL_USER_NAME=tester
      - TARANTOOL_USER_PASSWORD=tester
    ports:
      - 33301:3301

# scripts/app.lua # need to be set on different port/file if not using docker-compose
# for multi-master, you should not use counter data type or it would be out of sync/conflict
# so it's better to use master-slave (2 read_only replica)
box.cfg{
    listen = 3301,
    replication = {
        'replicator:password@tt1:3301', -- master URI
        'replicator:password@tt2:3301', -- replica 1 URI
        'replicator:password@tt3:3301', -- replica 2 URI
    },
    read_only = false -- set to true for replica 1 and 2 if you want master-slave
}
box.once("schema", function()
    box.schema.user.create('replicator', {password = 'password'})
    box.schema.user.grant('replicator', 'replication') -- grant replication role
    box.schema.space.create("test")
    box.space.test:create_index("primary")
    print('box.once executed on master')
end)

# start it
docker-compose up

# create table and insert on master
tarantoolctl connect tester:tester@127.0.0.1:13301
connected to 127.0.0.1:13301

127.0.0.1:13301> box.execute [[ create table test1(id int primary key, name string) ]]
---
- row_count: 1
...

127.0.0.1:13301> box.execute [[ insert into test1(id,name) values(1,'test') ]]
---
- row_count: 1
...

# check on slave cluster node
tarantoolctl connect tester:tester@127.0.0.1:23301
connected to 127.0.0.1:23301
127.0.0.1:23301> box.execute [[ select * FROM test1 ]]
---
- metadata:
  - name: ID
    type: integer
  - name: NAME
    type: string
  rows:
  - [1, 'test']
...



That's it, very easy right? Now for ClickHouse:

# docker-compose.yml # clickhouse
version: '3.3'
services:
  ch1:
    image: yandex/clickhouse-server
    restart: always
    volumes:
      - ./config.xml:/etc/clickhouse-server/config.d/local.xml
      - ./macro1.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./data/1:/var/lib/clickhouse    
    ports:
      - '18123:8123'
      - '19000:9000'
      - '19009:9009'
    ulimits:
      nproc: 65536
      nofile:
        soft: 252144
        hard: 252144
  ch2:
    image: yandex/clickhouse-server
    restart: always
    volumes:
      - ./config.xml:/etc/clickhouse-server/config.d/local.xml
      - ./macro2.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./data/2:/var/lib/clickhouse
    ports:
      - '28123:8123'
      - '29000:9000'
      - '29009:9009'
    ulimits:
      nproc: 65536
      nofile:
        soft: 252144
        hard: 252144
  ch3:
    image: yandex/clickhouse-server
    restart: always
    volumes:
      - ./config.xml:/etc/clickhouse-server/config.d/local.xml
      - ./macro3.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./data/3:/var/lib/clickhouse
    ports:
      - '38123:8123'
      - '39000:9000'
      - '39009:9009'
    ulimits:
      nproc: 65536
      nofile:
        soft: 252144
        hard: 252144
  zookeeper:
    image: zookeeper

# config.xml
<yandex>
    <remote_servers>
        <replicated>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>ch1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>ch2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>ch3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </replicated>
    </remote_servers>
    <zookeeper>
        <node>
            <host>zookeeper</host>
            <port>2181</port>
        </node>
    </zookeeper>
</yandex>

# macroXX.xml # replace XX with 1, 2, or 3
<yandex>
    <macros replace="replace">
        <cluster>cluster1</cluster>
        <replica>chXX</replica>
    </macros>
</yandex>

# start it
docker-compose up

# create table and insert on first cluster node
clickhouse-client --port 19000
SELECT * FROM system.clusters;
CREATE DATABASE db1 ON CLUSTER replicated;
SHOW DATABASES;
USE db1;

CREATE TABLE IF NOT EXISTS db1.table1 ON CLUSTER replicated
( id UInt64
, dt Date
, val UInt64
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{cluster}/tables/table1', '{replica}')
PARTITION BY modulo( id, 1000 )
ORDER BY (dt);

INSERT INTO db1.table1 (id, dt, val)
VALUES (1,'2021-05-31',2);

# check on second cluster node
clickhouse-client --port 29000
SELECT * FROM db1.table1;

┌─id─┬─────────dt─┬─val─┐
│  1 │ 2021-05-31 │   2 │
└────┴────────────┴─────┘
↘ Progress: 1.00 rows, 42.00 B (132.02 rows/s., 5.54 KB/s.)  99%
1 rows in set. Elapsed: 0.008 sec.


That's it, you can now run both databases on a single computer to start testing replication or for development (it's recommended to downscale to a single replica tho).