2024-02-20

Dump/export Cassandra/BigQuery tables and import to Clickhouse

Today we're going to dump a Cassandra table and load it into Clickhouse. Cassandra is a wide-column database that is mostly used for OLTP, since it has really good distributed capabilities (customizable replication factor, multi-cluster/multi-region, clustered/partitioned by default, so it's a good fit for multitenant applications). But once we need analytics or other complex queries, it really sucks, even with ScyllaDB's materialized views (which are only good for recaps/summaries). To dump a Cassandra table, all you need to do is construct a query and run it with dsbulk, something like this:

./dsbulk unload -delim '|' -k "KEYSPACE1" \
   -query "SELECT col1,col2,col3 FROM table1" -c csv \
   -u 'USERNAME1' -p 'PASSWORD1' \
   -b secure-bundle.zip | tr '\\' '"' |
    gzip -9 > table1_dump_YYYYMMDD.csv.gz ;


The tr command above turns every backslash into a double quote, since dsbulk does not export CSV in the proper format (it escapes quotes as \" instead of ""). After that, you can restore the dump into Clickhouse by running something like this:

CREATE TABLE table1 (
    col1 String,
    col2 Int64,
    col3 UUID
) ENGINE = ReplacingMergeTree()
ORDER BY (col1, col2);

SET format_csv_delimiter = '|';
SET input_format_csv_skip_first_lines = 1;

INSERT INTO table1
FROM INFILE 'table1_dump_YYYYMMDD.csv.gz'
FORMAT CSV;
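
To see why the tr step in the dump pipeline matters, here is a minimal Go sketch that parses one pipe-delimited row before and after the replacement. The sample row and the helper names (fixQuotes, parsePipeCSV) are made up for illustration, and Go's encoding/csv only stands in for a strict CSV parser here, it is not what Clickhouse uses internally.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// fixQuotes mimics `tr '\\' '"'`: every backslash becomes a double quote,
// so a \" escape turns into the "" escape that CSV parsers expect.
func fixQuotes(line string) string {
	return strings.ReplaceAll(line, `\`, `"`)
}

// parsePipeCSV parses a single pipe-delimited CSV record.
func parsePipeCSV(line string) ([]string, error) {
	r := csv.NewReader(strings.NewReader(line))
	r.Comma = '|'
	return r.Read()
}

func main() {
	// hypothetical exported row, with quotes escaped as \" like in the dump
	raw := `"say \"hi\""|42|51038948-97ea-11ee-b7e0-52de156a77d8`

	if _, err := parsePipeCSV(raw); err != nil {
		fmt.Println("raw line rejected:", err)
	}

	fields, err := parsePipeCSV(fixQuotes(raw))
	if err != nil {
		fmt.Println("fixed line rejected:", err)
		return
	}
	fmt.Printf("%q\n", fields)
}
```

Note that this replacement (just like tr) also rewrites backslashes that are part of the data itself, so it is only safe when the exported columns contain no literal backslashes.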


BigQuery

Similar to Clickhouse, BigQuery is one of the best analytical engines (because of virtually unlimited compute and massively parallel storage), but it comes at a cost. On a large table, improper partitioning/clustering (and sometimes even proper partitioning, since it's limited to a single column, unlike Clickhouse which can use more) leads to huge scans ($6.25 per TiB) and a lot of compute slots. Combine that with materialized views or periodic queries on cron, and it will definitely kill your wallet. To dump from BigQuery, all you need to do is create a GCS (Google Cloud Storage) bucket, then run a query like this:

EXPORT DATA
  OPTIONS (
    uri = 'gs://BUCKET1/table2_dump/1-*.parquet',
    format = 'PARQUET',
    overwrite = true
    --, compression = 'GZIP' -- import failed: ZLIB_INFLATE_FAILED
  )
AS (
  SELECT * FROM `dataset1.table2`
);

-- if you need a WHERE filter on the query above,
-- it's better to create a snapshot table first, e.g.
CREATE TABLE dataset1.table2_filtered_snapshot AS
  SELECT * FROM `dataset1.table2` WHERE col1 = 'yourFilter';
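
The export query is itself a full SELECT * over the table, so it's worth a quick back-of-the-envelope estimate before running it (or before putting any full scan on cron). A rough sketch in Go, assuming the $6.25 per TiB on-demand price mentioned above applies to the bytes scanned; the 500 GiB table size and the hourly schedule are hypothetical numbers.

```go
package main

import "fmt"

const (
	usdPerTiB = 6.25    // on-demand price quoted in this post
	tib       = 1 << 40 // bytes in one TiB
)

// scanCostUSD estimates the on-demand cost of scanning n bytes.
func scanCostUSD(n float64) float64 {
	return n / tib * usdPerTiB
}

func main() {
	tableBytes := 500.0 * (1 << 30) // hypothetical 500 GiB table
	perScan := scanCostUSD(tableBytes)

	fmt.Printf("one full scan: $%.2f\n", perScan)
	fmt.Printf("hourly cron for 30 days: $%.2f\n", perScan*24*30)
}
```

A single scan looks cheap, but the same scan repeated by a periodic query adds up quickly, which is the wallet problem described above.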

I'm not using compression because the import failed with it (ZLIB_INFLATE_FAILED), not sure why. The parquet files will show up in your bucket. Click on "Remove public access prevention", then make the bucket publicly readable with this gcloud command:

gcloud storage buckets add-iam-policy-binding gs://BUCKET1 --member=allUsers --role=roles/storage.objectViewer
# remove-iam-policy-binding to undo this

 

Then just restore it in Clickhouse:

CREATE TABLE table2 (
    Col1 String,
    Col2 DateTime,
    Col3 Int32
) ENGINE = ReplacingMergeTree()
ORDER BY (Col1, Col2, Col3);

SET parallel_distributed_insert_select = 1;

INSERT INTO table2
SELECT Col1, Col2, Col3
FROM s3Cluster(
    'default',
    'https://storage.googleapis.com/BUCKET1/table2_dump/1-*.parquet',
    '', -- s3 access id, remove or leave empty if public
    ''  -- s3 secret key, remove or leave empty if public
);


Writing UDF for Clickhouse using Golang

Today we're going to create a UDF (user-defined function) in Golang that can be run inside a Clickhouse query. This function parses a UUID v1 and returns its timestamp, since Clickhouse doesn't have such a function for now. It's inspired by the Python version and uses the TabSeparated format (since it's the easiest to parse). A UDF in Clickhouse reads its input line by line (each line is one row, and each tab-separated text is one column/cell value):

package main
import (
    "bufio"
    "encoding/binary"
    "encoding/hex"
    "fmt"
    "os"
    "strings"
    "time"
)
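func main() {
    // Clickhouse sends one row per line and expects one output line per row
    scanner := bufio.NewScanner(os.Stdin)
    scanner.Split(bufio.ScanLines)
    for scanner.Scan() {
        id, err := FromString(scanner.Text())
        if err != nil {
            // print an empty line so input and output rows stay aligned
            fmt.Println()
            continue
        }
        fmt.Println(id.Time())
    }
}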
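// Nanoseconds returns the UUID v1 timestamp as nanoseconds since the Unix epoch.
func (me UUID) Nanoseconds() int64 {
    // the 60-bit timestamp is split into three fields (names follow RFC 4122)
    time_low := int64(binary.BigEndian.Uint32(me[0:4]))
    time_mid := int64(binary.BigEndian.Uint16(me[4:6]))
    time_hi := int64((binary.BigEndian.Uint16(me[6:8]) & 0x0fff)) // mask out the version bits
    // the timestamp counts 100-nanosecond intervals since the UUID epoch
    return int64((((time_low) + (time_mid << 32) + (time_hi << 48)) - epochStart) * 100)
}
// Time returns the UUID v1 timestamp as a UTC time.
func (me UUID) Time() time.Time {
    nsec := me.Nanoseconds()
    return time.Unix(nsec/1e9, nsec%1e9).UTC()
}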
// code below Copyright (C) 2013 by Maxim Bublis <b@codemonkey.ru>
// see https://github.com/satori/go.uuid
// Difference in 100-nanosecond intervals between
// UUID epoch (October 15, 1582) and Unix epoch (January 1, 1970).
const epochStart = 122192928000000000
// UUID representation compliant with specification
// described in RFC 4122.
type UUID [16]byte
// FromString returns UUID parsed from string input.
// Following formats are supported:
// "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
// "{6ba7b810-9dad-11d1-80b4-00c04fd430c8}",
// "urn:uuid:6ba7b810-9dad-11d1-80b4-00c04fd430c8"
func FromString(input string) (u UUID, err error) {
    s := strings.Replace(input, "-", "", -1)
    if len(s) == 41 && s[:9] == "urn:uuid:" {
        s = s[9:]
    } else if len(s) == 34 && s[0] == '{' && s[33] == '}' {
        s = s[1:33]
    }
    if len(s) != 32 {
        err = fmt.Errorf("uuid: invalid UUID string: %s", input)
        return
    }
    b := []byte(s)
    _, err = hex.Decode(u[:], b)
    return
}
// Returns canonical string representation of UUID:
// xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.
func (u UUID) String() string {
    return fmt.Sprintf("%x-%x-%x-%x-%x",
        u[:4], u[4:6], u[6:8], u[8:10], u[10:])
}


Compile it, put the binary at /var/lib/clickhouse/user_scripts/uuid2timestr with the proper owner and permissions, and create /etc/clickhouse-server/uuid2timestr_function.xml (the file name must have the proper suffix, _function.xml here) containing:

<functions>
    <function>
        <type>executable</type>
        <name>uuid2timestr</name>
        <return_type>String</return_type>
        <argument>
            <type>String</type>
        </argument>
        <format>TabSeparated</format>
        <command>uuid2timestr</command>
        <lifetime>0</lifetime>
    </function>
</functions>


After that, restart Clickhouse with sudo systemctl restart clickhouse-server or sudo clickhouse restart, depending on how you installed it (apt or binary setup).

Usage


To make sure it's loaded, look for this line in the log:

<Trace> ExternalUserDefinedExecutableFunctionsLoader: Loading config file '/etc/clickhouse-server/uuid2timestr_function.xml

Then just run a query using that function:

SELECT uuid2timestr('51038948-97ea-11ee-b7e0-52de156a77d8')

┌─uuid2timestr('51038948-97ea-11ee-b7e0-52de156a77d8')─┐
│ 2023-12-11 05:58:33.2391752 +0000 UTC                │
└──────────────────────────────────────────────────────┘