2022-03-17

Getting started with Cassandra or ScyllaDB

Today we're gonna learn Cassandra (it's been 5 years since I last use ScyllaDB: C++ version of Cassandra), to install Cassandra and ScyllaDB, you can use this docker-compose:

version: '3.3'
services:
  testcassandra:
    image: cassandra:3.11 # or latest
    environment:
      - HEAP_NEWSIZE=256M
      - MAX_HEAP_SIZE=1G
      - "JVM_OPTS=-XX:+PrintGCDateStamps"
      - CASSANDRA_BROADCAST_ADDRESS
    ports:
      - "9042:9042"
  testscylla:
    image: scylladb/scylla:4.5.1 # because latest 4.6 is broken
    command: --smp 2 --memory 1G --overprovisioned 1 --api-address 0.0.0.0 --developer-mode 1
    ports:
      - 19042:9042
#      - 9142:9142
#      - 7000:7000
#      - 7001:7001
#      - 7199:7199
#      - 10000:10000
#  scylla-manager:
#    image: scylladb/scylla-manager
#    depends_on:
#      - testscylla
 
Using docker we can create spawn multiple nodes to test NetworkTopologyStrategy, consistency level, and replication factor (or even multiple datacenter):
 
docker run --name NodeX -d scylladb/scylla:4.5.1
docker run --name NodeY -d scylladb/scylla:4.5.1 --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' NodeX)"
docker run --name NodeZ -d scylladb/scylla:4.5.1 --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' NodeX)"

docker exec -it NodeZ nodetool status
# wait for UJ (up joining) became UN (up normal)

Since I failed to run latest ScyllaDB (so we use 4.5). To install cqlsh locally, you can use this command:

pip3 install cqlsh
cqlsh 127.0.0.1 9042 # cassandra
cqlsh 127.0.0.1 19042 # scylladb

node=`docker ps | grep /scylla: | head -n 1 | cut -f 1 -d ' '`
docker exec -it $node cqlsh # using cqlsh inside scylladb
# ^ must wait 30s+ before docker ready

docker exec -it $node nodetool status
# ^ show node status

As we already know, Cassandra is columnar database, that we have to make a partition key (where the rows will be located) and clustering key (ordering of that data inside the partition), the SSTable part works similar to Clickhouse merges.

To create a keyspace (much like a "database" or collection of tables but we can set replication region), use this command:

CREATE KEYSPACE my_keyspace WITH replication = {'class':
'SimpleStrategy', 'replication_factor': 1};
-- ^ single node
-- {'class' : 'NetworkTopologyStrategy', 'replication_factor': '3'};
-- ^ multiple node but in a single datacenter and/or rack

-- {'class' : 'NetworkTopologyStrategy', 'DC1': '3', 'DC2': '3'};
-- ^ multiple datacenter

USE my_keyspace;

CONSISTENCY; -- how many read/write ack
-- ANY
-- ONE, TWO, THREE
-- LOCAL_ONE
-- QUORUM = replication_factor / 2 + 1
-- LOCAL_QUORUM
-- EACH_QUORUM -- only for write
-- ALL -- will failed if nodes < replication_factor
CONSISTENCY new_level;

To create a table with same partition key and clustering/ordering key:

CREATE TABLE users ( -- or TYPE for custom type, [keyspace.]
  
fname text, 
  lname text,
  title text,
  PRIMARY KEY (lname, fname)
);
DESCRIBE TABLE users; -- only for 4.0+
CREATE TABLE foo (
  pkey text,
  okey text,
  PRIMARY KEY ((pkey), okey) -- different partition and ordering
  -- add WITH CLUSTERING ORDER BY (okey DESC) for descending
); -- add WITH cdc = { 'enabled' = true, preimage = 'true' }
DESC SCHEMA; -- show all tables and materialized views


To upsert, use insert or update command (last write wins):

INSERT INTO users (fname, lname, title)
VALUES ('A', 'B', 'C');
INSERT INTO users (fname, lname, title)
VALUES ('A', 'B', 'D'); -- add IF NOT EXISTS to prevent replace
SELECT * FROM users; -- USING TIMEOUT XXms

UPDATE users SET title = 'E' WHERE fname = 'A' AND lname = 'C';
SELECT * FROM users; -- add IF EXISTS to prevent insert

