2022-03-17

Getting started with Cassandra or ScyllaDB

Today we're gonna learn Cassandra (it's been 5 years since I last used ScyllaDB, the C++ rewrite of Cassandra). To install Cassandra and ScyllaDB, you can use this docker-compose:

version: '3.3'
services:
  testcassandra:
    image: cassandra:3.11 # or latest
    environment:
      - HEAP_NEWSIZE=256M
      - MAX_HEAP_SIZE=1G
      - "JVM_OPTS=-XX:+PrintGCDateStamps"
      - CASSANDRA_BROADCAST_ADDRESS
    ports:
      - "9042:9042"
  testscylla:
    image: scylladb/scylla:4.5.1 # because latest 4.6 is broken
    command: --smp 2 --memory 1G --overprovisioned 1 --api-address 0.0.0.0 --developer-mode 1
    ports:
      - 19042:9042
#      - 9142:9142
#      - 7000:7000
#      - 7001:7001
#      - 7199:7199
#      - 10000:10000
#  scylla-manager:
#    image: scylladb/scylla-manager
#    depends_on:
#      - testscylla
 
Using docker we can spawn multiple nodes to test NetworkTopologyStrategy, consistency level, and replication factor (or even multiple datacenters):
 
docker run --name NodeX -d scylladb/scylla:4.5.1
docker run --name NodeY -d scylladb/scylla:4.5.1 --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' NodeX)"
docker run --name NodeZ -d scylladb/scylla:4.5.1 --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' NodeX)"

docker exec -it NodeZ nodetool status
# wait for UJ (up joining) to become UN (up normal)

Since I failed to run the latest ScyllaDB, we use 4.5. To install cqlsh locally, you can use this command:

pip3 install cqlsh
cqlsh 127.0.0.1 9042 # cassandra
cqlsh 127.0.0.1 19042 # scylladb

node=`docker ps | grep /scylla: | head -n 1 | cut -f 1 -d ' '`
docker exec -it $node cqlsh # using cqlsh inside scylladb
# ^ must wait 30s+ until the container is ready

docker exec -it $node nodetool status
# ^ show node status

As we already know, Cassandra is a wide-column database, so we have to choose a partition key (which determines where the rows will be located) and a clustering key (the ordering of data inside the partition); the SSTable part works similarly to ClickHouse merges.

To create a keyspace (much like a "database", a collection of tables, but where we can set the replication strategy), use this command:

CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
-- ^ single node
-- {'class' : 'NetworkTopologyStrategy', 'replication_factor': '3'};
-- ^ multiple node but in a single datacenter and/or rack

-- {'class' : 'NetworkTopologyStrategy', 'DC1': '3', 'DC2': '3'};
-- ^ multiple datacenter

USE my_keyspace;

CONSISTENCY; -- how many read/write ack
-- ANY
-- ONE, TWO, THREE
-- LOCAL_ONE
-- QUORUM = replication_factor / 2 + 1
-- LOCAL_QUORUM
-- EACH_QUORUM -- only for write
-- ALL -- will fail if live nodes < replication_factor
CONSISTENCY new_level;
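
As a worked example (assuming replication_factor = 3): QUORUM = floor(3/2) + 1 = 2, so a QUORUM read or write succeeds as soon as 2 of the 3 replicas acknowledge; LOCAL_QUORUM counts only the replicas in the local datacenter.

CONSISTENCY QUORUM; -- set the session consistency level
CONSISTENCY;        -- verify the current level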

To create a table, specifying the partition key and clustering/ordering key:

CREATE TABLE users ( -- or TYPE for custom type, [keyspace.]
  fname text,
  lname text,
  title text,
  PRIMARY KEY (lname, fname)
);
DESCRIBE TABLE users; -- only for 4.0+
CREATE TABLE foo (
  pkey text,
  okey text,
  PRIMARY KEY ((pkey), okey) -- different partition and ordering
  -- add WITH CLUSTERING ORDER BY (okey DESC) for descending
); -- add WITH cdc = { 'enabled': true, 'preimage': true }
DESC SCHEMA; -- show all tables and materialized views


To upsert, use the INSERT or UPDATE command (last write wins):

INSERT INTO users (fname, lname, title)
VALUES ('A', 'B', 'C');
INSERT INTO users (fname, lname, title)
VALUES ('A', 'B', 'D'); -- add IF NOT EXISTS to prevent replace
SELECT * FROM users; -- USING TIMEOUT XXms

UPDATE users SET title = 'E' WHERE fname = 'A' AND lname = 'C';
SELECT * FROM users; -- add IF EXISTS to prevent insert

-- INSERT INTO users( ... ) VALUES ( ... ) USING TTL 600
-- UPDATE users USING TTL 600 SET ...
-- SELECT TTL(fname) FROM users WHERE ...
-- set TTL to 0 to remove TTL
-- column will be NULL if TTL became 0
-- whole row will be deleted if all non-PK column TTLs are zero
-- ALTER TABLE users WITH default_time_to_live = 3600;

-- SELECT * FROM users LIMIT 3
-- SELECT * FROM users PER PARTITION LIMIT 2
-- SELECT * FROM users PER PARTITION LIMIT 1 LIMIT 3

CREATE TABLE stats(city text PRIMARY KEY, total COUNTER);
UPDATE stats SET total = total + 6 WHERE city = 'Kuta';
SELECT * FROM stats;

To change the schema, use the usual ALTER TABLE command:

ALTER TABLE users ADD mname text;
-- tinyint, smallint, int, bigint (= long)
-- varint (= arbitrary-precision integer, the real bigint)
-- float, double
-- decimal
-- text/varchar, ascii
-- timestamp
-- date, time
-- uuid
-- timeuuid (with mac address, conflict free, set now())
-- boolean
-- inet
-- counter
-- set<type> (set {val,val}, +{val}, -{val})
-- list<type> (set [idx]=, [val,val], +[], []+, -[], DELETE [idx])
-- map<type,type> (set {key: val}, [key]=, DELETE [key] FROM)
-- tuple<type,...> (set (val,...))
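
A quick sketch of the collection syntax listed above (the tags, nicknames, and contacts columns are hypothetical additions for illustration, not part of the original schema):

ALTER TABLE users ADD tags set<text>;       -- hypothetical columns
ALTER TABLE users ADD nicknames list<text>;
ALTER TABLE users ADD contacts map<text,text>;
UPDATE users SET tags = tags + {'admin'} WHERE lname = 'B' AND fname = 'A';
UPDATE users SET nicknames = ['Al'] + nicknames WHERE lname = 'B' AND fname = 'A';
UPDATE users SET contacts['cell'] = '0812xxx' WHERE lname = 'B' AND fname = 'A';
DELETE contacts['cell'] FROM users WHERE lname = 'B' AND fname = 'A';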

SELECT * FROM users;

UPDATE users SET mname = 'F' WHERE fname = 'A' AND lname = 'D';
-- add IF col = val to prevent a blind update (aka lightweight transaction)
-- or IF NOT EXISTS
SELECT * FROM users;
 
A complex nested type example, from this page:

CREATE TYPE phone (
    country_code int,
    number text
);
CREATE TYPE address (
  street text,
  city text,
  zip text,
  phones map<text, frozen<phone>> -- must be frozen, cannot be updated
);
CREATE TABLE pets_v4 (
  name text PRIMARY KEY,
  addresses map<text, frozen<address>>
);
INSERT INTO pets_v4 (name, addresses)
  VALUES ('Rocky', {
    'home' : {
      street: '1600 Pennsylvania Ave NW',
      city: 'Washington',
      zip: '20500',
      phones: {
        'cell' : { country_code: 1, number: '202 456-1111' },
        'landline' : { country_code: 1, number: '202 456-1234' }
      }
    },
    'work' : {
      street: '1600 Pennsylvania Ave NW',
      city: 'Washington',
      zip: '20500',
      phones: { 'fax' : { country_code: 1, number: '202 5444' } }
    }
  });
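
Since the nested UDTs are frozen, you cannot update an individual field in place; a sketch of replacing the whole map entry instead (values taken from the insert above):

-- not allowed: frozen values are written and read as a whole
-- so to change one field, rewrite the entire 'home' entry
UPDATE pets_v4 SET addresses['home'] = {
  street: '1600 Pennsylvania Ave NW',
  city: 'Washington DC',
  zip: '20500',
  phones: { 'cell' : { country_code: 1, number: '202 456-1111' } }
} WHERE name = 'Rocky';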

 
To create an index (since Cassandra only allows retrieval by partition and clustering key, or a full scan):

CREATE INDEX ON users(title); -- global index (2 hops per query)
SELECT * FROM users WHERE title = 'E';
DROP INDEX users_title_idx;
SELECT * FROM users WHERE title = 'E' ALLOW FILTERING; -- full scan
CREATE INDEX ON users((lname),title); -- local index (1 hop per query)

To create a materialized view (which works similarly to ClickHouse's materialized view):

CREATE MATERIALIZED VIEW users_by_title AS
SELECT * -- ALTER TABLE will automatically add this VIEW too
FROM users
WHERE title IS NOT NULL
  AND fname IS NOT NULL
  AND lname IS NOT NULL
PRIMARY KEY ((title), lname, fname);
SELECT * FROM users_by_title;
INSERT INTO users(lname,fname,title) VALUES('A','A','A');
SELECT * FROM users_by_title WHERE title = 'A';
DROP MATERIALIZED VIEW users_by_title;
-- docker exec -it NodeZ nodetool viewbuildstatus

To create "transaction" use BATCH statement:

BEGIN BATCH
INSERT INTO ...
UPDATE ...
DELETE ...
APPLY BATCH;

To import from a file, use the COPY command:

COPY users FROM 'users.csv' WITH HEADER=true;
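
A minimal sketch, assuming users.csv contains the three original columns (the file content below is hypothetical):

-- users.csv:
-- fname,lname,title
-- G,H,Engineer
-- I,J,Manager

COPY users (fname, lname, title) FROM 'users.csv' WITH HEADER = true;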

Tips for performance optimization:
1. for multi-DC, use LOCAL_QUORUM on read, and a TokenAware+DCAwareRoundRobin policy to prevent reading from nodes in a different DC
2. ALLOW FILTERING only for a small number of records or low-cardinality columns (eg. values are only true vs false) -- 0 hops
3. global INDEX when the partition key does not need to be included in the query and latency doesn't matter (2 hops)
4. local INDEX when the partition key can be included (1 hop)
5. MATERIALIZED VIEW when you want to use a different partition key for the same data, and storage doesn't matter
6. always use prepared statements (see the sketch below)
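
For tips 1 and 6, a minimal Go sketch using the gocql driver (the contact point, keyspace, "DC1" datacenter name, and query are assumptions for illustration):

package main

import (
	"fmt"
	"log"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("127.0.0.1:9042") // assumed contact point
	cluster.Keyspace = "my_keyspace"
	cluster.Consistency = gocql.LocalQuorum // tip 1: LOCAL_QUORUM reads
	// tip 1: token-aware routing with DC-aware fallback ("DC1" = local DC)
	cluster.PoolConfig.HostSelectionPolicy =
		gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("DC1"))

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// tip 6: gocql prepares and caches queries automatically, so reusing the
	// same query string avoids re-parsing the statement on the server
	var title string
	err = session.Query(`SELECT title FROM users WHERE lname = ? AND fname = ?`,
		"B", "A").Scan(&title)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(title)
}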

2022-02-22

C# vs Go in Simple Benchmark

Today we're gonna retry two of my favorite languages in an associative array and comb sort benchmark (compile and run, not just runtime performance, because waiting for compilation during development is also important), like in the past benchmark. For installing DotNet:

wget https://packages.microsoft.com/config/ubuntu/21.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb
sudo dpkg -i packages-microsoft-prod.deb
rm packages-microsoft-prod.deb
sudo apt install apt-transport-https
sudo apt-get update
sudo apt-get install -y dotnet-sdk-6.0 aspnetcore-runtime-6.0

For installing Golang:

sudo add-apt-repository ppa:longsleep/golang-backports
sudo apt-get update
sudo apt install -y golang-1.17

Result (best of 3 runs)

cd assoc; time dotnet run
6009354 6009348 611297
36186112 159701682 23370001

CPU: 14.16s     Real: 14.41s    RAM: 1945904KB

cd assoc; time go run map.go
6009354 6009348 611297
36186112 159701682 23370001

CPU: 14.80s     Real: 12.01s    RAM: 2305384KB

This is a bit weird; usually I see Go using less memory but being slower, but in this benchmark it's C# that uses less memory while being a bit slower (14.41s vs 12.01s), possibly because the compilation time is also included.

cd num-assoc; time dotnet run
CPU: 2.21s      Real: 2.19s     RAM: 169208KB

cd num-assoc; time go run comb.go
CPU: 0.46s      Real: 0.44s     RAM: 83100KB

What if we increase the N from 1 million to 10 million?

cd num-assoc; time dotnet run
CPU: 19.25s     Real: 19.16s    RAM: 802296KB

cd num-assoc; time go run comb.go
CPU: 4.60s      Real: 4.67s     RAM: 808940KB
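
For reference, this is roughly what a comb sort in Go looks like (a sketch only; the actual comb.go in the benchmark repository may differ):

package main

import (
	"fmt"
	"math/rand"
)

// combSort sorts in place: start with a large gap, shrink it by ~1.3 each
// pass while swapping out-of-order pairs, until it degrades to a bubble pass.
func combSort(a []int) {
	gap, swapped := len(a), true
	for gap > 1 || swapped {
		gap = gap * 10 / 13
		if gap < 1 {
			gap = 1
		}
		swapped = false
		for i := 0; i+gap < len(a); i++ {
			if a[i] > a[i+gap] {
				a[i], a[i+gap] = a[i+gap], a[i]
				swapped = true
			}
		}
	}
}

func main() {
	const N = 1_000_000 // the post also tries 10 million
	a := make([]int, N)
	for i := range a {
		a[i] = rand.Int()
	}
	combSort(a)
	fmt.Println(a[0], a[N/2], a[N-1]) // print a few elements so the work isn't optimized away
}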

If you want to contribute (if I made a mistake when coding the C# or Go version of the algorithm, or if there's a more efficient data structure), just fork and create a pull request, and I will redo the benchmark.

2022-01-28

Getting started with Kubernetes

Since containerization got more popular, Kubernetes gained more traction than deploying to just one VM. In a previous post I explained why or when you don't need Kubernetes and when you'll need it. At least from the deployment perspective we can categorize deployments into 5 types (based on ownership, initial cost, granularity of the recurring cost, and the need for capacity planning):

1. On-Premise Dedicated Server: your own server, your own rack, or put in colocation. We own the hardware, we have to replace it when it breaks, and we have to maintain the network part (switches, routers). Usually this is the best choice for internal services (software used only by internal staff), from the security and bandwidth perspective.

2. VM: we rent the "cloud" infrastructure; this can be considered IaaS (Infrastructure as a Service). We rent a virtual machine/server, sometimes named a Virtual Private/Dedicated Server, so we pay monthly while the server is turned on (or based on contract, X per month/year) and sometimes also for bandwidth (unless the provider has unmetered billing). Some notable products in this category: Google Compute Engine, Amazon EC2, Azure VM, Contabo VPS/VDS, etc. Usually this is the best option for databases (unless you are using a managed database service) or other stateful applications, or when the maximum number of users is limited and can be estimated with capacity planning (not the whole world will be accessing this VM), because it would be a bit insane to have your data moving/replicating automatically on high load; manually triggered/scheduled scale up/out/down/in is still the better option for stateful/database workloads (eg. scale up/out for Black Friday, scale down/in when it ends).

3. Kubernetes: we rent or use managed Kubernetes, or install Kubernetes on top of our own on-premise dedicated servers. Usually the company will rent 3 huge servers (64 cores, 256GB RAM, very large hard disks) and let developers deploy containers/pods inside the Kubernetes cluster themselves, split based on their team's or service's namespace. This has a constant cost (those 3 huge VMs/on-prem servers, plus the managed Kubernetes service's cost); some providers also offer automatic node scale-out (so the Kubernetes nodes/VMs [where the pods will be placed] can be spawned/deleted based on load). Some notable products in this category: GKE, Amazon EKS, AKS, DOKS, Jelastic Kubernetes Cluster, etc. This one is best if you have a large number of services. For a truly self-managed setup, Mesos, Docker Swarm, or Nomad combined with some other services (since they only manage a fraction of Kubernetes' features, eg. Consul for service discovery, FabioLB for load balancing, Vault for secrets management, etc) can be an alternative. The difference between this and number 4 below is that you still mostly have to provision the node VMs/hardware manually (though this can be automated with scripts as long as the provider supports it, eg. via Pulumi or the IaaS provider's provisioning/autoscaling group API).

4. Container Engine: in this type of deployment we use the infrastructure provider's platform, so we only need to supply a container without having to rent a VM manually; some providers will deploy the container inside a single VM, others will deploy it on a shared dedicated server/VM. All of them have the same feature, auto scale-out, but only some of them have auto scale-up. Some notable products in this category: Google AppEngine, Amazon ECS/Beanstalk/Fargate, Azure App Service, Jelastic Cloud, Heroku, etc. Usually this is the best choice for most cases budget-wise and on the scalability side, especially if you have a small number of services or use a monolith; it can also be great for a large number of services if the billing is granular (pay only for the resources [CPU, RAM, Disk] you utilize, not whenever the server is turned on) like in Jelastic.

5. Serverless/FaaS: we only need to supply the function (mostly based on a specific template) that will run on a specific event (eg. at a specific time like CRON, or when the load balancer receives a request like in old CGI). Usually the function is put inside a container and kept as a standby instance, so scale-out only happens when it receives high load. If the function requires a database as a dependency, it's recommended to use a managed database that supports a high number of connections/connect-disconnects, or to offload the work to an MQ/PubSub service. Notable products in this category: Google CloudRun, AWS Lambda, Azure Functions, OpenFaaS, Netlify, Vercel, Cloudflare Workers, etc. We usually pay for this service based on CPU duration, number of calls, total RAM usage, bandwidth, and other metrics, so it can be very cheap when the number of function calls is small, but really costly if you write inefficient functions or have a large number of calls. Usually lambdas are only used for handling spikes or as atomic CRON jobs.

Because of the hype, or because it fits their use case (a bunch of teams that want to do independent service deployments), and because of the possibility of avoiding vendor lock-in, sometimes a company might decide to use Kubernetes. Most companies can survive without following the hype by using only a managed database (or deploying the database on a VM, or even using docker-compose with volume binding) plus a container engine (for the scale-out strategy), not having to train everyone to learn Kubernetes and not having to have a dedicated team/person to set up and manage Kubernetes (security, policies, etc).

But today we're gonna try minikube, one of the fastest local Kubernetes options, for the development use case (not for production).

curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

minikube start

# use --driver=kvm2 or virtualbox if docker cannot connect to the internet
#sudo apt install virtualbox
#sudo apt install qemu-kvm libvirt-daemon-system libvirt-clients bridge-utils virt-manager
#sudo adduser `id -un` libvirt
#sudo adduser `id -un` kvm

alias kubectl='minikube kubectl --'
alias k=kubectl

# will download kubectl if it's the first time
k

# get pods from all namespace
k get po -A

# open dashboard and authenticate
minikube dashboard 

# destroy minikube cluster
minikube ssh # sudo poweroff
minikube delete


Create the Dockerfile you want to deploy to the Kubernetes cluster; or, if it's just a simple single-binary Golang project, building locally, putting the binary into an Alpine image (instead of building cleanly inside Docker), then pushing it to the image registry will work just fine:

# build binary
CGO_ENABLED=0 GOOS=linux go build -o ./bla.exe

# create Dockerfile
echo '
FROM alpine:latest
WORKDIR /
COPY bla.exe .
CMD ./bla.exe # or ENTRYPOINT if you need to override
' > Dockerfile

# build docker image
VERSION=$(ruby -e 't = Time.now; print "v1.#{t.month+(t.year-2021)*12}%02d.#{t.hour}%02d" % [t.day, t.min]')
COMMIT=$(git rev-parse --verify HEAD)
APPNAME=local-bla
docker image build -f ./Dockerfile . \
  --build-arg "app_name=$APPNAME" \
  -t "$APPNAME:latest" \
  -t "$APPNAME:$COMMIT" \
  -t "$APPNAME:$VERSION"

# example how to test locally without kubernetes
docker image build -f ./Dockerfile -t docktest1 .
docker run --network=host \
  --env 'DBMAST=postgres://usr:pwd@127.1:5432/db1?sslmode=disable' \
  --env 'DBREPL=postgres://usr:pwd@127.1:5432/db1?sslmode=disable' \
  --env DEBUG=true \
  --env PORT=8082 \
  docktest1

# push image to minikube
minikube image load $APPNAME

# create deployment config
echo '
apiVersion: v1
kind: Pod
metadata:
  name: bla-pod
spec:
  containers:
    - name: bla
      image: bla
      imagePullPolicy: Never
      env:
      - name: BLA_ENV
        value: "ENV_VALUE_TO_INJECT"
      # if you need access to docker-compose outside the kube cluster
      # use minikube ssh, route -n, check the ip of the gateway
      # and use that ip as connection string
      # it should work, as long the port forwarded
  restartPolicy: Never
' > bla-pod.yaml

# deploy
kubectl apply -f bla-pod.yaml

# check
k get pods
k logs bla-pod

# delete deployment
kubectl delete pod bla-pod

If you need NewRelic log forwarding, it's as easy as adding a helm chart (it will automatically attach to new pod logs and send them to NewRelic):

curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
helm repo add newrelic https://helm-charts.newrelic.com
helm search repo newrelic/
helm install newrelic-logging newrelic/newrelic-logging --set licenseKey=eu01xx2xxxxxxxxxxxxxxxRAL
kubectl get daemonset -o wide -w --namespace default newrelic-logging

The next step should be adding a load balancer or ingress so that the pod can receive HTTP requests. As an alternative for a faster development workflow (autosync to the pod after recompile), you can try Tilt.
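
As a starting point for that next step, a minimal sketch of exposing the pod via a NodePort Service on minikube (the bla-svc name, the app: bla label, and port 8082 are assumptions matching the examples above; the pod/deployment must carry the matching label):

echo '
apiVersion: v1
kind: Service
metadata:
  name: bla-svc
spec:
  type: NodePort
  selector:
    app: bla # the pod needs labels: app: bla in its metadata
  ports:
    - port: 80
      targetPort: 8082 # the PORT env used in the docker run example above
' > bla-svc.yaml

kubectl apply -f bla-svc.yaml

# minikube prints a local URL that forwards to the NodePort
minikube service bla-svc --url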