How to Analyze Billions of Records per Second on a Single Desktop PC

Everything you never wanted to know about LocustDB

This section provides a detailed overview of how LocustDB executes queries by following two queries through all stages of query execution.

High level architecture

In-memory storage

All tables are stored in a hash map that maps table names to the table data structures. Each table is subdivided into a list of immutable partitions that hold some subset of the records for the table. Each of these partitions consists of a list of columns. Each column stores the data associated with the column and some metadata like the name of the column, the encoding scheme and summary information about the stored values.
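To make this layout more concrete, here is a rough sketch in Rust of the data structures just described. All names and fields are illustrative only and do not correspond to LocustDB’s actual structs.

use std::collections::HashMap;
use std::sync::Arc;

// Illustrative sketch of the storage layout; not LocustDB's real types.
struct InMemoryStore {
    // Table name -> table data structure.
    tables: HashMap<String, Table>,
}

struct Table {
    name: String,
    // Immutable partitions holding disjoint subsets of the table's records,
    // shared via reference counting.
    partitions: Vec<Arc<Partition>>,
}

struct Partition {
    columns: Vec<Column>,
}

struct Column {
    name: String,
    // Encoding scheme, e.g. StringDictionary or IntCast in the plans below.
    encoding: Encoding,
    // Summary information about the stored values.
    value_range: Option<(i64, i64)>,
    // The encoded column data.
    data: Vec<u8>,
}

enum Encoding {
    StringDictionary,
    IntCast,
    Plain,
}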

Query execution

When the system receives a query string, it is parsed into an abstract syntax tree (AST) and the referenced table is extracted. LocustDB then creates a snapshot of that table by cloning the list of partitions that make up the table. This does not actually involve copying any of the table data; it just clones the reference counted pointers to the (immutable) partitions. The table snapshot, AST and some other data are all packaged up in a QueryTask object and added to a queue of pending tasks. On startup, LocustDB spawns one worker thread for each core, all of which are notified when a new task is put onto the queue. The query task is picked up by all idle worker threads, which can process individual partitions in parallel. Subresults from each partition are combined by the worker thread that produced them, and a final pass by a single thread combines the subresults of each worker thread after all partitions have been processed.
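A minimal sketch of the snapshot step, reusing the hypothetical types from the previous snippet (the real QueryTask carries considerably more state):

use std::sync::Arc;

// Cloning the partition list only bumps the reference counts on the Arcs;
// no column data is copied.
fn snapshot(table: &Table) -> Vec<Arc<Partition>> {
    table.partitions.clone()
}

// The snapshot, the parsed query and some bookkeeping data are bundled into
// a task that idle worker threads pick up from a shared queue.
struct QueryTask {
    query: String, // stands in for the parsed AST
    snapshot: Vec<Arc<Partition>>,
}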

Select + filter

After this high level overview, let’s dive into the details of how queries are actually executed by following an example query through all stages of execution. All of the example output shown in this section was generated by running commands in the LocustDB REPL. If you want to follow along, or play around with your own data set or different queries, you can do so by following these instructions. (Fair warning: you will have to compile from source but Rust’s excellent build system/package manager Cargo makes that painless). Anyway, this is the first query we will dissect:

locustdb> SELECT pickup_ntaname FROM trips WHERE pickup_ntaname <> "" LIMIT 5;

Scanned 2097152 rows in 14ms (0.14B rows/s)!

pickup_ntaname
------------------------------
"Lenox Hill-Roosevelt Island"
"Airport"
"Midtown-Midtown South"
"Central Harlem South"
"Midtown-Midtown South"

Pretty straightforward, we just select five nonempty entries from the pickup_ntaname column. As you will see, there’s still a lot going on behind the scenes! The first thing that needs to happen is converting the query string into an abstract syntax tree. My parser is very unremarkable and all I will say about it is that if you format your queries just right and all the stars align, it will produce something like this:

locustdb> :ast SELECT pickup_ntaname FROM trips WHERE pickup_ntaname <> "" LIMIT 5;
Query {
    select: [
        ColName(
            "pickup_ntaname"
        )
    ],
    table: "trips",
    filter: Func2(
        NotEquals,
        ColName(
            "pickup_ntaname"
        ),
        Const(
            Str(
                ""
            )
        )
    ),
    aggregate: [],
    order_by: None,
    order_desc: false,
    limit: LimitClause {
        limit: 5,
        offset: 0
    },
    order_by_index: None
}

The next step is to turn the AST into a sequence of vector operators that can be executed by the query engine. Since each partition may use a different encoding for the same column, the query plan can actually differ from partition to partition. In this case, we only need to touch a single partition to find our five records, so there is only a single query plan, which can be inspected with the :explain command:

locustdb> :explain SELECT pickup_ntaname FROM trips WHERE pickup_ntaname <> "" LIMIT 5;

Query plan in 1 batches
-- Stage 0 (streaming) --
column_0     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
constant_1   = ""                                            Constant
encoded_2    = encode(constant_1; StringDictionary)          EncodeStrConstant
equals_3     = column_0 <> encoded_2                         VecConstBoolOperator<u8, i64, NotEqualsInt>
column_4     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
filtered_5   = column_4[equals_3]                            Filter<u8>
decoded_6    = decode(filtered_5; StringDictionary)          Decode<&str>

Each line in the output represents a single operator. The pseudo-code expression on the left attempts to summarize what the operator does, borrowing from Matlab’s indexing notation. The identifier on the right corresponds to the name of the Rust structure that implements the operator (you can find them all in this folder). To make it clearer what’s going on, let’s go through each operation individually. The following snippets, produced by the :show command, print not only the query plan but also the output generated by each operator on the first mini-batch of the first partition. (Note: the columns in the actual dataset are very uniform and I manually edited the outputs for clarity.)

The first line of the output marks the beginning of stage 0. The query engine automatically groups all operators that can be executed concurrently into stages. In this query, all operators can be grouped under a single stage.

-- Stage 0 --

The next line gives information about the number of elements in the partition (1048576) and the length of the temporary buffers that are allocated by the operators. In this case, we will process the data set in chunks of size 1024 for a total of 1024 iterations. This is a very important optimization that allows all temporary results to remain inside L1 cache, and speeds up typical queries by a small integer multiple.

batch_size: 1024, max_length: 1048576, column_length: 1048576, iters: 1024
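Conceptually, the chunked execution of a streaming stage looks something like this (a sketch with a hypothetical signature, not the engine’s actual driver loop):

// Operators run on one small batch at a time, so all temporary buffers stay
// resident in L1 cache while the whole stage is applied to that batch.
const BATCH_SIZE: usize = 1024;

fn run_streaming_stage(column: &[u8], mut run_operators_on: impl FnMut(&[u8])) {
    for batch in column.chunks(BATCH_SIZE) {
        run_operators_on(batch);
    }
}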

Below is the first actual operator, which writes a reference to the column data to the shared scratchpad. One detail to note is that pickup_ntaname is stored as dictionary encoded U8 (byte) values.

column_0 = [pickup_ntaname; U8; StringDictionary]             GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more) 

This next operator simply assigns the string constant “” from the original query string to variable constant_1.

constant_1   = ""                                            Constant
Scalar("")

The next operator finally does something interesting. It encodes the above constant using the string dictionary by looking up the index at which the "" string is stored, in this case index 0. The query engine does this so that the pickup_ntaname <> "" expression can be evaluated by comparing this encoded integer value with the raw column data for pickup_ntaname. No string comparisons required!

encoded_2    = encode(constant_1; StringDictionary)          EncodeStrConstant
Scalar(0)
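Conceptually, EncodeStrConstant performs a reverse dictionary lookup along these lines (the dictionary layout here is hypothetical):

// Translate the string constant into its dictionary index so that later
// comparisons only have to touch integers.
fn encode_str_constant(dictionary: &[String], constant: &str) -> Option<u8> {
    dictionary
        .iter()
        .position(|entry| entry.as_str() == constant)
        .map(|index| index as u8)
}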

Next comes the actual comparison operation. It gives us a vector of bytes with values 0/1 that tells us which records we want to select.

equals_3     = column_0 <> encoded_2                         VecConstBoolOperator<u8, i64, NotEqualsInt<u8>>
Vec<U8>[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
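A conceptual version of this operator (not the actual implementation, which is generic over types and operations):

// Compare each encoded byte against the encoded constant and emit a 0/1
// selection vector.
fn not_equals_const(column: &[u8], encoded_constant: u8) -> Vec<u8> {
    column
        .iter()
        .map(|&value| (value != encoded_constant) as u8)
        .collect()
}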

The Filter operation jointly scans the vector we just produced together with the raw column to select matching records:

column_4     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more) 
filtered_5   = column_4[equals_3]                            Filter<u8>
Vec<U8>[96, 1]
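Conceptually, the filter does something like this:

// Keep exactly those column values whose selection byte is nonzero.
fn filter(column: &[u8], selection: &[u8]) -> Vec<u8> {
    column
        .iter()
        .zip(selection)
        .filter(|&(_, &keep)| keep != 0)
        .map(|(&value, _)| value)
        .collect()
}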

Finally, the selected records are turned back into strings by looking up the corresponding entries in the dictionary:

decoded_6    = decode(filtered_5; StringDictionary)          Decode<&str>
Vec<Str>[Lenox Hill-Roosevelt Island, Airport]
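The decode step is the inverse of the earlier dictionary lookup; conceptually:

// Map each dictionary index back to the string it stands for.
fn decode_strings<'a>(filtered: &[u8], dictionary: &'a [String]) -> Vec<&'a str> {
    filtered
        .iter()
        .map(|&index| dictionary[index as usize].as_str())
        .collect()
}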

And then we do all of this again 1023 more times:

[1023 more iterations]

Below you can see the entire output together:

locustdb> :show SELECT pickup_ntaname FROM trips WHERE pickup_ntaname <> "" LIMIT 5;

-- Stage 0 --
batch_size: 1024, max_length: 1048576, column_length: 1048576, iters: 1024
column_0     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
constant_1   = ""                                            Constant
Scalar("")
encoded_2    = encode(constant_1; StringDictionary)          EncodeStrConstant
Scalar(0)
equals_3     = column_0 <> encoded_2                         VecConstBoolOperator<u8, i64, NotEqualsInt>
Vec<U8>[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
column_4     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
filtered_5   = column_4[equals_3]                            Filter<u8>
Vec<U8>[96, 1]
decoded_6    = decode(filtered_5; StringDictionary)          Decode<&str>
Vec<Str>[Lenox Hill-Roosevelt Island, Airport]

[1023 more iterations]

Scanned 3145728 rows in 15ms (0.20B rows/s)!

pickup_ntaname
------------------------------
"Lenox Hill-Roosevelt Island"
"Airport"
"Midtown-Midtown South"
"Central Harlem South"
"Midtown-Midtown South"

Group by and aggregation

Still bearing with me? Then let’s look at a more complex query that performs grouping and also aggregates subresults from different partitions:

locustdb> SELECT passenger_count, store_and_fwd_flag, count(0), sum(total_amount) FROM trips LIMIT 10;

Scanned 1464785771 rows in 1330ms (1.10B rows/s)!

passenger_count | store_and_fwd_flag | count_0   | sum_1
----------------+--------------------+-----------+--------------
0               | ""                 | 10712     | 23670371
0               | "0"                | 239       | 395984
0               | "N"                | 3988526   | 4351189801
0               | "Y"                | 107966    | 134586050
1               | ""                 | 319360760 | 420920786930
1               | "*"                | 1         | 470
1               | "0"                | 76996375  | 82565436325
1               | "1"                | 1363418   | 1512292719
1               | "N"                | 622044598 | 907071715859
1               | "Y"                | 8422445   | 13233585913

This time, the query plan consists of 19 operators grouped into multiple stages. The query planner will make stages as large as possible, but some operators cannot produce or operate on partial results and have to be separated out into their own stage. For example, the summation operator has to run for the entire partition before its output can be used, otherwise some of the entries might still be incomplete. Other operators, like sorting, can’t be computed incrementally and need all of their inputs in full before they are run.

There are actually 4 slightly different query plans this time due to different encodings for the total_amount column. Here, I am just showing the most common query plan, which is constructed for 988 out of 1465 partitions:

Query plan in 988 batches
-- Stage 0 (streaming) --
column_0     = [store_and_fwd_flag; U8; StringDictionary]    GetEncoded
casted_1     = column_0 as I64                               TypeConversionOperator<u8, i64>
column_2     = [passenger_count; U8; IntCast]                GetEncoded
casted_3     = column_2 as I64                               TypeConversionOperator<u8, i64>
bitpacked_4  = casted_1 + (casted_3 << $shift)               ParameterizedVecVecIntegerOperator<BitShiftLeftAdd>
constant_5   = Constant                                      Constant
count_6[bitpacked_4] += 1                                    VecCount<i64>
column_8     = [total_amount; U16; IntCast]                  GetEncoded
sum_7[bitpacked_4] += column_8                               VecSum<u16, i64>

-- Stage 1 --
nonzero_indices_9 = nonzero_indices(count_6)                 NonzeroIndices<u32, i64>

-- Stage 2 --
sum_7        = sum_7[count_6 > 0]                            Compact<i64, u32>

-- Stage 3 --
count_6      = count_6[count_6 > 0]                          NonzeroCompact<u32>

-- Stage 4 --
decoded_10   = decode(count_6; IntCast)                      Decode<i64>

-- Stage 5 --
unpacked_11  = (nonzero_indices_9 >> $shift) & $mask         BitUnpackOperator
casted_12    = unpacked_11 as U8                             TypeConversionOperator<i64, u8>
decoded_13   = decode(casted_12; IntCast)                    Decode<i64>

-- Stage 6 --
unpacked_14  = (nonzero_indices_9 >> $shift) & $mask         BitUnpackOperator
casted_15    = unpacked_14 as U8                             TypeConversionOperator<i64, u8>
decoded_16   = decode(casted_15; StringDictionary)           Decode<&str>

The first thing that happens is that the passenger_count and store_and_fwd_flag columns are bit packed into a single 64-bit integer value:

-- Stage 0 --
batch_size: 1024, max_length: 1048576, column_length: 1048576, iters: 1024
column_0     = [store_and_fwd_flag; U8; StringDictionary]    GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
casted_1     = column_0 as I64                               TypeConversionOperator<u8, i64>
Vec<I64>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
column_2     = [passenger_count; U8; IntCast]                GetEncoded
&U8[2, 3, 3, 5, 5, 5, 1, 1, 1, 1, 3, 3, 3, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...] (990 more)
casted_3     = column_2 as I64                               TypeConversionOperator<u8, i64>
Vec<I64>[2, 3, 3, 5, 5, 5, 1, 1, 1, 1, 3, 3, 3, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...] (990 more)
bitpacked_4  = casted_1 + (casted_3 << 2)                    ParameterizedVecVecIntegerOperator<BitShiftLeftAdd>
Vec<I64>[8, 12, 12, 20, 20, 20, 4, 4, 4, 4, 12, 12, 12, 12, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, ...] (993 more)
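Conceptually, the bit packing computes something like the following (a sketch, not the actual ParameterizedVecVecIntegerOperator). With store_and_fwd_flag occupying the low two bits, the shift is 2, which reproduces the values above, e.g. 0 + (2 << 2) = 8 for the first row:

// Pack the two grouping columns into a single integer key per row.
fn bitpack_group_columns(low_bits: &[i64], high_bits: &[i64], shift: u32) -> Vec<i64> {
    low_bits
        .iter()
        .zip(high_bits)
        .map(|(&low, &high)| low + (high << shift))
        .collect()
}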

This makes it very easy to compute the count expression by simply indexing into and incrementing a single array storing the counts:

count_6[bitpacked_4] += 1                                    VecCount<i64>
Vec<U32>[7, 0, 0, 0, 765, 4, 0, 0, 144, 2, 0, 0, 32, 0, 0, 0, 10, 0, 0, 0, 39, 0, 0, 0, 21, 0, 0, 0, 0, 0, 0, 0, ...] (7 more)
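Conceptually, VecCount boils down to something like this (a sketch; the real operator works on the engine’s typed buffers):

// The bitpacked group key indexes directly into the count array, so grouping
// and counting is a single increment per row.
fn vec_count(counts: &mut Vec<u32>, group_keys: &[i64]) {
    for &key in group_keys {
        let slot = key as usize;
        if slot >= counts.len() {
            counts.resize(slot + 1, 0);
        }
        counts[slot] += 1;
    }
}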

The summation proceeds similarly, and the entire process is repeated 1023 more times:

sum_7[bitpacked_4] += decoded_9                              VecSum<i64, i64>
Vec<I64>[14691, 0, 0, 0, 1571973, 6908, 0, 0, 289363, 1865, 0, 0, 72821, 0, 0, 0, 24194, 0, 0, 0, 83852, 0, 0, 0, ...] (15 more)

[1023 more iterations]
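The per-batch summation can be sketched in the same way (again only a conceptual stand-in for VecSum):

// Same indexing trick as the count, but accumulating the total_amount values
// instead of incrementing by one.
fn vec_sum(sums: &mut Vec<i64>, group_keys: &[i64], amounts: &[i64]) {
    for (&key, &amount) in group_keys.iter().zip(amounts) {
        let slot = key as usize;
        if slot >= sums.len() {
            sums.resize(slot + 1, 0);
        }
        sums[slot] += amount;
    }
}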

The NonzeroIndices operator collects all the indices of the count array whose entries are greater than zero. These indices correspond to all the unique combinations of values contained in the passenger_count and store_and_fwd_flag columns and will later be used to reconstruct them:

-- Stage 1 --
batch_size: 39, max_length: 39, column_length: 1048576, iters: 1
nonzero_indices_10 = nonzero_indices(count_6)                NonzeroIndices<u32, i64>
Vec<I64>[0, 1, 4, 5, 8, 9, 12, 13, 16, 17, 20, 21, 24, 25, 28, 32, 36]
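Conceptually, this is a simple scan over the count array:

// The positions of the nonzero counts are exactly the bitpacked group keys
// that actually occur in this partition.
fn nonzero_indices(counts: &[u32]) -> Vec<i64> {
    counts
        .iter()
        .enumerate()
        .filter(|&(_, &count)| count > 0)
        .map(|(index, _)| index as i64)
        .collect()
}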

The vectors storing the count and summation results still contain empty entries, corresponding to combinations of passenger_count and store_and_fwd_flag that we didn’t encounter in the data set; these have to be removed:

-- Stage 2 --
batch_size: 39, max_length: 39, column_length: 1048576, iters: 1
sum_7        = sum_7[count_6 > 0]                            Compact<i64, u32>
Vec<I64>[9760773, 47247, 1146792536, 3759295, 282781222, 944926, 79922601, 211259, 40777881, 144505, 79317284, ...] (6 more)

-- Stage 3 --
batch_size: 39, max_length: 39, column_length: 1048576, iters: 1
count_6      = count_6[count_6 > 0]                          NonzeroCompact<u32>
Vec<U32>[6114, 17, 718609, 2194, 168438, 464, 48943, 98, 24564, 77, 48937, 6, 30100, 1, 5, 7, 2]
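Conceptually, the compaction keeps only the slots with a nonzero count (sketch):

// Drop every aggregate slot whose count is zero so that the remaining entries
// line up with the indices produced by NonzeroIndices.
fn compact(aggregates: &[i64], counts: &[u32]) -> Vec<i64> {
    aggregates
        .iter()
        .zip(counts)
        .filter(|&(_, &count)| count > 0)
        .map(|(&aggregate, _)| aggregate)
        .collect()
}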

We now reconstruct the passenger_count and store_and_fwd_flag columns from the unique bitpacked values:

-- Stage 5 --
batch_size: 17, max_length: 17, column_length: 1048576, iters: 1
unpacked_12  = (nonzero_indices_10 >> 2) & f                 BitUnpackOperator
Vec<I64>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 9]
casted_13    = unpacked_12 as U8                             TypeConversionOperator<i64, u8>
Vec<U8>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 9]
decoded_14   = decode(casted_13; IntCast)                    Decode<i64>
Vec<I64>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 9]

-- Stage 6 --
batch_size: 17, max_length: 17, column_length: 1048576, iters: 1
unpacked_15  = (nonzero_indices_10 >> 0) & 3                 BitUnpackOperator
Vec<I64>[0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0]
casted_16    = unpacked_15 as U8                             TypeConversionOperator<i64, u8>
Vec<U8>[0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0]
decoded_17   = decode(casted_16; StringDictionary)           Decode<&str>
Vec<Str>[N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, N, N]
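Conceptually, the bit unpacking is just the reverse of the packing in stage 0 (sketch):

// Shift and mask the packed key to recover one of the original grouping
// columns: (key >> 2) & 0xf for passenger_count, (key >> 0) & 0x3 for
// store_and_fwd_flag.
fn bit_unpack(group_keys: &[i64], shift: u32, mask: i64) -> Vec<i64> {
    group_keys.iter().map(|&key| (key >> shift) & mask).collect()
}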

Phew! But wait, there’s more! We still need to merge all the sub results produced on different partitions. The approach I have taken here is a kind of merge sort which can be implemented very efficiently and allows for sequential memory access and low space overhead. One limitation at the moment is that these operators are not yet able to compute results incrementally. This is possible in principle though, and should give massive speedups on queries that have to aggregate large subresults (e.g. query 8 in the benchmark).

First, we set up the data sources for the columns in the group by clause:

-- Stage 0 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
left_0       = ConstantVec                                   ConstantVec
Vec<I64>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 9]

-- Stage 1 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
left_1       = ConstantVec                                   ConstantVec
Vec<Str>[N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, N, N]

-- Stage 2 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
right_2      = ConstantVec                                   ConstantVec
Vec<I64>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 9]

-- Stage 3 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
right_3      = ConstantVec                                   ConstantVec
Vec<Str>[N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, N]

The next operation partitions the two passenger_count columns into groups of identical elements, and also stops processing after it has encountered the number of elements specified in the limit clause (10).

Note: Just before publishing the blogpost, I noticed that the way this is implemented is actually not quite right, can cause fewer results to be returned than specified in the limit clause, and might unfairly reduce the amount of work LocustDB does in the benchmarks. Results shouldn’t be affected since, other than query 8 (which is not subject to this problem), result sets grow to at most 20K elements (query 7), and merging is a tiny fraction of total runtime. Indeed, I reran the queries with sufficiently large limit clauses just to make sure and there was no difference.

-- Stage 4 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
partitioning_4 = partition(left_0, right_2)                  Partition<i64>
Vec<Premerge>[2|2, 2|2, 2|2]
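Conceptually, the partitioning walks both (sorted) key columns in lockstep and records how many elements on each side belong to the same group. The sketch below uses a hypothetical signature, and its limit check is only a rough stand-in for the real (and, as noted above, imperfect) limit handling:

// Produce the "left|right" group sizes (Premerge) shown in the output above.
fn partition(left: &[i64], right: &[i64], limit: usize) -> Vec<(usize, usize)> {
    let (mut l, mut r) = (0, 0);
    let mut groups = Vec::new();
    while l + r < limit && (l < left.len() || r < right.len()) {
        // The next group key is the smaller of the two current keys.
        let key = match (left.get(l), right.get(r)) {
            (Some(&lk), Some(&rk)) => lk.min(rk),
            (Some(&lk), None) => lk,
            (None, Some(&rk)) => rk,
            (None, None) => unreachable!(),
        };
        let take_left = left[l..].iter().take_while(|&&k| k == key).count();
        let take_right = right[r..].iter().take_while(|&&k| k == key).count();
        groups.push((take_left, take_right));
        l += take_left;
        r += take_right;
    }
    groups
}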

In the Stage 4 output above, the first two elements in the left column and the first two elements in the right column are 0, which corresponds to the first group, 2|2. If there are more than two columns, additional passes further split these groups apart on the middle columns. In this case, there is only one grouping column left, which can now be merged directly using the partitioning information from the first column:

-- Stage 5 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
merged_5, merge_ops_6 = merge_deduplicate_partitioned(partitioning_4, left_1, right_3) MergeDeduplicatePartitioned<&str>
Vec<Str>[N, Y, N, Y, N, Y]
Vec<MergeOp>[TakeLeft, MergeRight, TakeLeft, MergeRight, TakeLeft, MergeRight, TakeLeft, MergeRight, TakeLeft, ...] (3 more)

In addition to merging the left and right column, this operation also produced merge_ops_6, a sequence of MergeOps that records how the columns were merged. The same operations can now be applied to merge the first group by column:

-- Stage 6 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
merged_7     = merge_drop(merge_ops_6, left_0, right_2)      MergeDrop<i64>
Vec<I64>[0, 0, 1, 1, 2, 2]
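Conceptually, replaying the merge ops looks something like this. Only the two variants visible in the output above are sketched here (a general merge would also need a case for taking an element that is only present on the right):

// TakeLeft emits the next left element; MergeRight folds the next right
// element into the group that was just emitted. For group-by columns,
// "folding in" simply drops the duplicate value, which is what MergeDrop does.
enum MergeOp {
    TakeLeft,
    MergeRight,
}

fn merge_drop<T: Copy>(ops: &[MergeOp], left: &[T], right: &[T]) -> Vec<T> {
    let (mut l, mut r) = (0, 0);
    let mut merged = Vec::new();
    for op in ops {
        match op {
            MergeOp::TakeLeft => {
                merged.push(left[l]);
                l += 1;
            }
            MergeOp::MergeRight => {
                // The right value is a duplicate of the group just emitted.
                let _duplicate = right[r];
                r += 1;
            }
        }
    }
    merged
}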

Similarly, it tells us how to merge all the columns storing aggregates:

-- Stage 7 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
left_8       = ConstantVec                                   ConstantVec
Vec<I64>[6114, 17, 718609, 2194, 168438, 464, 48943, 98, 24564, 77, 48937, 6, 30100, 1, 5, 7, 2]

-- Stage 8 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
right_9      = ConstantVec                                   ConstantVec
Vec<I64>[501, 20, 732966, 7677, 147288, 1328, 42974, 291, 20802, 168, 57076, 10, 37472, 1, 1, 1]

-- Stage 9 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
aggregated_10 = merge_aggregate(Count; merge_ops_6, left_8, right_9) MergeAggregate
Vec<I64>[6615, 37, 1451575, 9871, 315726, 1792]

-- Stage 10 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
left_11      = ConstantVec                                   ConstantVec
Vec<I64>[9760773, 47247, 1146792536, 3759295, 282781222, 944926, 79922601, 211259, 40777881, 144505, 79317284, ...] (6 more)

-- Stage 11 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
right_12     = ConstantVec                                   ConstantVec
Vec<I64>[735630, 25643, 1074988616, 11860249, 226242587, 2267676, 64022166, 530681, 31227466, 286711, 85150302, ...] (5 more)

-- Stage 12 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
aggregated_13 = merge_aggregate(Sum; merge_ops_6, left_11, right_12) MergeAggregate
Vec<I64>[10496403, 72890, 2221781152, 15619544, 509023809, 3212602]
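Conceptually, the aggregate merge replays the same MergeOps, reusing the enum from the previous sketch, but a MergeRight now adds the right value onto the group that was just emitted instead of dropping it (this reproduces the counts above, e.g. 6114 + 501 = 6615):

fn merge_aggregate_sum(ops: &[MergeOp], left: &[i64], right: &[i64]) -> Vec<i64> {
    let (mut l, mut r) = (0, 0);
    let mut merged: Vec<i64> = Vec::new();
    for op in ops {
        match op {
            MergeOp::TakeLeft => {
                merged.push(left[l]);
                l += 1;
            }
            MergeOp::MergeRight => {
                // Fold the right aggregate into the group emitted last.
                *merged.last_mut().expect("MergeRight follows a TakeLeft") += right[r];
                r += 1;
            }
        }
    }
    merged
}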

And that’s it!

13 thoughts on “How to Analyze Billions of Records per Second on a Single Desktop PC”

    1. clemenswinter Post author

      I did evaluate Druid for a specific analytics use case at work one time and found performance to be underwhelming (but it might work better for other workloads and I seem to remember that Druid also offers very strong consistency and durability guarantees).

      Inverted indices make for very efficient point lookups/sparse filtering on a single column but introduce indexing overhead and don’t benefit all queries. The idea behind systems like ClickHouse and LocustDB is to make brute force scans so fast that you don’t even need an index. This does sacrifice performance/efficiency on simpler queries. But if your queries are large and comparatively infrequent (as is the case for many analytics workloads) this approach is a very good fit and should generally give superior performance and cost efficiency.

  1. Tim McNamara 🐍🦀 (@timClicks)

    Clemens – this is a really ambitious, exciting project. Well done for continuing your motivation after the initial hackfest!

    Do you have a gut feel for how many records you would need before a system like this would be justified? In particular, how essential is the distributed component? It sounds like it scales down to a single laptop quite well.

    1. clemenswinter Post author

      There are a number of design constraints to consider:
      1. How many queries are run per second
      2. How costly are queries (i.e. for how long do they run, which among other things depends on the number of records they touch)
      3. Latency requirements (i.e. you want queries to complete within n seconds)
      4. How many records are stored in the system

      If any of 1-3 are the bottleneck, you will need to add additional machines/CPUs to increase throughput and/or decrease latency. If a single machine can process all your queries sufficiently quickly, the limiting factor for the number of records is now the size of your RAM (or disk, but this is not currently supported by LocustDB and will lead to lower query speeds).

      So then the number of records you can store is (more or less) given by the formula memory_size * compression_ratio / size_per_record.

      Memory size is obvious.
      Compression ratio depends on the actual data values (and on how good the compression algorithms are); it is difficult to predict, but typical values might be between 3x and 20x.
      The size per record will depend on how many columns your table has and the size of each column.

      To give a concrete example, suppose you have 64GB of memory, you want to leave a spare 10GB of memory for running queries, and your table has two dozen columns storing mostly integers and dictionary compressible strings with a total record size of 200B uncompressed and 20B compressed. Then the number of records you could store would be roughly 54GB/20B = 2.7 billion.

      Of course the numbers for your use case may be very different, and you would have to actually load and benchmark (a portion of) your dataset to get accurate estimates for compression rates and query speeds.

  2. david

    Reminds me a lot of the concepts from SanssouciDB described in detail by Hasso Plattner in his book “In-Memory Data Management”. I thought it might be interesting for you as well, since you haven’t mentioned SAP HANA as a database using the same concepts such as caching, compression, massive parallelism, etc.
    SanssouciDB was the research project whose concepts eventually evolved into the DBMS of the commercial HANA platform.

    1. clemenswinter Post author

      Yes, SAP HANA seems to be quite successful in this space. I’d never heard of SanssouciDB or the book before, will have to check that out!


  4. Jason

    I’m aware this is an old post and feature sets/versions have changed, but from what I can gather ClickHouse wasn’t run with LLVM enabled (compile=1, which is disabled by default but is perfectly production ready). In my environment this generally improves performance by 15-50% depending on the query. Note that the first time a query runs there is a cost as it performs code-gen, but that is not indicative of real world usage where the lib would pre-exist due to previous runs.

    Additionally newer versions of ClickHouse support dictionary encoding (LowCardinality). Would be interesting to see this test repeated with these new features as I suspect the results would be quite different now.

  5. pindash91

    For KDB, casting the column to symbol, (`$) would allow you to do the dictionary aggregation. KDB’s symbol type is what everyone else calls an enum.

  6. Phi

    Hej, interesting stuff. Are you aware of the more obscure “APL”-like systems like kdb+ or shakti (the newer and IMHO more interesting project by Arthur Whitney)?
    Column based, memory mapped files, in-memory + on-disk database + CEP engine.

    1. clemenswinter Post author

      I actually have some comparisons to kdb+ in part 2 of this blogpost. This is the first time I’ve heard of shakti, hard to keep up with all the different database systems these days. There doesn’t seem to be much information about it yet, but it looks like a neat project, will be interesting to see if it gains traction.

