How to Analyze Billions of Records per Second on a Single Desktop PC

Lies, damned lies and benchmarks

This section evaluates LocustDB on benchmarks based on a series of blog posts by Mark Litwintschik.

Disclaimer

Of course this benchmark can only shine a light on a small slice of functionality, and the systems I am comparing against are much more complete than LocustDB. I have used the dataset and queries featured in the benchmarks during development, and while I have refrained from adding any optimizations that are tailored to the dataset, this naturally still introduces bias. Still, the benchmarks cover a fairly general class of important queries, and I do believe the results paint a fair picture of LocustDB’s potential, if not its current capabilities.

The Dataset

The data used in the benchmark is derived from a public dataset of 1.46 billion taxi rides in New York City. The raw dataset requires additional processing to turn it into a single denormalized table (see appendix). Stored as uncompressed CSVs, the dataset takes up 602GB of space and reduces to 114GB after applying gzip. Despite Mark’s detailed instructions for generating the denormalized dataset (which built on work by Todd Schneider), the process is fairly arduous and claimed several hours of my time. Downloading the raw data took multiple hours and the denormalization process ran for about a week. As a public service, I am making the resulting data set available for download in case anyone else wants to play with it.

The Hardware

All benchmarks were run on my desktop PC, which contains an i7-5820K processor (Haswell-E, 3.30–3.60GHz, 6 physical and 12 virtual cores, 15MB L3 cache) and 8x8GB of DDR4-2400 RAM.

The Competition

The benchmarks feature ClickHouse and kdb+. They were chosen since they are currently among the highest performing analytic systems that don’t require specialized hardware, and because Mark already did the hard work of setting up schemas and scripts that allow the taxi ride dataset to be loaded into the systems.

The Queries

The first four queries come from Mark’s benchmarks, the others were added by me to cover a wider range of query classes.

Simple Count

LocustDB

SELECT cab_type, count(0) FROM trips;

ClickHouse

SELECT 
    cab_type, 
    count(*)
FROM trips_mergetree 
GROUP BY cab_type

kdb+

select count 1b by cab_type from trips

LocustDB does not have a GROUP BY clause and implicitly groups by all selected expressions that are not aggregations (it’s a feature). As a side note, this is not an ideal query for benchmarking: cab_type has only two possible values, and almost all of the input files contain just one of them, which would make it very easy to store this column as a single value + count and compute this query instantly.
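For illustration, such a single-value-plus-count representation might look like this (a hypothetical sketch, not LocustDB's actual column format):

```rust
// Hypothetical sketch: a column codec that stores a partition-wide run
// of identical values as value + count, so a count aggregation over it
// needs no scan at all.
enum Column {
    Constant { value: String, count: usize }, // every row holds `value`
    Plain(Vec<String>),                       // fallback: one entry per row
}

fn count_rows(col: &Column) -> usize {
    match col {
        Column::Constant { count, .. } => *count, // answered in O(1)
        Column::Plain(values) => values.len(),
    }
}
```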

Average

LocustDB

SELECT passenger_count, count(0), sum(total_amount) FROM trips;

ClickHouse

SELECT 
    passenger_count, 
    avg(total_amount)
FROM trips_mergetree 
GROUP BY passenger_count

kdb+

select avg total_amount by passenger_count from trips

The original query computes the avg(total_amount) but LocustDB does not currently support averages, so I compute the sum and count instead which performs the same amount of work.

Count by 2 columns

LocustDB

SELECT passenger_count, to_year(pickup_datetime), count(0) FROM trips;

ClickHouse

SELECT 
    passenger_count, 
    toYear(pickup_date) AS year, 
    count(*)
FROM trips_mergetree 
GROUP BY 
    passenger_count, 
    year

kdb+

select count 1b by year, passenger_count from trips

Count the number of trips, broken down by year and passenger count. As it turns out, the to_year expression reduces to a constant in most partitions, which allows LocustDB to completely eliminate it from the query plan. This is very neat! But it also comes dangerously close to gaming the benchmark, which I promised I wouldn’t. For this reason the results also report runtimes for a “nerfed” version of LocustDB which prevents this particular optimization and forces the to_year(pickup_datetime) expression to be evaluated in full.
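A sketch of how such a per-partition constant-folding pass might work (my illustration of the general idea, with a deliberately simplified year computation that ignores leap years):

```rust
// Sketch: fold to_year(ts) into a constant when partition statistics
// prove that all timestamps fall into the same calendar year.
fn to_year(unix_ts: i64) -> i64 {
    // crude approximation, good enough for this sketch
    1970 + unix_ts / (365 * 24 * 3600)
}

enum YearExpr {
    Constant(i64), // whole partition maps to one year; no per-row work
    Evaluate,      // must be computed for every row
}

fn plan_to_year(min_ts: i64, max_ts: i64) -> YearExpr {
    if to_year(min_ts) == to_year(max_ts) {
        YearExpr::Constant(to_year(min_ts))
    } else {
        YearExpr::Evaluate
    }
}
```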

Count by 3 columns

LocustDB

SELECT passenger_count, to_year(pickup_datetime), trip_distance / 1000, count(0) FROM trips;

ClickHouse

SELECT 
    passenger_count, 
    toYear(pickup_date) AS year, 
    round(trip_distance) AS distance, 
    count(*)
FROM trips_mergetree 
GROUP BY 
    passenger_count, 
    year, 
    distance

kdb+

select count 1b by year, passenger_count, floor trip_distance from trips

In the original data set trip_distance is a floating-point column, which LocustDB does not currently support. The column is multiplied by 1000 on ingestion and then stored as an integer. The original query rounds the trip distance, which in this case is roughly equivalent to dividing by 1000 (not fully because division truncates, but close enough for the purpose of this benchmark). The same caveat about the to_year(pickup_datetime) expression applies to this query as well.

Sparse filter

LocustDB

SELECT trip_id FROM trips WHERE (passenger_count = 5) AND (vendor_id = "CMT") AND (total_amount < 500) AND (store_and_fwd_flag = "1") LIMIT 100;

ClickHouse

SELECT trip_id
FROM trips_mergetree 
WHERE (passenger_count = 5) AND (vendor_id = 'CMT') AND (total_amount < 5) AND (store_and_fwd_flag = 1)

kdb+

select trip_id from trips where passenger_count = 5,vendor_id like "CMT",total_amount < 5,store_and_fwd_flag=1

This query matches very few records (<200), which means the entire dataset has to be scanned. (The 500 in the LocustDB query corresponds to 5 in the others because total_amount is scaled by 100 on ingestion, as described in the appendix.) LocustDB returns about half as many records on this query because it differentiates between multiple distinct values of store_and_fwd_flag that were all forced to 1 during the ingestion process for ClickHouse/kdb+, but this does not affect performance.

Top n

LocustDB

SELECT passenger_count, trip_distance, total_amount FROM trips ORDER BY total_amount DESC LIMIT 100;

ClickHouse

SELECT 
    passenger_count, 
    trip_distance, 
    total_amount
FROM trips_mergetree 
ORDER BY total_amount DESC
LIMIT 100

kdb+

cnt:count trips
minmax:select min total_amount, max total_amount from trips
tamin:first minmax`total_amount
tamax:first minmax`total_amount1
low:0
mid:cnt%10
high:cnt
estimate:((tamax - tamin) % 10) + tamin
cnt2:count select total_amount from trips where total_amount > estimate
while[(cnt2<100)|(cnt2>100000);estimate:(((tamax - tamin)*mid) % cnt) + tamin;cnt2:count select total_amount from trips where total_amount > estimate;if[cnt2<100;high:mid;mid:mid - ((mid - low)*0.9)];if[cnt2>100000;low:mid;mid:mid + ((high - mid)%2)]]
result:select total_amount, passenger_count, trip_distance from trips where total_amount > estimate

This query selects the 100 records with the largest total_amount. I found it very difficult to efficiently express this query in Q (kdb+’s query language). The query/small program I ended up using was the result of kind assistance by members of the kdb+ users group. It looks quite horrifying (no doubt due to my poor grasp of Q), but I think it speaks for the expressiveness of Q that it is even possible to make this query work at all without inbuilt support.
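As I understand it, the program probes for a threshold such that somewhere between 100 and 100,000 records exceed it, and then materializes and sorts only that small candidate set. A simplified sketch of the same idea (my own illustration, not a translation of the Q code):

```rust
// Sketch of threshold-probing top-n: find a cutoff that keeps the
// candidate set small, then sort only the surviving candidates.
fn top_n(values: &[f64], n: usize) -> Vec<f64> {
    let max = values.iter().cloned().fold(f64::MIN, f64::max);
    let min = values.iter().cloned().fold(f64::MAX, f64::min);
    // start with an aggressive cutoff near the maximum
    let mut estimate = min + (max - min) * 0.9;
    // lower the cutoff until at least n candidates survive
    loop {
        let count = values.iter().filter(|&&v| v > estimate).count();
        if count >= n || estimate <= min {
            break;
        }
        estimate = min + (estimate - min) * 0.5;
    }
    let mut candidates: Vec<f64> =
        values.iter().cloned().filter(|&v| v > estimate).collect();
    candidates.sort_by(|a, b| b.partial_cmp(a).unwrap());
    candidates.truncate(n);
    candidates
}
```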

Reducible high cardinality

LocustDB

SELECT pickup_puma, dropoff_puma, passenger_count, count(0) FROM trips;

ClickHouse

SELECT 
    pickup_puma, 
    dropoff_puma, 
    passenger_count, 
    count()
FROM trips_mergetree 
GROUP BY 
    pickup_puma, 
    dropoff_puma, 
    passenger_count

kdb+

select count 1b by pickup_puma, dropoff_puma, passenger_count from trips;

For all the other group by queries above, LocustDB is able to statically determine that the range of possible values in the group by is relatively small which makes it possible to compute aggregations by indexing directly into a fixed size array. This query has a much wider range of possible values in the group by and requires an additional hash map pass in about 87% of partitions to reduce the cardinality.
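The two strategies can be sketched as follows (illustrative code, not LocustDB's actual implementation); the dense variant is only valid when the planner can prove the keys fall into a small range:

```rust
use std::collections::HashMap;

// Dense aggregation: valid only when every key is known to lie in
// [0, max_key]. One array index per record, no hashing.
fn count_dense(keys: &[usize], max_key: usize) -> Vec<u64> {
    let mut counts = vec![0u64; max_key + 1];
    for &k in keys {
        counts[k] += 1;
    }
    counts
}

// Hash aggregation: the fallback when the key range is large or unknown.
fn count_hashed(keys: &[u64]) -> HashMap<u64, u64> {
    let mut counts: HashMap<u64, u64> = HashMap::new();
    for &k in keys {
        *counts.entry(k).or_insert(0) += 1;
    }
    counts
}
```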

Irreducible high cardinality

LocustDB

SELECT trip_id / 5, sum(total_amount) FROM trips;

ClickHouse

SELECT 
    intDiv(trip_id, 5), 
    sum(total_amount)
FROM trips_mergetree 
GROUP BY intDiv(trip_id, 5)
LIMIT 100

kdb+

select sum total_amount by trip_id div 5 from trips

The values in the trip_id column are unique integers ranging from 1 to 1.46 billion, which makes this a pathologically difficult query that requires operating on large temporary results. The / 5 is added to make the query a bit easier; without it, LocustDB tries to allocate at least 16B * 1.46 * 10^9 = 24GB of memory and crashes. (ClickHouse and kdb+ are not exempt from this problem, but have the advantage of being able to evict unused data from memory and spill over to disk.)

Dictionary Encoding

LocustDB

SELECT pickup_ntaname, count(0) FROM trips;

ClickHouse

SELECT 
    pickup_ntaname, 
    count()
FROM trips 
GROUP BY pickup_ntaname

kdb+

select count 1b by pickup_ntaname from trips

This is a completely unfair query meant to demonstrate the power of supporting flexible and automatic dictionary encoding. The pickup_ntaname column is stored as varchars by ClickHouse and kdb+, and as dictionary encoded single byte values by LocustDB. It is very common for string data to be amenable to dictionary encoding in this way, even and especially when it is impractical to enumerate all possible values ahead of time (urls, countries, languages, names of all kind, error codes, log lines, …).
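For illustration, a minimal sketch of what dictionary encoding amounts to (hypothetical code, not LocustDB's implementation): each distinct string is assigned a small integer code once, and the column stores only codes, so group-by operates on one-byte values instead of variable-length strings.

```rust
use std::collections::HashMap;

// Build a dictionary-encoded column: unique strings go into `dict`,
// the column itself stores one small code per row.
fn dictionary_encode(values: &[&str]) -> (Vec<String>, Vec<u8>) {
    let mut dict: Vec<String> = Vec::new();
    let mut index: HashMap<String, u8> = HashMap::new();
    let mut codes = Vec::with_capacity(values.len());
    for &v in values {
        let code = *index.entry(v.to_string()).or_insert_with(|| {
            dict.push(v.to_string());
            (dict.len() - 1) as u8 // assumes < 256 distinct values
        });
        codes.push(code);
    }
    (dict, codes)
}
```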

Results & discussion

Following Mark’s methodology, results are reported as the best time out of 10 runs (actually 10±1 because counting is hard). Variance for all systems was relatively low after the first run, most of the time a small single-digit percentage. Results marked with * are not directly comparable to other results. Results marked with † were reading from disk and were run fewer than 10 times.

| # | Query | LocustDB/s (nerfed/s) | ClickHouse/s | kdb+ with index/s |
|---|-------|-----------------------|--------------|-------------------|
| 1 | Simple Count | 0.25 | 0.89 | 0.84 |
| 2 | Average | 0.53 | 2.3 | 3.0* |
| 3 | Count by 2 | 0.52* (3.4) | 3.7 | 0.74* |
| 4 | Count by 3 | 3.0* (6.0*) | 5.4 | 6.4* |
| 5 | Sparse Filter | 1.1 | 2.0 | 0.62* |
| 6 | Top N | 0.23 | 2.5 | 5.3 |
| 7 | Reducible Cardinality | 4.2 | 5.0 | 7.6* |
| 8 | Irreducible Cardinality | 24 | 140† | 9.5 |
| 9 | Dictionary Encoding | 0.20* | 11 | 1012† |

* Not directly comparable to other results. † Query reads from/writes to disk.

One note about the results for kdb+: The ingestion scripts I used for kdb+ partition/index the data on the year and passenger_count columns. This may give it a somewhat unfair advantage over ClickHouse and LocustDB on all queries that group or filter on these columns (queries 2, 3, 4, 5 and 7). I was going to figure out how to remove that partitioning and report those results as well, but didn’t manage before my self-imposed deadline.

On all but one of the results that allow for direct comparison, LocustDB comes out ahead of both ClickHouse and kdb+. I am unsure how exactly I am able to get these speedups. What LocustDB already does very well is generating close-to-optimal query plans that contain very few superfluous operations and are specialized to work directly on column data without any type conversions. E.g. 95% of runtime in query 1 is spent on just 7 machine code instructions that are part of this loop. So far I have not made any attempts at improving the operators used by the query engine through use of SIMD, loop unrolling, elimination of bounds checks, not using Vec::push everywhere, or other micro-optimizations. I think there are still significant gains to be made in that fashion before hitting fundamental limitations of the hardware.

The main outliers are queries 3 and 4. These queries are dominated by converting the pickup_datetime column from a UNIX timestamp to a calendar year. This is a relatively expensive operation and accounts for 64% and 36% of runtime in queries 3 and 4 respectively. (The year is precomputed during ingestion for kdb+.) Query 4 is further burdened by having to compute an integer division (48% of runtime for the 3.0s result, 27% for the 6.0s result) as opposed to floating-point rounding. According to Agner Fog, idiv r32 has a latency of 22–29 cycles and a reciprocal throughput of 8–11 on Haswell. I don’t know exactly which instructions ClickHouse/kdb+ use for rounding, but they are bound to be significantly faster.

Query 6 receives its lightning speed from this neat little loop, which I might elaborate on in a future blog post. It’s still subject to a pathological case for approximately sorted datasets: e.g. select trip_id from trips order by trip_id desc limit 100 takes over 4 seconds (ClickHouse: 1.2s). This is actually quite likely to occur in practice, e.g. when selecting the 100 most recent events from a dataset. I have some ideas on how to fix this edge case, but they didn’t fit in the margin and are left as an exercise for the reader.
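I won't spoil the loop, but one common technique for this kind of query is a running cutoff: once n candidates have been collected, every further record is rejected with a single comparison unless it beats the current n-th largest value. A sketch of that general technique (not necessarily what LocustDB does):

```rust
// Sketch of a cutoff-based top-n scan: most records are rejected by a
// single, highly predictable comparison against the current n-th largest.
fn top_n_scan(values: &[i64], n: usize) -> Vec<i64> {
    let mut top: Vec<i64> = Vec::with_capacity(n + 1);
    let mut cutoff = i64::MIN;
    for &v in values {
        if top.len() == n && v <= cutoff {
            continue; // fast path taken by the vast majority of records
        }
        top.push(v);
        top.sort_unstable_by(|a, b| b.cmp(a)); // tiny buffer, cheap to sort
        top.truncate(n);
        if top.len() == n {
            cutoff = top[n - 1];
        }
    }
    top
}
```

Note that on data sorted in ascending order nearly every record beats the cutoff and the buffer is updated constantly, which is consistent with the pathological case described above.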

On query 8, ClickHouse was allocating a lot of memory, which caused parts of the trip_id column to be evicted from memory before the end of the query, after which they had to be read from disk again. I ran the query a total of 7 times, and 3 of those runs hit the memory limit and aborted. LocustDB still has inefficiencies when merging large result sets and struggled on query 8 as well: execution time varied widely between 24s and 60s, and memory had to be written to swap on occasion. So at least for now, kdb+ takes the crown with an impressive and consistent 9.4s.

There’s not much to be said about query 9, other than: if you are going to build an analytics database in 2018, you should strongly consider supporting dictionary encoding. kdb+ seemed unable to keep the pickup_ntaname column cached in memory, and I only collected results from a single run because ain’t nobody got time for that.

One curiosity about query 9 is that it is quite a bit faster than query 1 (0.20s vs. 0.25s) despite executing an identical query plan. This confused me for the longest time until I realized that consecutive values in the cab_type column are almost always identical. This introduces a data dependency between subsequent loop iterations that read from/write to memory locations storing the current count for each cab type. My hypothesis is that these data dependencies cause pipeline stalls, but I don’t know this for certain nor the precise mechanism by which it occurs.

Memory bandwidth

I have encountered multiple claims that in-memory analytics databases are often constrained by memory bandwidth, and I myself held that misconception for longer than I want to admit. While developing LocustDB, I have found it quite difficult to reach fundamental memory bandwidth limitations. Using query 1 as an example, memory bandwidth can be calculated as 1.46 x 10^9B / 0.25s = 5.4GiB/s. This is far below peak memory bandwidth on my system, which is 68GB/s according to Intel spec sheets and 50GiB/s according to pmbw. Query 2 reaches speeds of 13GiB/s. Even if passenger_count were a 64-bit integer, total_amount a 64-bit float, and query runtime unchanged, bandwidth for query 2 would still only come to 41GiB/s. Fast queries reading from a lot of wide columns could get there, but e.g. query 5 is still only at 6.2GiB/s. The more complex queries have significantly lower read speeds and get nowhere close to the limit. As further evidence, the figures below show the runtime and read speed of select passenger_count, count(0) from trips as the width of the passenger_count column is varied from 1 byte to 8 bytes:

Runtime is largely unaffected by column width, and only degrades slightly at 8 bytes, where read speed maxes out at 43.5GiB/s. SIMD and other optimizations could push queries closer to the limit, as might running on systems with multiple CPUs and/or a large number of cores. On the other hand, adding compression to reduce data volume at the cost of additional cycles would tilt the balance decisively in the opposite direction. A possible exception are queries that produce large temporary results that exceed the cache size (e.g. query 9), but there are still many optimizations to be applied before that judgement can be made.
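For reference, the bandwidth figures above are derived with nothing more than the following arithmetic (the constants simply restate numbers from the text):

```rust
// Effective read bandwidth = bytes touched / runtime.
// Query 1 scans one 1-byte column over 1.46 billion records in 0.25s.
fn bandwidth_gib_per_s(bytes_read: f64, seconds: f64) -> f64 {
    bytes_read / seconds / (1024.0 * 1024.0 * 1024.0)
}
// bandwidth_gib_per_s(1.46e9, 0.25) comes to roughly 5.4 GiB/s,
// far below the ~50 GiB/s peak measured by pmbw.
```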

Scalability

This obligatory graph shows how LocustDB’s performance scales with number of threads. The results were collected at 106acb as the geometric mean of LocustDB’s inbuilt benchmark suite run at various thread counts. The suite contains a similar mix of queries as used in above benchmarks, and runs on 100M records of the same taxi trip dataset. Before collecting these results I disabled turbo boost in BIOS to make the numbers look better.

Pretty picture

Hard numbers

| Threads | Geometric Mean Runtime/ms | Normalized Performance |
|---------|---------------------------|------------------------|
| 1 | 480 | 1 |
| 2 | 247 | 1.95 |
| 3 | 167 | 2.88 |
| 4 | 126 | 3.80 |
| 5 | 102 | 4.70 |
| 6 | 90 | 5.34 |
| 7 | 89 | 5.40 |
| 8 | 86 | 5.60 |
| 9 | 78 | 6.16 |
| 10 | 73 | 6.59 |
| 11 | 71 | 6.80 |
| 12 | 68 | 7.09 |

LocustDB scales well enough up to the number of physical cores and then exposes hyperthreading as a lie. (I kid, getting an additional 33% of performance is nothing to sneeze at.)

Learn all about how LocustDB works in Part 3.

Appendix

Data export

Some of the datatypes/column names in the original dataset changed and I had to make a few small adjustments to Mark’s data export script: taxi_data_export.sql

Data import

To allow LocustDB to load the dataset into memory, I dropped about 70% of columns/data volume. LocustDB does not currently support floating-point numbers, so I imported floating-point columns by multiplying by some fixed amount (e.g. 100, 1000), truncating, and storing as integers. The ingestion process was defined by this function. The data load took a bit less than 20min; all credit goes to the flate2 crate/miniz and the csv crate.
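The fixed-point conversion boils down to something like this (a hypothetical helper, not the actual ingestion function linked above):

```rust
// Store a float column as scaled integers on ingestion: multiply by a
// fixed factor and truncate. total_amount uses a factor of 100 (cents),
// trip_distance a factor of 1000 (thousandths of a mile).
fn ingest_scaled(raw: &[f64], scale: f64) -> Vec<i64> {
    raw.iter().map(|&v| (v * scale) as i64).collect()
}
```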

The import process for ClickHouse and kdb+ is mostly identical to Mark’s original scripts. I had to make some adjustments for changed column datatypes and different hardware setup.

I used these scripts to import the dataset into ClickHouse: trans.py, create_table, import.sh, import_mergetree

I used these scripts to import the dataset into kdb+: load.q, loads.sh

Methodology

All benchmarks were run under Ubuntu-16.04. I ran sudo cpupower frequency-set -g performance to set frequency scaling to high performance mode which reduces query runtimes by maybe 30ms. I killed any background programs that dared consume more than 1% of CPU.

The version of LocustDB used for the benchmarks was 9a3ac4 and was run with the command RUSTFLAGS="-Ccodegen-units=1" CARGO_INCREMENTAL=0 cargo run --release --bin repl under rustc 1.28.0-nightly (2a0062974 2018-06-09).

The version of ClickHouse used for the benchmarks was 1.1.54388. The client driver was invoked with --max_threads=12 and --max_memory_usage=60000000000 to allow all cores to be utilized, and allow for sufficient memory for all queries. All other options were left at the default settings.

The version of kdb+ used for the benchmarks was 3.6 2018.06.14, 64bit edition. The binary was invoked with -s 12 to allow all cores to be utilized. All other options were left at the default settings.

Raw results

When I say raw results, I mean a combined total of two hundred thousand lines of terminal scrollback that includes query result tables, typos, evidence of my poor counting skills and probably other embarrassments. Opening the links below might lock up your browser.

ClickHouse: clickhouse-results.txt

LocustDB: locustdb-results.txt

LocustDB (nerfed): locustdb-nerfed-results.txt

kdb+: I forgot to copy the scrollback before closing the terminal and didn’t want to rerun everything. Sue me.

9 thoughts on “How to Analyze Billions of Records per Second on a Single Desktop PC”

    1. clemenswinter Post author

      I did evaluate Druid for a specific analytics use case at work one time and found performance to be underwhelming (but it might work better for other workloads and I seem to remember that Druid also offers very strong consistency and durability guarantees).

      Inverted indices make for very efficient point lookups/sparse filtering on a single column but introduce indexing overhead and don’t benefit all queries. The idea behind systems like ClickHouse and LocustDB is to make brute force scans so fast that you don’t even need an index. This does sacrifice performance/efficiency on simpler queries. But if your queries are large and comparatively infrequent (as is the case for many analytics workloads) this approach is a very good fit and should generally give superior performance and cost efficiency.

  1. Tim McNamara 🐍🦀 (@timClicks)

    Clemens – this is a really ambitious, exciting project. Well done for continuing your motivation after the initial hackfest!

    Do you have a gut feel for how many records that you would need before a system like this would be justified? In particular, how essential is the distributed component? It sounds like it scales down to a single laptop quite well.

    1. clemenswinter Post author

      There are a number of design constraints to consider:
      1. How many queries are run per second
      2. How costly are queries (i.e. for how long do they run, which among other things depends on the number of records they touch)
      3. Latency requirements (i.e. you want queries to complete within n seconds)
      4. How many records are stored in the system

      If any of 1-3 are the bottleneck, you will need to add additional machines/CPUs to increase throughput and/or decrease latency. If a single machine can process all your queries sufficiently quickly, the limiting factor for the number of records is now the size of your RAM (or disk, but this is not currently supported by LocustDB and will lead to lower query speeds).

      So then the number of records you can store is (more or less) given by the formula memory_size * compression_ratio / size_per_record.

      Memory size is obvious.
Compression ratio is difficult to predict and depends very much on the actual values in the dataset (and on how good the compression algorithms are); typical values might be between 3x and 20x.
      The size per record will depend on how many columns your table has and the size of each column.

To give a concrete example, suppose you have 64GB of memory, you want to leave a spare 10GB for running queries, and your table has two dozen columns storing mostly integers and dictionary-compressible strings with a total record size of 200B uncompressed and 20B compressed. Then the number of records you could store would be roughly 54GB / 20B = 2.7 billion.
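Expressed as code, the estimate above is simply (restating the numbers from this comment):

```rust
// records ≈ usable_memory * compression_ratio / uncompressed_record_size,
// which is equivalent to usable_memory / compressed_record_size.
fn max_records(usable_memory_bytes: f64, compressed_record_bytes: f64) -> f64 {
    usable_memory_bytes / compressed_record_bytes
}
// max_records(54e9, 20.0) gives 2.7e9, i.e. roughly 2.7 billion records
```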

Of course the numbers for your use case may be very different, and you would have to actually load and benchmark (a portion of) your dataset to get accurate estimates for compression rates and query speeds.

  2. david

    Reminds me a lot of the concepts from SanssouciDB, described in detail by Hasso Plattner in his book “In-Memory Data Management”. I thought it might be interesting for you as well, since you haven’t mentioned SAP HANA as a database using the same concepts such as caching, compression, massive parallelism, etc.
    SanssouciDB was the research project whose concepts eventually evolved into the DBMS of the commercial HANA platform.

    1. clemenswinter Post author

      Yes, SAP HANA seems to be quite successful in this space. I’d never heard of SanssouciDB or the book before, will have to check that out!