# INSERT INTO users( ... ) VALUES ( ... ) USING TTL 600
# UPDATE users USING TTL 600 SET ...
# SELECT TTL(fname) FROM users WHERE ...
-- set TTL to 0 to remove TTL
-- column will be NULL if TTL became 0
-- whole row will be deleted if all non-PK column TTL is zero
# ALTER TABLE users WITH default_time_to_live = 3600;

# SELECT * FROM users LIMIT 3
# SELECT * FROM users PER PARTITION LIMIT 2
# SELECT * FROM users PER PARTITION LIMIT 1 LIMIT 3

CREATE TABLE stats(city text PRIMARY KEY,total COUNTER);
UPDATE stats SET total =
total + 6 WHERE city = 'Kuta';
SELECT * FROM stats;

To change the schema, use usual alter table command:

ALTER TABLE users ADD mname text;
-- tinyint, smallint, int, bigint (= long)
-- variant (= the real bigint)
-- float, double
-- decimal
-- text/varchar, ascii
-- timestamp
-- date, time
-- uuid
-- timeuuid (with mac address, conflict free, set now())
-- boolean
-- inet
-- counter
-- set<type> (set {val,val}, +{val}, -{val})
-- list<type> (set [idx]=, [val,val], +[], []+, -[], DELETE [idx])
-- map<type,type> (set {key: val}, [key]=, DELETE [key] FROM)
-- tuple<type,...> (set (val,...))>

SELECT * FROM users;

UPDATE users SET mname = 'F' WHERE fname = 'A' AND lname = 'D';
-- add IF col=val to prevent update (aka lightweight transaction)
-- IF NOT EXISTS
--
SELECT * FROM users;
 
Complex nested type example from this page:

CREATE TYPE phone (
    country_code int,
    number text,
);
CREATE TYPE address (
  street text,
  city text,
  zip text,
  phones map<text, frozen<phone>> -- must be frozen, cannot be updated
);
CREATE TABLE pets_v4 (
  name text PRIMARY KEY,
  addresses map<text, frozen<address>>
);
INSERT INTO pets_v4 (name, addresses)
  VALUES ('Rocky', {
    'home' : {
      street: '1600 Pennsylvania Ave NW',
      city: 'Washington',
      zip: '20500',
      phones: {
        'cell' : { country_code: 1, number: '202 456-1111' },
        'landline' : { country_code: 1, number: '202 456-1234' }
      }
    },
    'work' : {
      street: '1600 Pennsylvania Ave NW',
      city: 'Washington',
      zip: '20500',
      phones: { 'fax' : { country_code: 1, number: '202 5444' } }
    }
  });

 
To create index (since Cassandra only allows retrieve by partition and cluster key or full scan):

CREATE INDEX ON users(title); -- global index (2 hops per query)
SELECT * FROM users WHERE title = 'E';
DROP INDEX users_title_idx;
SELECT * FROM users WHERE title = 'E' ALLOW FILTERING; -- full scan
CREATE INDEX ON users((lname),title); -- local index (1 hop per query)

To create a materialized view (that works similar to Clickhouse's materialized view):

CREATE MATERIALIZED VIEW users_by_title AS 
SELECT * -- ALTER TABLE will automatically add this VIEW too
FROM users
WHERE title IS NOT NULL
  AND fname IS NOT NULL

  AND lname IS NOT NULL

PRIMARY KEY ((title),lname,fname);
SELECT * FROM users_by_title;
INSERT INTO users(lname,fname,title) VALUES('A','A','A');
SELECT * FROM users_by_title WHERE title = 'A';
DROP MATERIALIZED VIEW users_by_title;
-- docker exec -it NodeZ nodetool viewbuildstatus

To create "transaction" use BATCH statement:

BEGIN BATCH;
INSERT INTO ...
UPDATE ...
DELETE ...
APPLY BATCH; 

To import from file, use COPY command:

COPY users FROM 'users.csv' WITH HEADER=true;

Tips for performance optimization:
1. for multi-DC use LocalQuorum on read, and TokenAware+DCAwareRoundRobin to prevent reading from nodes on different DC
2. ALLOW FILTERING if small number of records low cardinality (eg. values are true vs false only) -- 0 hop
3. global INDEX when primary key no need to be included, and latency doesn't matter (2 hops)
4. local INDEX for when primary key can be included (1 hops)
5. MATERIALIZED VIEW when want to use different partition for the same data, and storage doesn't matter
6. always use prepared statement