Clemens' Blog

How to Analyze Billions of Records per Second on a Single Desktop PC

Introduction

This article gives an overview of LocustDB [1], a new and extremely fast open-source analytics database built in Rust. Part 1 gives some background on analytical query systems and my goals for LocustDB. Part 2 presents benchmark results produced on a data set of 1.46 billion taxi rides and demonstrates median speedups of 3.6x over ClickHouse and 2.1x over tuned kdb+. Part 3 is an architecture deep dive that follows two SQL statements through the guts of the query engine. Part 4 concludes with random learnings, thoughts on Rust, and other incoherent ramblings.

Online analytical processing

Simply put, online analytical processing (OLAP) involves interactively running queries on often large data sets to gain new insights. To give a concrete example, you might be logging all requests to your website and storing fields like the country from which the page was accessed, the accessed URL, the HTTP status code returned by the website, the time it took to service the request, and any number of parameters included in the request. Then one day you get reports of performance issues on your search page and run a query like the following to identify specific requests that lead to slow responses:

SELECT params.search_term, latency
FROM requests
WHERE url = '/search'
AND latency > 10000
AND day = $TODAY
ORDER BY latency DESC

Or you may want to produce a report on the performance of your website, broken down by various attributes such as url and country. Or someone is trying to break into your systems, and you want to construct some filter that allows you to identify malicious requests and check whether they succeeded. Or one of your pages is returning errors and you want to filter down to and inspect the parameters of those requests.

Typical OLAP queries have a number of distinguishing features that impose unique design constraints:

As a corollary, elaborate indexing data structures are often ineffective. Since the number of possible ways to filter or group a dataset grows exponentially with the number of columns, we can only provide precomputed indices/aggregates for a tiny subset of possible queries. The approach taken by LocustDB is to forgo any attempts at complex indexing structures in favor of simple partitioning, performing brute-force scans over large parts of the data set, and then making those scans as fast as hardware allows. The main techniques used to achieve this are caching data in memory, massive parallelism, various optimizations around columnar data storage [2] and general techniques for high performance computing [3]. The trade-off is that simple queries will be relatively inefficient, which restricts throughput to maybe thousands of queries per second. LocustDB is not the first system designed in this fashion. Notably Scuba [4], a proprietary system used at Facebook, and ClickHouse [5], an open source column-oriented DBMS developed at Yandex, have successfully taken the same approach. Scuba in particular adds two additional innovations which are not currently offered by any widely available systems.

Aggressive Compression

Compression is a key tool for speeding up queries reading from disk and for reducing cost. A 2015 paper by Facebook [6] claims compression rates of 10x and larger, achieved through automatically applying multiple compression algorithms tailored to different kinds of data. Add to this the complete absence of indexing overhead, and it results in a very cost effective solution. I suspect there exist many low QPS HBase/Cassandra/Elasticsearch/… clusters that could be replaced by a system like Scuba with minimal cost increase (or even significant savings) while providing higher query speeds, more expressive query capabilities and eliminating the need to manually design indices and table schemas.

Flexible Schemas

Most analytics systems have followed the SQL approach of requiring rigid table schemas that define the type and name of all columns upfront. While this makes a lot of sense for applications using a database for long-term storage of critical data, it is often less useful in an analytics setting where data may be short-lived and data integrity is not as crucial. I believe that forgoing rigid, predefined schemas unlocks a long tail of valuable use cases related to debugging, short-lived experiments, prototyping and quick iteration for which setting up or migrating schemas is prohibitively cumbersome. A more subtle benefit is that supporting a flexible schema makes it much easier to support a multitude of compression schemes that may be difficult to retrofit if your code makes more rigid assumptions.

LocustDB

LocustDB originally started out during a Dropbox 2017 hackweek as a clone of Scuba. The prototype was open sourced and I have since continued working on it for fun and profit (minus the profit). So far most of my efforts have been focused on the query engine. It is still incomplete in terms of the breadth of supported query types and operators, but already extremely fast and fairly mature architecturally. Other components include a barely functional SQL(ish) parser, in-memory storage and simple compression schemes. There was support for basic persistence built on top of RocksDB at some point but most of this was killed during a refactor. Other functionality that you might consider important and that does not exist yet includes the ability to replicate data and run queries across multiple machines, advanced compression algorithms, and not crashing randomly.

Part 2: Benchmarks

Lies, damned lies and benchmarks

This section evaluates LocustDB on benchmarks based on a series of blog posts by Mark Litwintschik.

Disclaimer

Of course this benchmark can only shine light onto a small slice of functionality and the systems I am comparing against are much more complete than LocustDB. I have used the dataset and queries featured in the benchmarks during development, and while I have refrained from adding any optimizations that are tailored to the dataset, this naturally still introduced bias. Still, the benchmarks cover a fairly general class of important queries and I do believe the results paint a fair picture of LocustDB’s potential if not its current capabilities.

The Dataset

The data used in the benchmark is derived from a public dataset of 1.46 billion taxi rides in New York City. The raw dataset requires additional processing to turn it into a single denormalized table (see appendix). Stored as uncompressed CSVs, the dataset takes up 602GB of space and reduces to 114GB after applying gzip. Despite Mark’s detailed instructions for generating the denormalized dataset (which built on work by Todd Schneider), the process is fairly arduous and claimed several hours of my time. Downloading the raw data took multiple hours and the denormalization process ran for about a week. As a public service, I am making the resulting data set available for download in case anyone else wants to play with it.

The Hardware

All benchmarks were run on my desktop PC which contains an i7-5820K processor (Haswell E, 3.30-3.60GHz, 6 physical and 12 virtual cores, 15MB L3 cache) and 8x8GB of DDR4-2400 RAM.

The Competition

The benchmarks feature ClickHouse and kdb+. They were chosen since they are currently among the highest performing analytic systems that don’t require specialized hardware, and because Mark already did the hard work of setting up schemas and scripts that allow the taxi ride dataset to be loaded into the systems.

The Queries

The first four queries come from Mark’s benchmarks, the others were added by me to cover a wider range of query classes.

Simple Count

LocustDB

SELECT cab_type, count(0) FROM trips;

ClickHouse

SELECT 
    cab_type, 
    count(*)
FROM trips_mergetree 
GROUP BY cab_type

kdb+

select count 1b by cab_type from trips

LocustDB does not have a group by clause and implicitly groups by all select expressions that are not aggregations (it’s a feature). As a side note, this is not an ideal query for benchmarking because cab_type has only two possible values and almost all of the input files contain only one of them, which would make it very easy to store this column as a single value + count and compute this query instantly.

Average

LocustDB

SELECT passenger_count, count(0), sum(total_amount) FROM trips;

ClickHouse

SELECT 
    passenger_count, 
    avg(total_amount)
FROM trips_mergetree 
GROUP BY passenger_count

kdb+

select avg total_amount by passenger_count from trips

The original query computes the avg(total_amount) but LocustDB does not currently support averages, so I compute the sum and count instead which performs the same amount of work.

Count by 2 columns

LocustDB

SELECT passenger_count, to_year(pickup_datetime), count(0) FROM trips;

ClickHouse

SELECT 
    passenger_count, 
    toYear(pickup_date) AS year, 
    count(*)
FROM trips_mergetree 
GROUP BY 
    passenger_count, 
    year

kdb+

select count 1b by year, passenger_count from trips

Count the number of trips, broken down by year and passenger count. As it turns out, the to_year expression reduces to a constant in most partitions which allows LocustDB to completely eliminate it from the query plan. This is very neat! But it also comes dangerously close to gaming the benchmark which I promised I wouldn’t. For this reason the results also report runtimes for a “nerfed” version of LocustDB which prevents this particular optimization and forces the to_year(pickup_datetime) expression to be evaluated in full.
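To make the idea concrete, here is a minimal sketch of how such a constant could be detected. It assumes each partition keeps the minimum and maximum of pickup_datetime as column metadata; the function names are hypothetical and are not taken from LocustDB's code. Because the year is monotonic in the timestamp, equal years at both bounds mean the expression is constant for the whole partition.

```rust
/// Calendar year (UTC) of a UNIX timestamp in seconds, using the
/// well-known days-to-civil-date conversion.
fn to_year(unix_seconds: i64) -> i64 {
    let z = unix_seconds.div_euclid(86_400) + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day within the 400-year era
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, counted from March 1
    let month_index = (5 * doy + 2) / 153; // 0 = March, ..., 11 = February
    let year = yoe + era * 400;
    // In this scheme January and February belong to the following civil year.
    if month_index >= 10 { year + 1 } else { year }
}

/// Hypothetical helper: returns Some(year) if to_year is constant over
/// the closed timestamp range [min_ts, max_ts], and None otherwise.
fn constant_year(min_ts: i64, max_ts: i64) -> Option<i64> {
    let (lo, hi) = (to_year(min_ts), to_year(max_ts));
    if lo == hi { Some(lo) } else { None }
}
```

A partition whose timestamps all fall within 2015 would yield Some(2015), so the planner could replace the expression with that constant; a partition straddling New Year yields None and needs the full evaluation.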

Count by 3 columns

LocustDB

SELECT passenger_count, to_year(pickup_datetime), trip_distance / 1000, count(0) FROM trips;

ClickHouse

SELECT 
    passenger_count, 
    toYear(pickup_date) AS year, 
    round(trip_distance) AS distance, 
    count(*)
FROM trips_mergetree 
GROUP BY 
    passenger_count, 
    year, 
    distance

kdb+

select count 1b by year, passenger_count, floor trip_distance from trips

In the original data set trip_distance is a floating-point column, which LocustDB does not currently support. The column is multiplied by 1000 on ingestion and then stored as an integer. The original query rounds the trip distance, which in this case is roughly equivalent to dividing by 1000 (not fully because division truncates, but close enough for the purpose of this benchmark). The same caveat about the to_year(pickup_datetime) expression applies to this query as well.
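The difference between truncating division and rounding can be seen in a small sketch. The helper names are made up for illustration, and the scale of 1000 is the one mentioned above.

```rust
/// Store a floating-point value as a fixed-point integer by multiplying
/// with `scale` and truncating the remaining fraction.
fn to_fixed_point(value: f64, scale: f64) -> i64 {
    (value * scale) as i64
}

/// Bucket used by the LocustDB query: integer division truncates.
fn bucket_by_division(fixed_point: i64) -> i64 {
    fixed_point / 1000
}

/// Bucket used by the original query: round to the nearest integer.
fn bucket_by_rounding(value: f64) -> i64 {
    value.round() as i64
}
```

For a trip distance of 2.75 the stored value is 2750, which lands in bucket 2 under integer division but in bucket 3 under rounding. The two queries therefore group slightly differently while doing a comparable amount of work.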

Sparse filter

LocustDB

SELECT trip_id FROM trips WHERE (passenger_count = 5) AND (vendor_id = "CMT") AND (total_amount < 500) AND (store_and_fwd_flag = "1") LIMIT 100;

ClickHouse

SELECT trip_id
FROM trips_mergetree 
WHERE (passenger_count = 5) AND (vendor_id = 'CMT') AND (total_amount < 5) AND (store_and_fwd_flag = 1)

kdb+

select trip_id from trips where passenger_count = 5,vendor_id like "CMT",total_amount < 5,store_and_fwd_flag=1

This query matches very few records (<200), which means the entire dataset has to be scanned. LocustDB returns about half as many records on this query because it differentiates between multiple different values for store_and_fwd_flag, which were all forced to 1 during the ingestion process for ClickHouse/kdb+, but this does not affect performance.

Top n

LocustDB

SELECT passenger_count, trip_distance, total_amount FROM trips ORDER BY total_amount DESC LIMIT 100;

ClickHouse

SELECT 
    passenger_count, 
    trip_distance, 
    total_amount
FROM trips_mergetree 
ORDER BY total_amount DESC
LIMIT 100

kdb+

cnt:count trips
minmax:select min total_amount, max total_amount from trips
tamin:first minmax`total_amount
tamax:first minmax`total_amount1
low:0
mid:cnt%10
hight:cnt
estimate:((tamax - tamin) % 10) + tamin
cnt2:count select total_amount from trips where total_amount > estimate
while[(cnt2<100)|(cnt2>100000);estimate:(((tamax - tamin)*mid) % cnt) + tamin;cnt2:count select total_amount from trips where total_amount > estimate;if[cnt2<100;high:mid;mid:mid - ((mid - low)*0.9)];if[cnt2>100000;low:mid;mid:mid + ((high - mid)%2)]]
result:select total_amount, passenger_count, trip_distance from trips where total_amount > estimate

This query selects the 100 records with the largest total_amount. I found it very difficult to efficiently express this query in Q (kdb+’s query language). The query/small program I ended up using was the result of kind assistance by members of the kdb+ users group. It looks quite horrifying (no doubt due to my poor grasp of Q), but I think it speaks for the expressiveness of Q that it is even possible to make this query work at all without inbuilt support.

Reducible high cardinality

LocustDB

SELECT pickup_puma, dropoff_puma, passenger_count, count(0) FROM trips;

ClickHouse

SELECT 
    pickup_puma, 
    dropoff_puma, 
    passenger_count, 
    count()
FROM trips_mergetree 
GROUP BY 
    pickup_puma, 
    dropoff_puma, 
    passenger_count

kdb+

select count 1b by pickup_puma, dropoff_puma, passenger_count from trips;

For all the other group by queries above, LocustDB is able to statically determine that the range of possible values in the group by is relatively small which makes it possible to compute aggregations by indexing directly into a fixed size array. This query has a much wider range of possible values in the group by and requires an additional hash map pass in about 87% of partitions to reduce the cardinality.
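The two strategies can be sketched as follows. This is an illustration of the general technique rather than LocustDB's operators: the function names are hypothetical, and the hash map variant is a generic fallback that may differ from the extra pass LocustDB actually performs.

```rust
use std::collections::HashMap;

/// Count rows per group when the key range is known to be small: the key
/// itself is the index into a fixed-size array of counters.
/// Assumes no key exceeds `max_key` (larger keys would panic).
fn count_by_small_key(keys: &[u8], max_key: u8) -> Vec<u64> {
    let mut counts = vec![0u64; max_key as usize + 1];
    for &key in keys {
        counts[key as usize] += 1;
    }
    counts
}

/// Generic fallback for a wide key range: a hash map from key to count,
/// which costs a hash computation and a probe per row.
fn count_by_wide_key(keys: &[u64]) -> HashMap<u64, u64> {
    let mut counts = HashMap::new();
    for &key in keys {
        *counts.entry(key).or_insert(0) += 1;
    }
    counts
}
```

The array version touches one counter per row with no hashing, which is why a statically known small range is so valuable.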

Irreducible high cardinality

LocustDB

SELECT trip_id / 5, sum(total_amount) FROM trips;

ClickHouse

SELECT 
    intDiv(trip_id, 5), 
    sum(total_amount)
FROM trips_mergetree 
GROUP BY intDiv(trip_id, 5)
LIMIT 100

kdb+

select sum total_amount by trip_id div 5 from trips

The values in the trip_id column are unique integers ranging from 1 to 1.46 billion, which makes this a pathologically difficult query that requires operating on large temporary results. The / 5 is added to make the query a bit easier, without it LocustDB tries to allocate at least 16B * 1.46 * 10^9 = 24GB of memory and crashes (ClickHouse and kdb+ are not exempt from this problem, but have the advantage of being able to evict unused data from memory and spill over to disk).

Dictionary Encoding

LocustDB

SELECT pickup_ntaname, count(0) FROM trips;

ClickHouse

SELECT 
    pickup_ntaname, 
    count()
FROM trips 
GROUP BY pickup_ntaname

kdb+

select count 1b by pickup_ntaname from trips

This is a completely unfair query meant to demonstrate the power of supporting flexible and automatic dictionary encoding. The pickup_ntaname column is stored as varchars by ClickHouse and kdb+, and as dictionary encoded single byte values by LocustDB. It is very common for string data to be amenable to dictionary encoding in this way, even and especially when it is impractical to enumerate all possible values ahead of time (urls, countries, languages, names of all kind, error codes, log lines, …).
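As a rough illustration of the technique (not LocustDB's implementation, and with hypothetical function names), a string column with at most 256 distinct values can be turned into a dictionary plus one byte per row like this:

```rust
use std::collections::HashMap;

/// Dictionary-encode a string column. Each distinct string receives a
/// one-byte code in order of first appearance. Returns None if the column
/// has more than 256 distinct values and does not fit into single bytes.
fn dictionary_encode(values: &[&str]) -> Option<(Vec<String>, Vec<u8>)> {
    let mut dictionary: Vec<String> = Vec::new();
    let mut index: HashMap<&str, u8> = HashMap::new();
    let mut codes = Vec::with_capacity(values.len());
    for &value in values {
        if let Some(&code) = index.get(value) {
            codes.push(code);
            continue;
        }
        if dictionary.len() == 256 {
            return None; // all one-byte codes are already taken
        }
        let code = dictionary.len() as u8;
        dictionary.push(value.to_string());
        index.insert(value, code);
        codes.push(code);
    }
    Some((dictionary, codes))
}

/// Turn codes back into strings by looking them up in the dictionary.
fn dictionary_decode<'a>(dictionary: &'a [String], codes: &[u8]) -> Vec<&'a str> {
    codes.iter().map(|&code| dictionary[code as usize].as_str()).collect()
}
```

No list of possible values is needed ahead of time: the dictionary is built from whatever strings show up, and a group by then only has to scan the one-byte codes.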

Results & discussion

Following Mark’s methodology, results are reported as the best time out of 10 runs (actually 10±1 because counting is hard). Variance for all systems was relatively low after the first run, most of the time a small single digit percentage. Results marked with * are not directly comparable to other results. Results marked with † were reading from disk and were run less than 10 times.

| # | Query | LocustDB/s (nerfed/s) | ClickHouse/s | kdb+ with index/s |
|---|---|---|---|---|
| 1 | Simple Count | 0.25 | 0.89 | 0.84 |
| 2 | Average | 0.53 | 2.3 | 3.0* |
| 3 | Count by 2 | 0.52* (3.4) | 3.7 | 0.74* |
| 4 | Count by 3 | 3.0* (6.0*) | 5.4 | 6.4* |
| 5 | Sparse Filter | 1.1 | 2.0 | 0.62* |
| 6 | Top N | 0.23 | 2.5 | 5.3 |
| 7 | Reducible Cardinality | 4.2 | 5.0 | 7.6* |
| 8 | Irreducible Cardinality | 24 | 140† | 9.5 |
| 9 | Dictionary Encoding | 0.20* | 11 | 1012† |

* Not directly comparable to other results
† Query reads from/writes to disk.

One note about the results for kdb+: The ingestion scripts I used for kdb+ partition/index the data on the year and passenger_count columns. This may give it a somewhat unfair advantage over ClickHouse and LocustDB on all queries that group or filter on these columns (queries 2, 3, 4, 5 and 7). I was going to figure out how to remove that partitioning and report those results as well, but didn’t manage before my self-imposed deadline.

On all but one of the results that allow for direct comparison, LocustDB comes out ahead of both ClickHouse and kdb+. I am unsure how exactly I am able to get these speedups. What LocustDB already does very well is generating close-to-optimal query plans that contain very few superfluous operations and are specialized to work directly on column data without any type conversions. E.g. 95% of runtime in query 1 is spent on just 7 machine code instructions that are part of this loop. So far I have not made any attempts at improving the operators used by the query engine through use of SIMD, loop unrolling, elimination of bounds checks, not using Vec::push everywhere, or other micro-optimizations. I think there are still significant gains to be made in that fashion before hitting fundamental limitations of the hardware.

The main outliers are queries 3 and 4. These queries are dominated by converting the pickup_date column from a UNIX timestamp to a calendar year. This is a relatively expensive operation and accounts for 64% and 36% of runtime in queries 3 and 4 respectively. The year is precomputed during the ingestion for kdb+. Query 4 is further burdened by having to compute an integer division (48% for the 3.0s result, 27% for the 6.0s result) as opposed to floating-point rounding. According to Agner Fog, idiv r32 has a latency of 22-29 cycles and a reciprocal throughput of 8-11 cycles on Haswell. I don’t know exactly which instructions ClickHouse/kdb+ use for rounding but they are bound to be significantly faster.

Query 6 receives its lightning speed from this neat little loop which I might elaborate on in a future blog post. It’s still subject to a pathological case for approximately sorted datasets, e.g. select trip_id from trips order by trip_id desc limit 100 takes over 4 seconds (ClickHouse: 1.2s). This is actually quite likely to occur in practice e.g. when selecting the 100 most recent events from a dataset. I have some ideas on how to fix this edge case but they didn’t fit in the margin and are left as an exercise for the reader.
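For readers unfamiliar with top-n selection, the following sketch shows one common approach, a bounded min-heap. It is not the loop referenced above, whose details are not given here, and the function name is made up. It does show why sorted input can hurt this family of algorithms: on unsorted data most values are rejected after a single comparison, while on ascending input every value displaces the current minimum.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Return the `n` largest values in descending order, using a min-heap
/// that never holds more than `n` elements.
fn top_n_desc(values: &[i64], n: usize) -> Vec<i64> {
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::with_capacity(n);
    for &value in values {
        if heap.len() < n {
            heap.push(Reverse(value));
            continue;
        }
        // Smallest value currently kept; None only happens when n == 0.
        let smallest = match heap.peek() {
            Some(&Reverse(v)) => v,
            None => continue,
        };
        // Fast path: most values of an unsorted column fail this test.
        if value > smallest {
            heap.pop();
            heap.push(Reverse(value));
        }
    }
    let mut result: Vec<i64> = heap.into_iter().map(|Reverse(v)| v).collect();
    result.sort_unstable_by(|a, b| b.cmp(a));
    result
}
```

With values arriving in ascending order, the comparison succeeds for every row and each row pays for a heap pop and push, so the cost grows from roughly one comparison per row to a logarithmic number of heap operations per row.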

On query 8, ClickHouse was allocating a lot of memory which caused parts of the trip_id column to be evicted from memory before the end of the query, after which it had to be read from disk again. I ran the query a total of 7 times, and 3 of those runs hit the memory limit and were aborted. LocustDB still has inefficiencies for merging large result sets and struggled on query 8 as well. Execution time varied widely between 24s and 60s, and memory had to be written to swap on occasion. So at least for now, kdb+ takes the crown with an impressive and consistent 9.4s.

There’s not much to be said about query 9 other than if you are going to build an analytics database in 2018 you should strongly consider support for dictionary encoding. kdb+ seemed unable to keep the pickup_ntaname column cached in memory and I only collected results from a single run because ain’t nobody got time for that.

One curiosity about query 9 is that it is quite a bit faster than query 1 (0.20s vs. 0.25s) despite executing an identical query plan. This confused me for the longest time until I realized that consecutive values in the cab_type column are almost always identical. This introduces a data dependency between subsequent loop iterations that read from/write to memory locations storing the current count for each cab type. My hypothesis is that these data dependencies cause pipeline stalls, but I don’t know this for certain nor the precise mechanism by which it occurs.

Memory bandwidth

I have encountered multiple claims that in-memory analytics databases are often constrained by memory bandwidth, and I myself held that misconception for longer than I want to admit. While developing LocustDB, I have found it quite difficult to reach fundamental memory bandwidth limitations. Using query 1 as an example, memory bandwidth can be calculated as 1.46 x 10^9B / 0.25s = 5.4GiB/s. This is far below peak memory bandwidth on my system, which is 68GB/s according to Intel spec sheets and 50GiB/s according to pmbw. Query 2 reaches speeds of 13GiB/s. Even if passenger_count was a 64bit integer, total_amount a 64bit float, and query runtime unchanged, bandwidth for query 2 would still only come to 41GiB/s. Fast queries reading from a lot of wide columns could get there, but e.g. query 5 is still only at 6.2GiB/s. The more complex queries have significantly lower read speeds and get nowhere close to the limit. As further evidence, the figures below show the runtime and read speed of select passenger_count, count(0) from trips as the width of passenger_count column is varied from 1byte to 8bytes:

Runtime is largely unaffected by column width, and only drops slightly for 8byte when read speed maxes out at 43.5GiB/s. SIMD and other optimizations could push queries closer to the limit, as might running on systems with multiple CPUs and/or a large number of cores. On the other hand, adding compression to reduce data volume at the cost of additional cycles would tilt the balance decisively in the opposite direction. Possible exceptions are queries that produce large temporary results that exceed the cache size (e.g. query 9), but there are still many optimizations to be applied before that judgement can be made.
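The bandwidth figures in this section follow from simple arithmetic, sketched below with a hypothetical helper. The inputs are the row count of 1.46 billion and the runtimes from the results table; the one byte per row for query 1 and the 16 bytes per row for the hypothetical wide version of query 2 are the values used in the text above.

```rust
/// Read bandwidth in GiB/s when scanning `rows` rows of `bytes_per_row`
/// bytes each in `seconds` seconds.
fn read_bandwidth_gib_per_s(rows: f64, bytes_per_row: f64, seconds: f64) -> f64 {
    let bytes_per_gib = 1024.0 * 1024.0 * 1024.0;
    rows * bytes_per_row / seconds / bytes_per_gib
}
```

Calling read_bandwidth_gib_per_s(1.46e9, 1.0, 0.25) gives about 5.4GiB/s for query 1, and read_bandwidth_gib_per_s(1.46e9, 16.0, 0.53) gives about 41GiB/s for query 2 with two 64-bit columns.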

Scalability

This obligatory graph shows how LocustDB’s performance scales with number of threads. The results were collected at 106acb as the geometric mean of LocustDB’s inbuilt benchmark suite run at various thread counts. The suite contains a similar mix of queries as used in above benchmarks, and runs on 100M records of the same taxi trip dataset. Before collecting these results I disabled turbo boost in BIOS to make the numbers look better.

Pretty picture

Hard numbers

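| Threads | Geometric Mean Runtime/ms | Normalized Performance |
|---|---|---|
| 1 | 480 | 1 |
| 2 | 247 | 1.95 |
| 3 | 167 | 2.88 |
| 4 | 126 | 3.80 |
| 5 | 102 | 4.70 |
| 6 | 90 | 5.34 |
| 7 | 89 | 5.40 |
| 8 | 86 | 5.60 |
| 9 | 78 | 6.16 |
| 10 | 73 | 6.59 |
| 11 | 71 | 6.80 |
| 12 | 68 | 7.09 |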

LocustDB scales well enough up to the number of physical cores and then exposes hyperthreading as a lie. (I kid, getting an additional 33% of performance is nothing to sneer at.)
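The normalized performance column and the hyperthreading gain can be recomputed from the runtimes with two small helpers (hypothetical names). Since the published runtimes are rounded to whole milliseconds, the recomputed values can differ from the table in the last digit.

```rust
/// Speedup relative to the single-threaded runtime.
fn speedup(single_thread_ms: f64, runtime_ms: f64) -> f64 {
    single_thread_ms / runtime_ms
}

/// Parallel efficiency: speedup divided by the number of threads used.
fn efficiency(single_thread_ms: f64, runtime_ms: f64, threads: u32) -> f64 {
    speedup(single_thread_ms, runtime_ms) / f64::from(threads)
}
```

With 6 threads (90ms) the speedup is about 5.33 at roughly 89% efficiency. Going from 6 to 12 threads improves the runtime from 90ms to 68ms, a gain of about 32% when computed from the rounded runtimes.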

Learn all about how LocustDB works in Part 3.

Appendix

Data export

Some of the datatypes/column names in the original dataset changed and I had to make a few small adjustments to Mark’s data export script: taxi_data_export.sql

Data import

To allow LocustDB to load the dataset into memory, I dropped about 70% of columns/data volume. LocustDB did not support floating point numbers, and I imported floating point columns by multiplying by some fixed amount (e.g. 100, 1000) and then truncating and storing as integers. The ingestion process was defined by this function. The data load took a bit less than 20min, all credit goes to the flate2 crate/miniz and the csv crate.

The import process for ClickHouse and kdb+ is mostly identical to Mark’s original scripts. I had to make some adjustments for changed column datatypes and different hardware setup.

I used these scripts to import the dataset into ClickHouse: trans.py, create_table, import.sh, import_mergetree

I used these scripts to import the dataset into kdb+: load.q, loads.sh

Methodology

All benchmarks were run under Ubuntu-16.04. I ran sudo cpupower frequency-set -g performance to set frequency scaling to high performance mode which reduces query runtimes by maybe 30ms. I killed any background programs that dared consume more than 1% of CPU.

The version of LocustDB used for the benchmarks was 9a3ac4 and was run with the command RUSTFLAGS="-Ccodegen-units=1" CARGO_INCREMENTAL=0 cargo run --release --bin repl under rustc 1.28.0-nightly (2a0062974 2018-06-09).

The version of ClickHouse used for the benchmarks was 1.1.54388. The client driver was invoked with --max_threads=12 and --max_memory_usage=60000000000 to allow all cores to be utilized, and allow for sufficient memory for all queries. All other options were left at the default settings.

The version of kdb+ used for the benchmarks was 3.6 2018.06.14, 64bit edition. The binary was invoked with -s 12 to allow all cores to be utilized. All other options were left at the default settings.

Raw results

When I say raw results, I mean a combined total of two hundred thousand lines of terminal scrollback that includes query result tables, typos, evidence of my poor counting skills and probably other embarrassments. Opening the links below might lock up your browser.

ClickHouse: clickhouse-results.txt

LocustDB: locustdb-results.txt

LocustDB (nerfed): locustdb-nerfed-results.txt

kdb+: I forgot to copy the scrollback before closing the terminal and didn’t want to rerun everything. Sue me.

Everything you never wanted to know about LocustDB

This section provides a detailed overview of how LocustDB executes queries by following two queries through all stages of query execution.

High level architecture

In-memory storage

All tables are stored in a hash map that maps table names to the table data structures. Each table is subdivided into a list of immutable partitions that hold some subset of the records for the table. Each of these partitions consists of a list of columns. Each column stores the data associated with the column and some metadata like the name of the column, the encoding scheme and summary information about the stored values.
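The layout described above might look roughly like the following type definitions. This is a simplified sketch: all type and field names are assumptions made for illustration, the data is reduced to raw bytes, and the summary information is reduced to a minimum and maximum.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// How a column's values are stored (two illustrative variants only).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Encoding {
    Plain,
    StringDictionary,
}

/// Summary information about the stored values.
#[derive(Debug, Clone, PartialEq)]
struct Summary {
    min: i64,
    max: i64,
}

/// One column of one partition: data plus metadata.
struct Column {
    name: String,
    encoding: Encoding,
    summary: Summary,
    data: Vec<u8>,
}

/// An immutable subset of a table's records, stored column by column.
struct Partition {
    columns: Vec<Column>,
}

/// A table is a list of partitions behind reference-counted pointers.
struct Table {
    partitions: Vec<Arc<Partition>>,
}

/// All tables, keyed by table name.
struct Database {
    tables: HashMap<String, Table>,
}

/// Build a tiny database with a single table, partition and column.
fn example_database() -> Database {
    let column = Column {
        name: "pickup_ntaname".to_string(),
        encoding: Encoding::StringDictionary,
        summary: Summary { min: 0, max: 96 },
        data: vec![0, 96, 1],
    };
    let partition = Arc::new(Partition { columns: vec![column] });
    let mut tables = HashMap::new();
    tables.insert("trips".to_string(), Table { partitions: vec![partition] });
    Database { tables }
}
```

Keeping the encoding and summary per column and per partition is what allows different partitions of the same table to be stored, and later planned, differently.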

Query execution

When the system receives a query string, it is parsed into an abstract syntax tree (AST) and the referenced table is extracted. LocustDB then creates a snapshot of that table by cloning the list of partitions that make up the table. This does not actually involve copying any of the table data, it just clones the reference counted pointers to the (immutable) partitions. The table snapshot, AST and some other data are all packaged up in a QueryTask object and added to a queue of pending tasks. On start up, LocustDB will spawn one worker thread for each core, all of which are notified when a new task is put onto the queue. The query task is picked up by all idle worker threads, which can process individual partitions in parallel. Sub results from each partition are combined by the worker thread that produced them, and a final pass by a single thread combines the subresults of each worker thread after all partitions have been processed.
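A heavily simplified sketch of this flow is shown below. It makes several assumptions that are not taken from LocustDB: threads are spawned per query instead of waiting on a task queue, partitions are handed out through an atomic counter, and the "query" is just a count of rows equal to a given byte. All names are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// A partition reduced to a single byte column for illustration.
struct Partition {
    values: Vec<u8>,
}

/// Snapshot a table: clones the reference-counted pointers, not the data.
fn snapshot(table: &[Arc<Partition>]) -> Vec<Arc<Partition>> {
    table.to_vec()
}

/// Count rows equal to `needle` using `workers` threads. Each worker
/// claims partitions one at a time and combines its own sub-results;
/// a final single-threaded pass sums the per-worker totals.
fn parallel_count(table: &[Arc<Partition>], needle: u8, workers: usize) -> u64 {
    let partitions = Arc::new(snapshot(table));
    let next_partition = Arc::new(AtomicUsize::new(0));
    let handles: Vec<thread::JoinHandle<u64>> = (0..workers)
        .map(|_| {
            let partitions = Arc::clone(&partitions);
            let next_partition = Arc::clone(&next_partition);
            thread::spawn(move || {
                let mut local_total = 0u64;
                loop {
                    let i = next_partition.fetch_add(1, Ordering::Relaxed);
                    if i >= partitions.len() {
                        break;
                    }
                    let matches = partitions[i].values.iter().filter(|&&v| v == needle).count();
                    local_total += matches as u64;
                }
                local_total
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .sum()
}
```

Because partitions are immutable, the snapshot stays consistent even if new partitions are appended to the table while the query is running.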

Select + filter

After this high level overview, let’s dive into the details of how queries are actually executed by following an example query through all stages of execution. All of the example output shown in this section was generated by running commands in the LocustDB REPL. If you want to follow along, or play around with your own data set or different queries, you can do so by following these instructions. (Fair warning: you will have to compile from source but Rust’s excellent build system/package manager Cargo makes that painless). Anyway, this is the first query we will dissect:

locustdb> SELECT pickup_ntaname FROM trips WHERE pickup_ntaname <> "" LIMIT 5;

Scanned 2097152 rows in 14ms (0.14B rows/s)!

pickup_ntaname
------------------------------
"Lenox Hill-Roosevelt Island"
"Airport"
"Midtown-Midtown South"
"Central Harlem South"
"Midtown-Midtown South"

Pretty straightforward, we just select five nonempty entries from the pickup_ntaname column. As you will see, there’s still a lot going on behind the scenes! The first thing that needs to happen is converting the query string into abstract syntax trees. My parser is very unremarkable and all I will say about it is that if you format your queries just right and all the stars align it will produce something like this:

locustdb> :ast SELECT pickup_ntaname FROM trips WHERE pickup_ntaname <> "" LIMIT 5;
Query {
    select: [
        ColName(
            "pickup_ntaname"
        )
    ],
    table: "trips",
    filter: Func2(
        NotEquals,
        ColName(
            "pickup_ntaname"
        ),
        Const(
            Str(
                ""
            )
        )
    ),
    aggregate: [],
    order_by: None,
    order_desc: false,
    limit: LimitClause {
        limit: 5,
        offset: 0
    },
    order_by_index: None
}

The next step is to turn these ASTs into a sequence of vector operators that can be executed by the query engine. Since each partition may use a different encoding for the same column, the query plan can actually be different for each partition. In this case, we only need to touch a single partition to find our five records so we only have a single query plan, which can be inspected with the :explain command:

locustdb> :explain SELECT pickup_ntaname FROM trips WHERE pickup_ntaname <> "" LIMIT 5;

Query plan in 1 batches
-- Stage 0 (streaming) --
column_0     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
constant_1   = ""                                            Constant
encoded_2    = encode(constant_1; StringDictionary)          EncodeStrConstant
equals_3     = column_0 <> encoded_2                         VecConstBoolOperator<u8, i64, NotEqualsInt>
column_4     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
filtered_5   = column_4[equals_3]                            Filter<u8>
decoded_6    = decode(filtered_5; StringDictionary)          Decode<&str>

Each line in the output represents a single operator. The pseudo-code expression on the left attempts to summarize what the operator does, borrowing from MATLAB's indexing notation. The identifier on the right corresponds to the name of the Rust structure that implements the operator (you can find them all in this folder). To make it clearer what’s going on, let’s go through each operation individually. The following snippets produced by the :show command print not only the query plan, but also the output generated by each operator on the first mini-batch of the first partition. (Note: the columns in the actual dataset are very uniform and I manually edited the outputs for clarity).

The first line of the output marks the beginning of stage 0. The query engine automatically groups all operators that can be executed concurrently into stages. In this query, all operators can be grouped under a single stage.

-- Stage 0 --

The next line gives information about the number of elements in the partition (1048576) and the length of the temporary buffers that are allocated by the operators. In this case, we will process the data set in chunks of size 1024 for a total of 1024 iterations. This is a very important optimization that allows all temporary results to remain inside L1 cache, and speeds up typical queries by a small integer multiple.

batch_size: 1024, max_length: 1048576, column_length: 1048576, iters: 1024
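The iteration count and the reuse of a small scratch buffer can be sketched as follows. The function names are hypothetical and the per-batch work (counting nonzero bytes) is only a stand-in for real operators; the point is that the temporary mask is allocated once, has batch_size elements, and is overwritten for every chunk.

```rust
/// Number of iterations needed to process `column_length` elements in
/// chunks of `batch_size` (rounded up). `batch_size` must be nonzero.
fn iterations(column_length: usize, batch_size: usize) -> usize {
    (column_length + batch_size - 1) / batch_size
}

/// Count nonzero bytes by processing the column batch by batch while
/// reusing one scratch buffer, so temporaries stay small enough to
/// remain in cache. `batch_size` must be nonzero.
fn count_nonzero_batched(column: &[u8], batch_size: usize) -> usize {
    let mut mask = vec![0u8; batch_size]; // scratch buffer, reused per batch
    let mut total = 0;
    for chunk in column.chunks(batch_size) {
        // First operator: write a 0/1 mask for this batch.
        for (slot, &value) in mask.iter_mut().zip(chunk) {
            *slot = (value != 0) as u8;
        }
        // Second operator: consume the mask (only the part just written).
        total += mask[..chunk.len()].iter().map(|&m| m as usize).sum::<usize>();
    }
    total
}
```

For the partition shown here, iterations(1048576, 1024) is 1024, matching the iters value in the output.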

Below is the first actual operator, which writes a reference to the column data to the shared scratchpad. One detail to note is that pickup_ntaname is stored as dictionary encoded U8 (byte) values.

column_0 = [pickup_ntaname; U8; StringDictionary]             GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more) 

This next operator simply assigns the string constant “” from the original query string to variable constant_1.

constant_1   = ""                                            Constant
Scalar("")

The next operator finally does something interesting. It encodes the above constant using the string dictionary, which works by looking up the index at which the "" string is stored, in this case index 0. The query engine does this so that the pickup_ntaname <> "" expression can be evaluated by comparing this encoded integer value with the raw column data for pickup_ntaname. No string comparisons required!

encoded_2    = encode(constant_1; StringDictionary)          EncodeStrConstant
Scalar(0)

The actual comparison operation. This gives us a vector of bytes with values 0/1 that tells us which records we want to select.

equals_3     = column_0 <> encoded_2                         VecConstBoolOperator<u8, i64, NotEqualsInt<u8>>
Vec<U8>[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)

The Filter operation jointly scans the vector we just produced together with the raw column to select matching records:

column_4     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more) 
filtered_5   = column_4[equals_3]                            Filter<u8>
Vec<U8>[96, 1]

Finally, the selected records are turned back into strings by looking up the corresponding entries in the dictionary:

decoded_6    = decode(filtered_5; StringDictionary)          Decode<&str>
Vec<Str>[Lenox Hill-Roosevelt Island, Airport]

And then we do all of this again 1023 more times:

[1023 more iterations]
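The steps walked through above (encode the constant, compare, filter, decode, repeat per batch) can be sketched in a few lines of Rust. This is only an illustration under simplifying assumptions: `DictColumn`, `encode_constant` and `select_not_equal` are hypothetical names, not LocustDB's actual operators, and the dictionary lookup is a plain linear scan.

```rust
/// Hypothetical dictionary-encoded string column (illustrative, not LocustDB's type).
pub struct DictColumn {
    pub dictionary: Vec<String>, // dictionary index -> string value
    pub codes: Vec<u8>,          // one dictionary index per row
}

impl DictColumn {
    /// Looks up the dictionary index of a string constant, if it is present.
    pub fn encode_constant(&self, s: &str) -> Option<u8> {
        self.dictionary.iter().position(|d| d == s).map(|i| i as u8)
    }
}

/// Evaluates `SELECT col FROM t WHERE col <> constant LIMIT limit`, one batch at a time.
pub fn select_not_equal<'a>(
    col: &'a DictColumn,
    constant: &str,
    limit: usize,
    batch_size: usize,
) -> Vec<&'a str> {
    assert!(batch_size > 0, "batch_size must be positive");
    // Encode the constant once so that rows are compared as integers, not strings.
    let encoded = col.encode_constant(constant);
    let mut result = Vec::new();
    // Small scratch buffers that are reused for every batch.
    let mut mask: Vec<u8> = Vec::with_capacity(batch_size);
    let mut filtered: Vec<u8> = Vec::with_capacity(batch_size);
    for batch in col.codes.chunks(batch_size) {
        // Comparison: 1 where the row differs from the constant, 0 otherwise.
        // If the constant is not in the dictionary, every row differs.
        mask.clear();
        mask.extend(batch.iter().map(|&code| (Some(code) != encoded) as u8));
        // Filter: keep the encoded values whose mask entry is 1.
        filtered.clear();
        for (&code, &keep) in batch.iter().zip(mask.iter()) {
            if keep == 1 {
                filtered.push(code);
            }
        }
        // Decode: turn the surviving codes back into strings.
        for &code in &filtered {
            if result.len() == limit {
                return result;
            }
            result.push(col.dictionary[code as usize].as_str());
        }
    }
    result
}
```

The scratch buffers never grow beyond `batch_size` entries, which is the property that lets temporaries stay cache-resident in the real engine.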

Below you can see the entire output together:

locustdb> :show SELECT pickup_ntaname FROM trips WHERE pickup_ntaname <> "" LIMIT 5;

-- Stage 0 --
batch_size: 1024, max_length: 1048576, column_length: 1048576, iters: 1024
column_0     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
constant_1   = ""                                            Constant
Scalar("")
encoded_2    = encode(constant_1; StringDictionary)          EncodeStrConstant
Scalar(0)
equals_3     = column_0 <> encoded_2                         VecConstBoolOperator<u8, i64, NotEqualsInt<u8>>
Vec<U8>[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
column_4     = [pickup_ntaname; U8; StringDictionary]        GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 96, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
filtered_5   = column_4[equals_3]                            Filter<u8>
Vec<U8>[96, 1]
decoded_6    = decode(filtered_5; StringDictionary)          Decode<&str>
Vec<Str>[Lenox Hill-Roosevelt Island, Airport]

[1023 more iterations]

Scanned 3145728 rows in 15ms (0.20B rows/s)!

pickup_ntaname
------------------------------
"Lenox Hill-Roosevelt Island"
"Airport"
"Midtown-Midtown South"
"Central Harlem South"
"Midtown-Midtown South"

Group by and aggregation

Still bearing with me? Then let’s look at a more complex query that performs grouping and also aggregates subresults from different partitions:

locustdb> SELECT passenger_count, store_and_fwd_flag, count(0), sum(total_amount) FROM trips LIMIT 10;

Scanned 1464785771 rows in 1330ms (1.10B rows/s)!

passenger_count | store_and_fwd_flag | count_0   | sum_1
----------------+--------------------+-----------+--------------
0               | ""                 | 10712     | 23670371
0               | "0"                | 239       | 395984
0               | "N"                | 3988526   | 4351189801
0               | "Y"                | 107966    | 134586050
1               | ""                 | 319360760 | 420920786930
1               | "*"                | 1         | 470
1               | "0"                | 76996375  | 82565436325
1               | "1"                | 1363418   | 1512292719
1               | "N"                | 622044598 | 907071715859
1               | "Y"                | 8422445   | 13233585913

This time, the query plan consists of 19 operators grouped into multiple stages. The query planner will make stages as large as possible, but some operators cannot produce or operate on partial results and have to be separated out into their own stage. For example, the summation operator has to run for the entire partition before its output can be used, otherwise some of the entries might still be incomplete. Other operators, like sorting, can’t be computed incrementally and need all of their inputs in full before they are run.

There are actually 4 slightly different query plans this time due to different encodings for the total_amount column. Here, I am just showing the most common query plan, which is constructed for 988 out of 1465 partitions:

Query plan in 988 batches
-- Stage 0 (streaming) --
column_0     = [store_and_fwd_flag; U8; StringDictionary]    GetEncoded
casted_1     = column_0 as I64                               TypeConversionOperator<u8, i64>
column_2     = [passenger_count; U8; IntCast]                GetEncoded
casted_3     = column_2 as I64                               TypeConversionOperator<u8, i64>
bitpacked_4  = casted_1 + (casted_3 << $shift)               ParameterizedVecVecIntegerOperator<BitShiftLeftAdd>
constant_5   = Constant                                      Constant
count_6[bitpacked_4] += 1                                    VecCount<i64>
column_8     = [total_amount; U16; IntCast]                  GetEncoded
sum_7[bitpacked_4] += column_8                               VecSum<u16, i64>

-- Stage 1 --
nonzero_indices_9 = nonzero_indices(count_6)                 NonzeroIndices<u32, i64>

-- Stage 2 --
sum_7        = sum_7[count_6 > 0]                            Compact<i64, u32>

-- Stage 3 --
count_6      = count_6[count_6 > 0]                          NonzeroCompact<u32>

-- Stage 4 --
decoded_10   = decode(count_6; IntCast)                      Decode<i64>

-- Stage 5 --
unpacked_11  = (nonzero_indices_9 >> $shift) & $mask         BitUnpackOperator
casted_12    = unpacked_11 as U8                             TypeConversionOperator<i64, u8>
decoded_13   = decode(casted_12; IntCast)                    Decode<i64>

-- Stage 6 --
unpacked_14  = (nonzero_indices_9 >> $shift) & $mask         BitUnpackOperator
casted_15    = unpacked_14 as U8                             TypeConversionOperator<i64, u8>
decoded_16   = decode(casted_15; StringDictionary)           Decode<&str>

The first thing that happens is that the passenger_count and store_and_fwd_flag columns are bit packed into a single 64-bit integer value:

-- Stage 0 --
batch_size: 1024, max_length: 1048576, column_length: 1048576, iters: 1024
column_0     = [store_and_fwd_flag; U8; StringDictionary]    GetEncoded
&U8[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
casted_1     = column_0 as I64                               TypeConversionOperator<u8, i64>
Vec<I64>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] (990 more)
column_2     = [passenger_count; U8; IntCast]                GetEncoded
&U8[2, 3, 3, 5, 5, 5, 1, 1, 1, 1, 3, 3, 3, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...] (990 more)
casted_3     = column_2 as I64                               TypeConversionOperator<u8, i64>
Vec<I64>[2, 3, 3, 5, 5, 5, 1, 1, 1, 1, 3, 3, 3, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...] (990 more)
bitpacked_4  = casted_1 + (casted_3 << 2)                    ParameterizedVecVecIntegerOperator<BitShiftLeftAdd>
Vec<I64>[8, 12, 12, 20, 20, 20, 4, 4, 4, 4, 12, 12, 12, 12, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, ...] (993 more)

This makes it very easy to compute the count expression by simply indexing into and incrementing a single array storing the counts:

count_6[bitpacked_4] += 1                                    VecCount<i64>
Vec<U32>[7, 0, 0, 0, 765, 4, 0, 0, 144, 2, 0, 0, 32, 0, 0, 0, 10, 0, 0, 0, 39, 0, 0, 0, 21, 0, 0, 0, 0, 0, 0, 0, ...] (7 more)

The summation proceeds similarly, and the entire process is repeated 1023 more times:

sum_7[bitpacked_4] += decoded_9                              VecSum<i64, i64>
Vec<I64>[14691, 0, 0, 0, 1571973, 6908, 0, 0, 289363, 1865, 0, 0, 72821, 0, 0, 0, 24194, 0, 0, 0, 83852, 0, 0, 0, ...] (15 more)

[1023 more iterations]
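To make the bit packing and the array-indexed aggregation concrete, here is a minimal sketch. The function names are hypothetical, the shift is passed in explicitly rather than derived from the column encodings, and the caller is assumed to size `num_groups` large enough for every packed key.

```rust
/// Packs two small group-by codes into one integer key:
/// key = flag + (passengers << shift), mirroring the BitShiftLeftAdd line above.
pub fn bitpack(flag: &[u8], passengers: &[u8], shift: u32) -> Vec<i64> {
    flag.iter()
        .zip(passengers)
        .map(|(&f, &p)| i64::from(f) + (i64::from(p) << shift))
        .collect()
}

/// Grouped count and sum: the packed key is used directly as an array index,
/// so no hash table is needed. Panics if a key is >= num_groups.
pub fn count_and_sum(keys: &[i64], amounts: &[u16], num_groups: usize) -> (Vec<u32>, Vec<i64>) {
    let mut counts = vec![0u32; num_groups];
    let mut sums = vec![0i64; num_groups];
    for (&key, &amount) in keys.iter().zip(amounts) {
        counts[key as usize] += 1;
        sums[key as usize] += i64::from(amount);
    }
    (counts, sums)
}
```

With a shift of 2, a row with passenger_count 2 and flag code 0 lands in slot 8, which matches the first entry of bitpacked_4 in the output above.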

The NonzeroIndices operator collects all the indices from the array storing the counts which contain entries greater than zero. These indices correspond to all the unique combinations of values contained in the passenger_count and store_and_fwd_flag columns and will later be used to reconstruct them:

-- Stage 1 --
batch_size: 39, max_length: 39, column_length: 1048576, iters: 1
nonzero_indices_10 = nonzero_indices(count_6)                NonzeroIndices<u32, i64>
Vec<I64>[0, 1, 4, 5, 8, 9, 12, 13, 16, 17, 20, 21, 24, 25, 28, 32, 36]

The vectors which store the count and summation results have some empty entries that correspond to values of passenger_count and store_and_fwd_flag which we didn’t encounter in the data set and that have to be removed:

-- Stage 2 --
batch_size: 39, max_length: 39, column_length: 1048576, iters: 1
sum_7        = sum_7[count_6 > 0]                            Compact<i64, u32>
Vec<I64>[9760773, 47247, 1146792536, 3759295, 282781222, 944926, 79922601, 211259, 40777881, 144505, 79317284, ...] (6 more)

-- Stage 3 --
batch_size: 39, max_length: 39, column_length: 1048576, iters: 1
count_6      = count_6[count_6 > 0]                          NonzeroCompact<u32>
Vec<U32>[6114, 17, 718609, 2194, 168438, 464, 48943, 98, 24564, 77, 48937, 6, 30100, 1, 5, 7, 2]
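The two compaction steps can be sketched as follows. These are hypothetical helpers written for illustration; the real operators work on the engine's scratchpad rather than on plain slices.

```rust
/// Collects the positions of all non-empty groups. Each position is a packed
/// group key that can later be unpacked into the original column values.
pub fn nonzero_indices(counts: &[u32]) -> Vec<i64> {
    counts
        .iter()
        .enumerate()
        .filter(|&(_, &count)| count > 0)
        .map(|(index, _)| index as i64)
        .collect()
}

/// Drops every slot of `values` whose group was never seen (count == 0).
/// Works for the sums as well as for the counts themselves.
pub fn compact<T: Copy>(values: &[T], counts: &[u32]) -> Vec<T> {
    values
        .iter()
        .zip(counts)
        .filter(|&(_, &count)| count > 0)
        .map(|(&value, _)| value)
        .collect()
}
```

Because all three outputs are filtered by the same counts array, entry i of the compacted sums, the compacted counts and the index list always refer to the same group.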

We now reconstruct the passenger_count and store_and_fwd_flag columns from the unique bitpacked values:

-- Stage 5 --
batch_size: 17, max_length: 17, column_length: 1048576, iters: 1
unpacked_12  = (nonzero_indices_10 >> 2) & f                 BitUnpackOperator
Vec<I64>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 9]
casted_13    = unpacked_12 as U8                             TypeConversionOperator<i64, u8>
Vec<U8>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 9]
decoded_14   = decode(casted_13; IntCast)                    Decode<i64>
Vec<I64>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 9]

-- Stage 6 --
batch_size: 17, max_length: 17, column_length: 1048576, iters: 1
unpacked_15  = (nonzero_indices_10 >> 0) & 3                 BitUnpackOperator
Vec<I64>[0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0]
casted_16    = unpacked_15 as U8                             TypeConversionOperator<i64, u8>
Vec<U8>[0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0]
decoded_17   = decode(casted_16; StringDictionary)           Decode<&str>
Vec<Str>[N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, N, N]
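Unpacking is the inverse of the earlier packing step: shift the key right and mask off the bits that belong to the other column. A small sketch, with hypothetical function names and the shift and mask values taken from the output above:

```rust
/// Extracts one packed column from the group keys: (key >> shift) & mask.
pub fn bit_unpack(keys: &[i64], shift: u32, mask: i64) -> Vec<i64> {
    keys.iter().map(|&key| (key >> shift) & mask).collect()
}

/// Decodes dictionary codes back into strings.
/// Panics if a code is outside the dictionary.
pub fn decode_strings<'a>(codes: &[i64], dictionary: &'a [String]) -> Vec<&'a str> {
    codes
        .iter()
        .map(|&code| dictionary[code as usize].as_str())
        .collect()
}
```

For example, key 36 unpacks to passenger_count 9 (36 >> 2) and flag code 0 (36 & 3), which is the last entry in both stages above.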

Phew! But wait, there’s more! We still need to merge all the sub results produced on different partitions. The approach I have taken here is a kind of merge sort which can be implemented very efficiently and allows for sequential memory access and low space overhead. One limitation at the moment is that these operators are not yet able to compute results incrementally. This is possible in principle though, and should give massive speedups on queries that have to aggregate large subresults (e.g. query 8 in the benchmark).

First, we set up the data sources for the columns in the group by:

-- Stage 0 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
left_0       = ConstantVec                                   ConstantVec
Vec<I64>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 9]

-- Stage 1 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
left_1       = ConstantVec                                   ConstantVec
Vec<Str>[N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, N, N]

-- Stage 2 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
right_2      = ConstantVec                                   ConstantVec
Vec<I64>[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 9]

-- Stage 3 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
right_3      = ConstantVec                                   ConstantVec
Vec<Str>[N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, Y, N, N]

The next operation partitions the two passenger_count columns into groups of identical elements, and also stops processing after it has encountered the number of elements specified in the limit clause (10).

Note: Just before publishing the blogpost, I noticed that the way this is implemented is actually not quite right: it can cause fewer results to be returned than specified in the limit and might unfairly reduce the amount of work LocustDB does in the benchmarks. Results shouldn’t be affected since, other than query 8 (which is not subject to this problem), result sets grow to at most 20K elements (query 7) and merging is a tiny fraction of total runtime. Indeed, I reran the queries with sufficiently large limit clauses just to make sure and there was no difference.

-- Stage 4 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
partitioning_4 = partition(left_0, right_2)                  Partition<i64>
Vec<Premerge>[2|2, 2|2, 2|2]

E.g. the first 2 elements in the left column and the first 2 elements in the right column are 0, which corresponds to the first group, 2|2. If there are more than two columns, there are additional passes to further split these groups apart on the middle columns. In this case, there’s only one grouping column left which can now be directly merged using the partitioning information from the first column:

-- Stage 5 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
merged_5, merge_ops_6 = merge_deduplicate_partitioned(partitioning_4, left_1, right_3) MergeDeduplicatePartitioned<&str>
Vec<Str>[N, Y, N, Y, N, Y]
Vec<MergeOp>[TakeLeft, MergeRight, TakeLeft, MergeRight, TakeLeft, MergeRight, TakeLeft, MergeRight, TakeLeft, ...] (3 more)

In addition to merging the left and right column, this operation also produced merge_ops_6, a sequence of MergeOp which records how the columns were merged. The same operations can now be applied to merge the first group by column:

-- Stage 6 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
merged_7     = merge_drop(merge_ops_6, left_0, right_2)      MergeDrop<i64>
Vec<I64>[0, 0, 1, 1, 2, 2]

Similarly, it tells us how to merge all the columns storing aggregates:

-- Stage 7 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
left_8       = ConstantVec                                   ConstantVec
Vec<I64>[6114, 17, 718609, 2194, 168438, 464, 48943, 98, 24564, 77, 48937, 6, 30100, 1, 5, 7, 2]

-- Stage 8 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
right_9      = ConstantVec                                   ConstantVec
Vec<I64>[501, 20, 732966, 7677, 147288, 1328, 42974, 291, 20802, 168, 57076, 10, 37472, 1, 1, 1]

-- Stage 9 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
aggregated_10 = merge_aggregate(Count; merge_ops_6, left_8, right_9) MergeAggregate
Vec<I64>[6615, 37, 1451575, 9871, 315726, 1792]

-- Stage 10 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
left_11      = ConstantVec                                   ConstantVec
Vec<I64>[9760773, 47247, 1146792536, 3759295, 282781222, 944926, 79922601, 211259, 40777881, 144505, 79317284, ...] (6 more)

-- Stage 11 --
batch_size: 1, max_length: 1, column_length: 1, iters: 1
right_12     = ConstantVec                                   ConstantVec
Vec<I64>[735630, 25643, 1074988616, 11860249, 226242587, 2267676, 64022166, 530681, 31227466, 286711, 85150302, ...] (5 more)

-- Stage 12 --
batch_size: 17, max_length: 17, column_length: 1, iters: 1
aggregated_13 = merge_aggregate(Sum; merge_ops_6, left_11, right_12) MergeAggregate
Vec<I64>[10496403, 72890, 2221781152, 15619544, 509023809, 3212602]
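The idea of recording merge operations once and replaying them on every aggregate column can be sketched like this. Note that this is a simplified variant, not the engine's algorithm: it merges on a single composite key instead of partitioning on the first column and then merging the last one, and the `MergeOp` enum and both functions are hypothetical stand-ins.

```rust
use std::cmp::Ordering;

/// Records how each output row was produced (illustrative, not LocustDB's enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOp {
    TakeLeft,   // new output row taken from the left input
    TakeRight,  // new output row taken from the right input
    MergeRight, // right row has the same key as the previous output row
}

/// Merges two sorted, duplicate-free key lists, stopping after `limit` output rows.
pub fn merge_keys<K: Ord + Clone>(left: &[K], right: &[K], limit: usize) -> (Vec<K>, Vec<MergeOp>) {
    let (mut i, mut j) = (0, 0);
    let mut merged = Vec::new();
    let mut ops = Vec::new();
    while merged.len() < limit && (i < left.len() || j < right.len()) {
        let order = if i == left.len() {
            Ordering::Greater // left exhausted, take from right
        } else if j == right.len() {
            Ordering::Less // right exhausted, take from left
        } else {
            left[i].cmp(&right[j])
        };
        match order {
            Ordering::Less => {
                merged.push(left[i].clone());
                ops.push(MergeOp::TakeLeft);
                i += 1;
            }
            Ordering::Greater => {
                merged.push(right[j].clone());
                ops.push(MergeOp::TakeRight);
                j += 1;
            }
            Ordering::Equal => {
                merged.push(left[i].clone());
                ops.push(MergeOp::TakeLeft);
                ops.push(MergeOp::MergeRight);
                i += 1;
                j += 1;
            }
        }
    }
    (merged, ops)
}

/// Replays the recorded operations on an additive aggregate (count or sum).
pub fn merge_aggregate(ops: &[MergeOp], left: &[i64], right: &[i64]) -> Vec<i64> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    for op in ops {
        match op {
            MergeOp::TakeLeft => {
                out.push(left[i]);
                i += 1;
            }
            MergeOp::TakeRight => {
                out.push(right[j]);
                j += 1;
            }
            MergeOp::MergeRight => {
                *out.last_mut().expect("MergeRight must follow a Take op") += right[j];
                j += 1;
            }
        }
    }
    out
}
```

Both inputs are read strictly front to back, which is where the sequential memory access and low space overhead mentioned earlier come from.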

And that’s it!

Rust and other random learnings

Finally, I will conclude with various random learnings I acquired while working on LocustDB and give some unsolicited thoughts on Rust. This is not meant to be a comprehensive evaluation of Rust, but rather specific examples where I found Rust to work particularly well (or poorly) on this project. This section is arranged to start off with general learnings and devolve into arcane areas of Rust towards the end. Lest anyone get the wrong impression, let me preface this section by saying that I have found Rust to be an incredibly well designed, fun and productive language to work with. Massive kudos to the team at Mozilla and the many external contributors.

WSL

I currently use Windows almost exclusively (long story). This was a bit of a problem when it came to generating the data set, which involves running a bunch of bash scripts and applications installed via apt-get (notably PostgreSQL) for multiple days. Lacking other options, I figured I would try to get it to work on the Windows Subsystem for Linux (WSL). Not knowing what WSL does and how, I didn’t really expect this to work. Much to my surprise, it did! There were a few minor roadblocks where functionality differed somewhat from a standard Ubuntu install, but they all had reasonable and easy to find workarounds. I still don’t know what WSL does and how, but I now expect it to work (my favorite kind of technology).

Profiling

I love profiling, but for most of the initial parts of this project it was singularly useless. By design, LocustDB has very little overhead and spends the majority of its time running a few simple operations with predictable performance. So pretty much all profiling would tell me was “more than 90% of runtime is spent in these two simple loops”. Well, OK then, I guess I didn’t do anything really stupid. I did get some mileage out of it on some of the more complex queries that are composed of dozens of operations where the relative costs are less obvious.

VTune

One instance where profiling was invaluable was when I was debugging an incredibly strange performance regression that was caused by varying the partition size (which in theory should have minimal performance impact, unless the partition size is made very small). After some experimentation I found some pathological partition sizes where performance would degrade by more than 2(!!) orders of magnitude! On these pathological cases CPU utilization would plummet, which was really weird because the query engine contains only a small number of very shortly-held locks. Instrumentation showed occasional massive slowdowns in seemingly arbitrary sections of the code. Profiling with CodeXL did not show anything other than a larger fraction of time spent in the long tail.

Out of ideas, I decided to give Intel’s VTune Amplifier profiler a shot, which immediately flagged low parallelism as a problem and pointed me to a spinlock in the RtlFreeHeap function of ntdll.dll that was blocking threads and consuming most of the runtime. Thanks, Microsoft, for reminding me to not produce pathological memory allocation patterns I guess? Best I could tell, what triggers this is making multiple fairly large (KB-MB) allocations in several threads, and then deallocating them soon afterwards.

This problem was mostly sidestepped by reducing the size of temporary allocations performed by the query engine, which is something I wanted to do anyway, but some (less optimized) queries are still affected by this. Compiling under Linux, where Rust will link jemalloc as the memory allocator by default, gives massive speedups on those queries. Unfortunately there doesn’t seem to be a way to compile with jemalloc under Windows at the moment but I think there is some work in that direction.

Benchmarks

Rust includes an (unstable) benchmarking framework that is integrated with the testing framework and makes it super easy to define and run (micro) benchmarks. The benchcmp tool makes it very convenient to compare outputs of different benchmark runs. One limitation I have run into as I started to use larger data sets and longer running benchmarks is that currently the benchmarking framework will run each benchmark case at least 350 times, but by and large it has been very useful and convenient. I’ve also heard good things about criterion but the built-in benchmarking has worked well enough for me so far.

The cake is a lie

On one of my routine benchmark runs after a code change that I wasn’t really expecting to affect performance at all, I saw consistent 20-30% performance regressions across the board. After much puzzlement, I determined that the regression was related to using a more recent version of the Rust compiler. Further digging revealed that compiler changes introducing incremental/parallel compilation prevent various cross-module optimizations, and that in addition to passing --release it is now necessary to set the RUSTFLAGS="-Ccodegen-units=1" and CARGO_INCREMENTAL=0 environment variables to get optimal performance. Good thing I was running on an old version of Rust, otherwise I would never have known what I was missing out on. It would be great if there was a single flag that you could rely on to set all current (and future) options required for maximum performance. In related news, there is also RUSTFLAGS=-Ctarget-cpu=native which allows use of architecture specific instructions, but also hurts portability in a major way so it makes sense for it to be set explicitly and I didn’t actually see any speedups from it on LocustDB.

Tests

In a lot of programming languages, creating tests feels kind of tedious. You have to maintain a separate folder hierarchy, create a new file, add a bunch of imports and boilerplate (cough Java cough). Rust makes it very easy to define unit tests within the file they are testing which for me greatly reduces the friction of adding tests.
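For readers who haven't seen it, this is roughly what an in-file unit test looks like. The `bits_needed` helper is a made-up example, not a function from LocustDB.

```rust
/// Number of bits required to represent `max_value`
/// (hypothetical helper, e.g. for choosing a bit-packing shift).
pub fn bits_needed(max_value: u64) -> u32 {
    64 - max_value.leading_zeros()
}

// The test module lives in the same file as the code it tests and is only
// compiled when running `cargo test`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn small_values() {
        assert_eq!(bits_needed(0), 0);
        assert_eq!(bits_needed(1), 1);
        assert_eq!(bits_needed(3), 2);
        assert_eq!(bits_needed(4), 3);
    }

    #[test]
    fn byte_boundary() {
        assert_eq!(bits_needed(255), 8);
        assert_eq!(bits_needed(256), 9);
    }
}
```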

Macros

So far I’ve only written a few very simple macros myself, but made extensive use of the (excellent) nom parser combinator crate for implementing the query parser. Nom allows you to generate extremely efficient parsers at compile time, and relies heavily on macros. One unfortunate aspect of macros (at least currently in IntelliJ) is that much of the tooling ceases to work inside macro contexts (automatic reformatting, typeahead, jump to definition, …). This made me somewhat regret my choice of nom, especially since query parsing doesn’t actually have to be particularly fast.

Fearless concurrency

The parallelism in LocustDB is fairly coarse-grained so I didn’t do anything crazy here. The initial query engine was single threaded and converting it to run multithreaded required making sure that pretty much all the data structures used anywhere were thread safe. Amazingly, the Rust compiler is able to detect any thread unsafe structures automatically which made hunting down the various offenders trivial. In a different language, I could have achieved the same thing by meticulously checking all my data structures, and I probably would not have missed anything, but it is really nice I didn’t have to.
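A minimal sketch of what this looks like in practice, assuming a much simpler setup than LocustDB's scheduler: partitions are shared between worker threads through an `Arc`, and each thread sums a strided subset. The `parallel_sum` function is hypothetical.

```rust
use std::sync::Arc;
use std::thread;

/// Sums all partitions using `num_threads` workers; worker t handles
/// partitions t, t + num_threads, t + 2 * num_threads, ...
pub fn parallel_sum(partitions: Arc<Vec<Vec<i64>>>, num_threads: usize) -> i64 {
    assert!(num_threads > 0, "need at least one worker thread");
    let handles: Vec<_> = (0..num_threads)
        .map(|t| {
            // Each worker gets its own reference-counted handle to the shared data.
            let partitions = Arc::clone(&partitions);
            thread::spawn(move || {
                partitions
                    .iter()
                    .skip(t)
                    .step_by(num_threads)
                    .map(|partition| partition.iter().sum::<i64>())
                    .sum::<i64>()
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .sum()
}
```

Swapping `Arc` for `Rc` here is rejected at compile time because `Rc` does not implement `Send`, which is the kind of automatic detection described above.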

unsafe

The unsafe keyword in Rust provides an escape hatch which unlocks some additional capabilities at the cost of memory safety. All of LocustDB contains essentially just three different uses of unsafe. The first is calls to str::from_utf8_unchecked which I used to reconstruct strings that are packed into contiguous byte arrays. Strings in Rust are guaranteed to be valid UTF8 (and it can cause unsafe/undefined behavior if they are not), so the safe version of this function has to spend some cycles to check that the byte slice it is given actually constitutes valid UTF8. The other two uses of unsafe are places where I was not able to make the Rust borrow checker work out for me, and had to resort to transmuting lifetimes to the unbounded 'static lifetime (more on that later). So far, I have not really felt the need for unsafe with respect to performance optimizations.
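As an illustration of the first use, here is a sketch of a packed string store. `PackedStrings` is a hypothetical type and its layout (one byte buffer plus an offsets array) is an assumption, not necessarily how LocustDB stores its strings.

```rust
/// Many strings stored back to back in a single byte buffer.
pub struct PackedStrings {
    bytes: Vec<u8>,
    offsets: Vec<usize>, // string i occupies bytes[offsets[i]..offsets[i + 1]]
}

impl PackedStrings {
    pub fn new() -> Self {
        PackedStrings { bytes: Vec::new(), offsets: vec![0] }
    }

    /// Appends a string. Only valid UTF-8 can enter, since the input is a &str.
    pub fn push(&mut self, s: &str) {
        self.bytes.extend_from_slice(s.as_bytes());
        self.offsets.push(self.bytes.len());
    }

    /// Returns string `i` without re-validating its bytes. Panics if out of range.
    pub fn get(&self, i: usize) -> &str {
        let slice = &self.bytes[self.offsets[i]..self.offsets[i + 1]];
        // SAFETY: `bytes` is only ever extended with the bytes of a &str, and the
        // offsets always fall on the boundaries between those strings, so every
        // such slice is valid UTF-8. Both fields are private to keep it that way.
        unsafe { std::str::from_utf8_unchecked(slice) }
    }
}
```

The safe alternative, `std::str::from_utf8`, returns a `Result` and scans the slice on every call.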

Documentation

I have found the Rust documentation to be very solid overall. One difficulty is that on many objects a lot of the methods are implemented for a trait (and maybe generically or as an extension method). This can make specific methods difficult to hunt down. One feature that could be very useful here is searching methods by type signature. For example, you might know/suspect that there is a method that will test whether some predicate is true for all elements in a vector but have no idea what it’s called. Clearly though, one of the inputs to the method has to be Vec or Iterator and it has to return a bool which narrows candidates down to a manageable shortlist. Apparently this kind of search was actually possible at one point but for some reason I’ve never gotten it to work. I occasionally Ctrl-F for -> ReturnType inside the browser but it’s a poor substitute.

One neat feature of the Rust docs is that all modules/methods/structs/… have a link to their source code which makes it very easy to satisfy your curiosity about how something is implemented. It may also lead you to discover gems such as this.

GADTs

If you’ve never heard of generalized algebraic data types (GADTs), don’t feel bad because they are a fairly obscure construct relegated to languages such as Haskell and Scala. Basically what they allow you to do is add a generic type parameter to an enum type that can be instantiated to a different type for each variant of the enum. This can be useful for modelling expression trees that may evaluate to different types (as used in LocustDB’s query engine). If Rust had support for GADTs (it does not), I would without a doubt have spent much time integrating them with my query engine. As it turns out everything works just fine without. I did run into a few bugs that they could have prevented, but they were fairly obvious and easy to fix. So maybe Golang has a point after all (a part of my soul died writing this sentence).

Zero-cost abstractions

Rust’s generics provide a powerful way of abstracting over types. Generic parameters can be constrained to implement specific traits which makes it easy to abstract over e.g. integer types or any other class of values that can be modeled by a trait. The compiler will determine all the concrete instantiations of the type parameters at compile time and generate specialized functions/structs for each of them that are as efficient as the equivalent code not using generics. LocustDB makes extensive use of generics to be able to deal with many different data types in a uniform way without duplicating code or introducing inefficiencies.

As an example, the sum operator, which performs grouped summation for queries like select passenger_count, sum(total_amount), takes two input vectors: the first contains the data to be summed, and the second is an index that designates a group for each value. Both are allowed to be any integer type which makes VecSum work for, at time of writing, 16 different combinations of types (caring about binary sizes is so 1991). Without this, our choices would be wasting precious cycles and cache lines by converting to a single common type, incurring virtual function call and/or boxing overhead, or resorting to some dreadful hacks or codegen that may interact poorly with the type system and introduce build system complexity. The only other language feature I’m aware of that provides similar capabilities is C++ templates (if your favorite programming language also supports this, good on you and I meant no offense).
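A rough sketch of such a generic summation follows. It is not the actual VecSum: the `GroupIndex` trait and `grouped_sum` function are invented for illustration, and only a handful of integer types are wired up.

```rust
/// Integer types that can designate a group (hypothetical trait).
pub trait GroupIndex: Copy {
    fn index(self) -> usize;
}

// Implement the trait for a few integer types in one go.
macro_rules! impl_group_index {
    ($($t:ty),*) => {
        $(impl GroupIndex for $t {
            fn index(self) -> usize { self as usize }
        })*
    };
}
impl_group_index!(u8, u16, u32, i64);

/// Grouped summation, generic over the data type and the group index type.
/// The compiler emits a specialized copy for every combination actually used.
pub fn grouped_sum<T, G>(data: &[T], groups: &[G], num_groups: usize) -> Vec<i64>
where
    T: Copy + Into<i64>,
    G: GroupIndex,
{
    let mut sums = vec![0i64; num_groups];
    for (&value, &group) in data.iter().zip(groups) {
        let widened: i64 = value.into();
        sums[group.index()] += widened;
    }
    sums
}
```

Calling it with u16 data and i64 groups, or u8 data and u8 groups, uses the same source but two separately compiled loops, with no dynamic dispatch in either.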

In addition to types, Rust’s generics also make it possible to abstract over operators by adding a (phantom) type parameter that is constrained to a trait with one or more static functions. This makes things like the (slightly horrifying) VecConstBoolOperator possible which implements multiple different comparison operations that compute a boolean from two values which may have different types. This could be taken even further and make it possible to add support for many different operators and functions with minimal effort.
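The phantom-type trick can be sketched as below. This simplifies the real VecConstBoolOperator by requiring both operands to have the same type, and the `Comparator` trait, the marker structs and `VecConstCompare` are all illustrative names.

```rust
use std::marker::PhantomData;

/// A comparison expressed as a static function: implementors carry no data.
pub trait Comparator<T> {
    fn apply(left: T, right: T) -> bool;
}

pub struct Equals;
pub struct NotEquals;
pub struct LessThan;

impl<T: PartialEq> Comparator<T> for Equals {
    fn apply(left: T, right: T) -> bool { left == right }
}
impl<T: PartialEq> Comparator<T> for NotEquals {
    fn apply(left: T, right: T) -> bool { left != right }
}
impl<T: PartialOrd> Comparator<T> for LessThan {
    fn apply(left: T, right: T) -> bool { left < right }
}

/// Compares every element of a column against a constant. The operator is
/// chosen by the (phantom) type parameter `Op`, so it costs nothing at runtime.
pub struct VecConstCompare<T, Op> {
    constant: T,
    op: PhantomData<Op>,
}

impl<T: Copy, Op: Comparator<T>> VecConstCompare<T, Op> {
    pub fn new(constant: T) -> Self {
        VecConstCompare { constant, op: PhantomData }
    }

    /// Produces a byte vector of 0/1 values, one per input element.
    pub fn execute(&self, column: &[T]) -> Vec<u8> {
        column
            .iter()
            .map(|&value| Op::apply(value, self.constant) as u8)
            .collect()
    }
}
```

Adding a new comparison is then a matter of declaring one more marker struct and a one-line `Comparator` impl.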

Lifetime woes

Many of Rust’s innovations and strong safety guarantees are made possible by its unique lifetime system that statically assigns lifetimes to all values and ensures they cannot be modified concurrently or accessed after they are dropped. The lifetime rules are enforced by the infamous borrow checker. A mistake that I made when I was still somewhat new to Rust was to add unnecessary lifetimes to core structs. These lifetimes are propagated by any functions or structs that use them, quickly infect large parts of your program and lead to unmaintainably complex types. I now know better and make liberal use of #[derive(Copy)], clone(), Rc, and Arc unless I absolutely know that I will need every last bit of performance.

For the most part, I’ve been able to eliminate all (non-local) lifetimes from LocustDB. But one still remains, has caused me endless pain, and is the antagonist of the remaining sections. I have easily spent dozens of hours on the various type gymnastics required to appease the borrow checker in this case (plus an additional 10 hours writing this rant. Yes, I do question my decision making sometimes…). On several occasions, the addition of a seemingly trivial feature suddenly resulted in multiple hours of figuring out how to make the lifetimes work out. By now, things are set up quite nicely and mostly just work so at least there is little ongoing cost. And of course it should also be mentioned that I have spent zero hours debugging heap corruption, segfaults and use after free bugs (excepting that one time when my hubris got the better of me).

Naturally LocustDB supports a string datatype. The simple solution of just cloning all string values used in the query engine would introduce unacceptable performance overhead and can explode memory usage. So operators that work with strings take them by reference (&str) which means they require a lifetime parameter corresponding to the lifetime of the object containing data that the string references. In some Rust programs it is possible to eliminate this type of lifetime by replacing the &str with an index into the array storing the string values but that’s not workable in this case. First, we still need to access the actual string data. Second, there isn’t a single backing store for strings. They might come from a Vec packing multiple strings into a contiguous area of memory, a String that was originally part of the query string or created on-the-fly by the query engine, or a Vec used by a dictionary codec. It must be possible to uniformly merge and operate on all these different string values originating from many different columns and &str is the only common representation that does not sacrifice performance.

'self

When a query is executed, all the worker threads processing the query hold a reference to a shared QueryTask structure which stores reference counted pointers (Arc) to the partitions of the table that the query is operating on. Each worker thread produces a partial result from evaluating the query on some subset of partitions, which is written back to the QueryTask to be later merged with the results from the other threads. These sub results contain references to string data in the original table and so require a corresponding lifetime. Now what exactly should this lifetime be? It has to be longer than the lifetime of the individual worker threads, because they may exit before the query is fully evaluated. We do know that the string references will be valid for as long as the QueryTask object lives (as long as the field referencing the table is not mutated). What we would like to do is to tie the lifetime of the partial results to the lifetime of the containing structure. Unfortunately this is not currently possible and the semantics of adding support for this to the type system are actually quite tricky.

Currently the only solution is to use the unsafe mem::transmute function to cast the lifetime to 'static which is guaranteed to live for the duration of the program. So we are lying to the compiler, but this is safe as long as we keep those references private to the QueryTask struct and prevent them from being accessed by any code that runs after the QueryTask has been dropped. After the query completes fully and before the QueryTask object is dropped, the final result set is converted into owned values and returned without any associated lifetimes. This pattern is actually common enough that there exist crates that aim to provide a safe abstraction for it.
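The pattern looks roughly like the sketch below. It is a single-threaded simplification with hypothetical names (`QueryTaskSketch`, `process`, `into_owned`); the real QueryTask also has to coordinate worker threads, which is omitted here.

```rust
use std::mem;
use std::sync::Arc;

/// Owns the table data and privately stores references into it.
pub struct QueryTaskSketch {
    // Claimed to be 'static, but really only valid while `table` is alive.
    // Private, so the fake lifetime can never escape this struct.
    partial_results: Vec<&'static str>,
    // Never mutated or replaced after construction.
    table: Arc<Vec<String>>,
}

impl QueryTaskSketch {
    pub fn new(table: Arc<Vec<String>>) -> Self {
        QueryTaskSketch { partial_results: Vec::new(), table }
    }

    /// Records a reference to the string stored in row `row`.
    pub fn process(&mut self, row: usize) {
        let value: &str = self.table[row].as_str();
        // SAFETY: the string lives inside `self.table`, which this struct keeps
        // alive and never mutates. The extended reference is only stored in a
        // private field and is converted to an owned value before it leaves.
        let value: &'static str = unsafe { mem::transmute::<&str, &'static str>(value) };
        self.partial_results.push(value);
    }

    /// Converts the results into owned strings; nothing borrowed escapes.
    pub fn into_owned(self) -> Vec<String> {
        self.partial_results.iter().map(|s| s.to_string()).collect()
    }
}
```

The safety argument rests entirely on discipline within this one struct, which is why keeping the fields private matters so much.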

Codec

All columns storing data consist of two components. One is the compressed data making up the elements in the column. The second is an instance of Codec, which is a trait with a decode function that turns the compressed data back into values that the query engine knows how to operate on. Instances of Codec may themselves own some additional data as required for decoding (e.g. a dictionary of strings used for dictionary encoding). In retrospect this was a terrible design decision, even though keeping the data separate from Codec would create its own set of challenges. By the time I realized my mistake though it was difficult to undo and easier to just power through (or so I thought).

Initially, the Codecs for the various columns operated on by the query planner/engine were passed around as &'a Codec using the same lifetime already required for references to the column data. This worked well enough. At some point, I needed the query planner to be able to create new codecs on the fly. This caused me quite the headache. These codecs generated on-the-fly are allocated on the stack so it is not possible to create any (long lived) references to them. They are simple value types so copying them would be possible, but then that is not an option for any Codec owning large amounts of data. You might try to put everything inside an Arc, but that completely eliminates the lifetime 'a which is still required as the return type for the decode function which may yield strings that reference non-static data.

What finally worked was to promote the lifetime parameter to the Codec<'a> trait, require it to be clonable, and then have implementors of Codec that own data implement Codec not on the structure itself, but for references to that structure. (E.g. impl<'a> Codec<'a> for &'a DictionaryEncoding). Making all of this work out required atrocities like this, which utilizes the poorly “documented” <'a> for <'a> type expression. Gazing upon this code fills me with a sort of pride and also abject horror. (Okay, I exaggerate, but cultivating a morbid sense of humor is actually a crucial tool for remaining sane in the presence of the Borrow Checker). In all of this, the borrow checker did save me from multiple subtle use-after-free bugs that would have been difficult to avoid in other unmanaged languages.
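Stripped of everything else, the idea of implementing the trait for a reference might be sketched as follows. This is a simplified assumption of the shape, not the real trait: the single decode method taking an index, the dictionary field and the decode_all helper are hypothetical.

```rust
/// Simplified, hypothetical Codec: the lifetime lives on the trait and
/// implementors must be cheap to clone.
pub trait Codec<'a>: Clone {
    fn decode(&self, index: usize) -> &'a str;
}

/// Owns a potentially large dictionary, so we never want to copy it.
pub struct DictionaryEncoding {
    dictionary: Vec<String>,
}

// Codec is implemented for a *reference* to the structure. Cloning the
// codec then only copies a pointer, and decoded strings borrow for 'a.
impl<'a> Codec<'a> for &'a DictionaryEncoding {
    fn decode(&self, index: usize) -> &'a str {
        // `self` is `&&'a DictionaryEncoding`; copy out the inner reference
        // so the result is tied to 'a and not to the short borrow of `self`.
        let encoding: &'a DictionaryEncoding = *self;
        encoding.dictionary[index].as_str()
    }
}

/// Hypothetical consumer that decodes a list of dictionary indices.
pub fn decode_all<'a, C: Codec<'a>>(codec: C, indices: &[usize]) -> Vec<&'a str> {
    // Cheap for `&'a DictionaryEncoding`: this copies a pointer, not the data.
    let worker_copy = codec.clone();
    indices.iter().map(|&i| worker_copy.decode(i)).collect()
}
```

Calling decode_all(&dict, &[1, 0, 1]) on a dictionary containing "GET" and "POST" yields string slices that borrow directly from dict without copying any string data.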

Variance

Of course, most implementations of Codec won’t actually contain any references. E.g. we might have a very simple codec that simply subtracts a constant offset from an integer:

pub struct OffsetCodec {
    offset: i64,
}

When we implement Codec for this struct, we still need to pass on some lifetime to Codec. Well, this is easy, right? Since OffsetCodec contains no references and is valid for the entire duration of the program, we can just impl Codec<'static> for OffsetCodec. Wrong! Trait lifetime parameters in Rust are always invariant and you will now get borrow checker errors when passing OffsetCodec to functions taking a Codec<'a> and some other value with the same lifetime 'a, which is now suddenly forced to be 'static as well (but can’t be). What we can do is write impl<'a> Codec<'a> for OffsetCodec which allows the borrow checker to instantiate the lifetimes to be whatever it needs. This solution is very simple. Coming up with it when all I had to go on were far removed borrow checker errors was not.
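A small sketch of the version that works, under some assumptions: the decode method on plain integers and the decode_column function are made up for illustration, and I assume that encoding subtracted the offset so that decoding adds it back.

```rust
/// Simplified, hypothetical Codec trait with a lifetime parameter.
pub trait Codec<'a> {
    fn decode(&self, raw: i64) -> i64;
}

pub struct OffsetCodec {
    offset: i64,
}

// Generic over 'a: the borrow checker may pick whatever lifetime the call
// site needs. Writing `impl Codec<'static> for OffsetCodec` instead would
// force 'a to be 'static in `decode_column` below, and passing a borrowed
// local column would then be rejected.
impl<'a> Codec<'a> for OffsetCodec {
    fn decode(&self, raw: i64) -> i64 {
        raw + self.offset
    }
}

/// Hypothetical consumer that ties the codec's lifetime to borrowed data.
pub fn decode_column<'a, C: Codec<'a>>(codec: &C, column: &'a [i64]) -> Vec<i64> {
    column.iter().map(|&raw| codec.decode(raw)).collect()
}
```

With this impl, decode_column happily accepts a column that lives on the caller's stack, because 'a is instantiated to the lifetime of that borrow.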

Streaming iterators

Early on in the project, I tried for a little while to make some kind of reusable Codec trait work that would allow you to (at compile time) compose multiple compression algorithms into a single compression scheme without any runtime overhead. E.g. you might want a Codec for string data that will apply dictionary encoding to the string values, variable-width integer encoding to the indices into the dictionary, and compress the dictionary using LZ4. Or you might want to apply two passes of delta encoding and one pass of variable-width encoding, and still allow the compiler to fuse all of these passes into a single loop. There are various other complexities to be accommodated, but a simplified version of this kind of Codec trait might look like this:

pub trait Codec {
    type In;
    type Out;
    fn decode(input: Self::In) -> Self::Out;
}

Of course, concrete instances of In and Out might have a lifetime parameter. E.g. a decoding algorithm that restores strings packed into a single vector of bytes might have In=&'a [u8] and Out=Vec<&'a str>. Crucially, we must ensure that 'a refers to the same lifetime in &'a [u8] and Vec<&'a str>. This is currently not easily expressible in Rust, and equivalent to the problem faced by what is usually referred to as streaming iterators. There are some techniques (hacks, really) that can solve this, subject to various limitations, but I am insufficiently masochistic to give them a try. In any case, there is hope! My most anticipated RFC, the scary-sounding generic associated types, will allow us to write something like the following to express this kind of constraint:

pub trait Codec {
    type In<'a>;
    type Out<'a>;
    fn decode<'a>(input: Self::In<'a>) -> Self::Out<'a>;
}

The crown jewel

Do you remember how I casually mentioned that Rust made it easy for me to implement operators generically for many different types? Well, I lied. By now, it will come as no surprise to you that this capability was actually acquired at great personal cost and involved multiple blood sacrifices to the Borrow Checker.

Even simple queries are composed of multiple operators that are stitched together at run time by the query planner. This means we need a common type that allows us to pass the results from one operator as the input to another. Let’s make a trait for it and call it AnyVec<'a>. It essentially allows us to wrap any one of many different result types in a boxed trait that provides a lot of different methods that will return the original data. This looks something like this:

pub trait AnyVec<'a> {
    fn cast_ref_u8(&self) -> &[u8] { panic!("cast_ref_u8") }
    fn cast_ref_u16(&self) -> &[u16] { panic!("cast_ref_u16") }
    fn cast_ref_str<'b>(&'b self) -> &'b [&'a str] { panic!("cast_ref_str") }
    // ...
}

By default, these methods will simply panic (I know, I know, they should return a Result. Feel free to make a pull request). Each supported type implements the corresponding methods, e.g. for string data:

impl<'a> AnyVec<'a> for Vec<&'a str> {
    fn cast_ref_str<'b>(&'b self) -> &'b [&'a str] { self }
}

Ok, great! Now we can implement an overly complicated function like the following which computes an AND operation:

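fn and<'a>(left: &AnyVec<'a>, right: &AnyVec<'a>) -> Box<AnyVec<'a> + 'a> {
    let left = left.cast_ref_u8();
    let right = right.cast_ref_u8();
    // Slices need .iter() before zipping, and zip yields (l, r) tuples.
    Box::new(left.iter().zip(right).map(|(l, r)| l & r).collect::<Vec<_>>())
}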
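Put together, a self-contained version of the pieces so far might look like the sketch below. It is trimmed down to two cast methods, the impl for Vec<u8> is my assumption of what the byte case looks like, and it uses the dyn keyword and a boxed return value because current Rust editions require them for trait objects.

```rust
pub trait AnyVec<'a> {
    fn cast_ref_u8(&self) -> &[u8] { panic!("cast_ref_u8") }
    fn cast_ref_str<'b>(&'b self) -> &'b [&'a str] { panic!("cast_ref_str") }
}

// Each concrete result type overrides only the cast that applies to it.
impl<'a> AnyVec<'a> for Vec<u8> {
    fn cast_ref_u8(&self) -> &[u8] { self }
}

impl<'a> AnyVec<'a> for Vec<&'a str> {
    fn cast_ref_str<'b>(&'b self) -> &'b [&'a str] { self }
}

/// Element-wise AND of two byte vectors hidden behind the common type.
pub fn and<'a>(left: &dyn AnyVec<'a>, right: &dyn AnyVec<'a>) -> Box<dyn AnyVec<'a> + 'a> {
    let left = left.cast_ref_u8();
    let right = right.cast_ref_u8();
    let result: Vec<u8> = left.iter().zip(right).map(|(l, r)| l & r).collect();
    Box::new(result)
}
```

Passing a Vec<&str> to and would panic at run time inside cast_ref_u8, which is the price of erasing the element type.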

But what if we want to implement a function that e.g. implements equality generically for all types? A first attempt might look like this:

fn equals<'a, T>(left: &AnyVec<'a>, right: &AnyVec<'a>) -> Box<AnyVec<'a> + 'a> {
    let left: &[T] = left.cast_ref_???();
    let right: &[T] = right.cast_ref_???();
    Box::new(left.iter().zip(right).map(|(l, r)| l == r).collect::<Vec<_>>())
}

This almost gets us there, but of course we can’t just call cast_ref_u8() to cast the inputs to byte values because we want this to work with any type. We need a way of calling the correct cast_ref_x method depending on what T is. With associated type constructors, we could define a GenericVec as follows:

pub trait GenericVec {
    type Item<'a>;
    fn unwrap<'a, 'b>(vec: &'b AnyVec<'a>) -> &'b [Self::Item<'a>];
}

So now we can just add a trait bound equals<T>(left: &'a T, right: &'a T) where T: GenericVec + 'a, T::Item: Eq and then call T::unwrap(left) to cast the inputs into the correct types. Implementing GenericVec for specific types is once again trivial:

impl<'c> GenericVec for &'c str {
    type Item<'a> = &'a str;
    fn unwrap<'a, 'b>(vec: &'b AnyVec<'a>) -> &'b [Self::Item<'a>] {
        vec.cast_ref_str()
    }
}

Alas, associated type constructors don’t exist yet. What do! Well, since you’re still reading you must be the kind of person that enjoys pain and doesn’t give up easily. So you will surely agree that the only sensible course of action is to bang our head against the type system until something gives. After some trial and error, you may stumble upon the following:

pub trait GenericVec<T> {
    fn unwrap<'a, 'b>(vec: &'b AnyVec<'a>) -> &'b [T] where T: 'a;
}

Not even that horrible! We use a where clause to tie the lifetime in AnyVec<'a> to the elements T in the result. And this almost makes everything work! Implementing GenericVec for u8 proceeds without major issues:

impl GenericVec<u8> for u8 {
    fn unwrap<'a, 'b>(vec: &'b AnyVec<'a>) -> &'b [u8] where u8: 'a {
        vec.cast_ref_u8()
    }
}

The compiler demands that we add in a tautological where u8: 'a constraint to match the signature of the trait. That’s a bit irritating, for sure, but we can live with it. The real problem comes when we try to do the same for &str:

impl<'c> GenericVec<&'c str> for &'c str {
    fn unwrap<'a, 'b>(vec: &'b AnyVec<'a>) -> &'b [&'c str] where &'c str: 'a {
        vec.cast_ref_str()
    }
}

error[E0308]: mismatched types
  --> src/main.rs:14:81
   |
14 |     fn unwrap<'a, 'b>(vec: &'b AnyVec<'a>) -> &'b [&'c str] where &'c str: 'a { vec.cast_ref_str() }
   |                                                                                 ^^^^^^^^^^^^^^^^^^ lifetime mismatch
   |
   = note: expected type `&'b [&'c str]`
              found type `&'b [&'a str]`
note: the lifetime 'a as defined on the method body at 14:5...
  --> src/main.rs:14:5
   |
14 |     fn unwrap<'a, 'b>(vec: &'b AnyVec<'a>) -> &'b [&'c str] where &'c str: 'a { vec.cast_ref_str() }
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: ...does not necessarily outlive the lifetime 'c as defined on the impl at 13:1
  --> src/main.rs:13:1
   |
13 | impl<'c> GenericVec<&'c str> for &'c str {
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0308`.

Whaa! Come again? (Playground link so you can personally experience the horror). Already tasting victory, we brace for the final struggle. But this time, the Borrow Checker is unyielding. Pragmatism prevails, and we put in an unsafe call to mem::transmute and a salty comment. We can finally get back to our lives, scarred yet stronger, and ready to face the inescapable day when we will once again meet the Borrow Checker in battle.
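For the curious, the workaround might look roughly like the sketch below. The actual LocustDB code is not shown in this article, so treat this as an assumption of its shape: the equals helper returning a Vec<u8> of 0/1 flags is made up for illustration, and dyn is added because current Rust editions require it for trait objects. The transmute papers over the mismatch between 'a and 'c, and it is only sound as long as callers never let the returned slice outlive the data behind the AnyVec.

```rust
use std::mem;

pub trait AnyVec<'a> {
    fn cast_ref_u8(&self) -> &[u8] { panic!("cast_ref_u8") }
    fn cast_ref_str<'b>(&'b self) -> &'b [&'a str] { panic!("cast_ref_str") }
}

impl<'a> AnyVec<'a> for Vec<u8> {
    fn cast_ref_u8(&self) -> &[u8] { self }
}

impl<'a> AnyVec<'a> for Vec<&'a str> {
    fn cast_ref_str<'b>(&'b self) -> &'b [&'a str] { self }
}

pub trait GenericVec<T> {
    fn unwrap<'a, 'b>(vec: &'b dyn AnyVec<'a>) -> &'b [T] where T: 'a;
}

impl GenericVec<u8> for u8 {
    // The tautological bound is kept to match the trait's signature.
    fn unwrap<'a, 'b>(vec: &'b dyn AnyVec<'a>) -> &'b [u8] where u8: 'a {
        vec.cast_ref_u8()
    }
}

impl<'c> GenericVec<&'c str> for &'c str {
    fn unwrap<'a, 'b>(vec: &'b dyn AnyVec<'a>) -> &'b [&'c str] where &'c str: 'a {
        // The Borrow Checker cannot see that 'a and 'c describe the same
        // data, so we tell it to look the other way. (Salty comment here.)
        unsafe { mem::transmute::<&[&str], &'b [&'c str]>(vec.cast_ref_str()) }
    }
}

/// Hypothetical generic operator: element-wise equality as 0/1 bytes.
pub fn equals<'a, T>(left: &dyn AnyVec<'a>, right: &dyn AnyVec<'a>) -> Vec<u8>
where
    T: GenericVec<T> + PartialEq + 'a,
{
    let left: &[T] = T::unwrap(left);
    let right: &[T] = T::unwrap(right);
    left.iter().zip(right).map(|(l, r)| (l == r) as u8).collect()
}
```

The payoff is that one definition of equals now serves every element type: equals::<u8>(&a, &b) and equals::<&str>(&c, &d) both go through the same code, with T::unwrap picking the matching cast_ref_x method.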

LocustDB needs YOU

If you managed to make it all this way, you indubitably have what it takes to take LocustDB to the next level! There is still a lot of work to do, some of it easy, some of it difficult, all of it interesting. Quite frankly, I have gone about as far as I am able/willing to on my own and any further progress will be a team effort. If you are excited by the prospect of running LocustDB in production, or always wanted to build the world’s fastest analytics database, your dream is now just one click away.

Acknowledgements

Thanks to Akhil Ravidas for introducing me to Scuba, encouraging my efforts and reviewing a draft of this article. Thanks to York Winter for reviewing a draft of this article. Thanks to David Fisher for working with me on LocustDB during two amazing hackweeks. Thanks to Dropbox for holding hackweeks in the first place and allowing me to open-source the original prototype. Thanks to everyone who contributed to Rust, without which LocustDB would not exist. And thanks to the many teachers, family members, colleagues and friends that helped me develop the skills and confidence to take on ambitious projects.

References

[1] Clemens Winter, “cswinter/LocustDB,” GitHub, 2018, https://github.com/cswinter/LocustDB.
[2] Daniel Abadi, “The Design and Implementation of Modern Column-Oriented Database Systems,” Foundations and Trends® in Databases 5, no. 3 (2012): 197–280, https://doi.org/10.1561/1900000024.
[3] Ulrich Drepper, “What Every Programmer Should Know About Memory,” 2007, https://people.freebsd.org/~lstewart/articles/cpumemory.pdf.
[4] Lior Abraham et al., “Scuba: Diving into Data at Facebook,” Proceedings of the VLDB Endowment 6 (2013): 1057–1067.
[5] “ClickHouse DBMS,” ClickHouse — open source distributed column-oriented DBMS, 2009, https://clickhouse.yandex/.
[6] Nathan Bronson, Thomas Lento, and Janet L. Wiener, “Open Data Challenges at Facebook,” in 2015 IEEE 31st International Conference on Data Engineering (ICDE) (IEEE, 2015), https://doi.org/10.1109/icde.2015.7113415.