Lies, damned lies and benchmarks
This section evaluates LocustDB on benchmarks based on a series of blog posts by Mark Litwintschik.
Of course, this benchmark can only shed light on a small slice of functionality, and the systems I am comparing against are much more complete than LocustDB. I have used the dataset and queries featured in the benchmarks during development, and while I have refrained from adding any optimizations that are tailored to the dataset, this naturally still introduced bias. Still, the benchmarks cover a fairly general class of important queries and I do believe the results paint a fair picture of LocustDB’s potential, if not its current capabilities.
The data used in the benchmark is derived from a public dataset of 1.46 billion taxi rides in New York City. The raw dataset requires additional processing to turn it into a single denormalized table (see appendix). Stored as uncompressed CSVs, the dataset takes up 602GB of space and reduces to 114GB after applying gzip. Despite Mark’s detailed instructions for generating the denormalized dataset (which built on work by Todd Schneider), the process is fairly arduous and claimed several hours of my time. Downloading the raw data took multiple hours and the denormalization process ran for about a week. As a public service, I am making the resulting data set available for download in case anyone else wants to play with it.
All benchmarks were run on my desktop PC, which contains an i7-5820K processor (Haswell E, 3.30-3.60GHz, 6 physical and 12 virtual cores, 15MB L3 cache) and 8x8GB of DDR4-2400 RAM.
The benchmarks feature ClickHouse and kdb+. They were chosen since they are currently among the highest performing analytic systems that don’t require specialized hardware, and because Mark already did the hard work of setting up schemas and scripts that allow the taxi ride dataset to be loaded into the systems.
The first four queries come from Mark’s benchmarks; the others were added by me to cover a wider range of query classes. Each query is listed three times: first as run on LocustDB, then on ClickHouse, then on kdb+.
SELECT cab_type, count(0) FROM trips;
SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type
select count 1b by cab_type from trips
LocustDB does not have a group by clause and implicitly groups by all select expressions that are not aggregations (it’s a feature). As a side note, this is not an ideal query for benchmarking because cab_type has only two possible values, and in almost all of the input files there is only one value, which would make it very easy to store this column as a single value + count and compute this query instantly.
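To illustrate the shortcut described above, here is a minimal Python sketch of counting over a column stored as (value, count) runs. It is not LocustDB’s code; the partition layout and the name count_by_runs are assumptions made for the example.

```python
from collections import Counter

def count_by_runs(partitions):
    """Count rows per value when each partition stores (value, run_length) pairs."""
    totals = Counter()
    for runs in partitions:
        for value, run_length in runs:
            # One addition per run instead of one per row.
            totals[value] += run_length
    return dict(totals)

# Hypothetical partitions: most of them contain a single cab_type value.
partitions = [
    [("yellow", 1_000_000)],
    [("green", 250_000)],
    [("yellow", 400_000), ("green", 100_000)],
]
counts = count_by_runs(partitions)
```

The work is proportional to the number of runs rather than the number of rows, which is why such a query would tell us little about scan performance.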
SELECT passenger_count, count(0), sum(total_amount) FROM trips;
SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count
select avg total_amount by passenger_count from trips
The original query computes avg(total_amount), but LocustDB does not currently support averages, so I compute the sum and count instead, which performs the same amount of work.
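The substitution works because the average can be recovered from the two aggregates in a cheap final step. A minimal Python sketch, with made-up data and hypothetical function names:

```python
from collections import defaultdict

def count_and_sum_by(keys, values):
    """Group values by key and return {key: (count, sum)}."""
    acc = defaultdict(lambda: [0, 0])
    for key, value in zip(keys, values):
        acc[key][0] += 1
        acc[key][1] += value
    return {key: (count, total) for key, (count, total) in acc.items()}

def averages(groups):
    """Derive avg = sum / count for every group."""
    return {key: total / count for key, (count, total) in groups.items()}

# Hypothetical rows; total_amount is stored as an integer number of cents.
passenger_count = [1, 1, 2, 2, 2]
total_amount = [1000, 2000, 500, 700, 900]
groups = count_and_sum_by(passenger_count, total_amount)
avg = averages(groups)
```

The division happens once per group, not once per row, so it does not change the cost of the scan.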
Count by 2 columns
SELECT passenger_count, to_year(pickup_datetime), count(0) FROM trips;
SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year
select count 1b by year, passenger_count from trips
Count the number of trips, broken down by year and passenger count. As it turns out, the to_year expression reduces to a constant in most partitions, which allows LocustDB to completely eliminate it from the query plan. This is very neat! But it also comes dangerously close to gaming the benchmark, which I promised I wouldn’t do. For this reason, the results also report runtimes for a “nerfed” version of LocustDB which prevents this particular optimization and forces the to_year(pickup_datetime) expression to be evaluated in full.
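One way to detect that to_year is constant within a partition is to compare the years of the smallest and largest timestamp, since the conversion is monotonic. The Python sketch below illustrates the idea; the function names and the per-partition min/max metadata are assumptions for the example and not necessarily how LocustDB implements the optimization.

```python
from datetime import datetime, timezone

def to_year(unix_seconds):
    """Convert a UNIX timestamp (seconds, UTC) to a calendar year."""
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc).year

def constant_year(min_ts, max_ts):
    """Return the year if every timestamp in [min_ts, max_ts] shares it, else None."""
    first, last = to_year(min_ts), to_year(max_ts)
    return first if first == last else None

# Hypothetical partition metadata: (min timestamp, max timestamp).
within_2015 = constant_year(1420070400, 1451606399)      # first and last second of 2015
spans_new_year = constant_year(1451606399, 1451606400)   # crosses into 2016
```

When constant_year returns a value, the expression can be replaced by that constant for the whole partition; otherwise it has to be evaluated per row.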
Count by 3 columns
SELECT passenger_count, to_year(pickup_datetime), trip_distance / 1000, count(0) FROM trips;
SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree GROUP BY passenger_count, year, distance
select count 1b by year, passenger_count, floor trip_distance from trips
In the original data set, trip_distance is a floating-point column, which LocustDB does not currently support. The column is multiplied by 1000 on ingestion and then stored as an integer. The original query rounds the trip distance, which in this case is roughly equivalent to dividing by 1000 (not fully, because division truncates, but close enough for the purpose of this benchmark). The same caveat about the to_year(pickup_datetime) expression applies to this query as well.
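The difference between truncating and rounding can be seen in a small Python sketch. The sample distances are made up, and to_fixed_point is a hypothetical stand-in for the ingestion step.

```python
SCALE = 1000  # scaling factor applied on ingestion, as described above

def to_fixed_point(distance):
    """Store a floating-point distance as an integer number of thousandths."""
    return int(distance * SCALE)

distances = [0.25, 1.75, 2.25]             # exactly representable as floats
stored = [to_fixed_point(d) for d in distances]
truncated = [s // SCALE for s in stored]   # what trip_distance / 1000 computes
rounded = [round(d) for d in distances]    # what round(trip_distance) computes
```

Here only 1.75 ends up in a different group (1 when truncating, 2 when rounding), so the number and size of groups stay comparable.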
SELECT trip_id FROM trips WHERE (passenger_count = 5) AND (vendor_id = "CMT") AND (total_amount < 500) AND (store_and_fwd_flag = "1") LIMIT 100;
SELECT trip_id FROM trips_mergetree WHERE (passenger_count = 5) AND (vendor_id = 'CMT') AND (total_amount < 5) AND (store_and_fwd_flag = 1)
select trip_id from trips where passenger_count = 5,vendor_id like "CMT",total_amount < 5,store_and_fwd_flag=1
This query matches very few records (<200), which means the entire dataset has to be scanned. LocustDB returns about half as many records on this query because it differentiates between several different values for store_and_fwd_flag which were all forced to 1 during the ingestion process for ClickHouse/kdb+, but this does not affect performance.
SELECT passenger_count, trip_distance, total_amount FROM trips ORDER BY total_amount DESC LIMIT 100;
SELECT passenger_count, trip_distance, total_amount FROM trips_mergetree ORDER BY total_amount DESC LIMIT 100
cnt:count trips
minmax:select min total_amount, max total_amount from trips
tamin:first minmax`total_amount
tamax:first minmax`total_amount1
low:0
mid:cnt%10
high:cnt
estimate:((tamax - tamin) % 10) + tamin
cnt2:count select total_amount from trips where total_amount > estimate
while[(cnt2<100)|(cnt2>100000);estimate:(((tamax - tamin)*mid) % cnt) + tamin;cnt2:count select total_amount from trips where total_amount > estimate;if[cnt2<100;high:mid;mid:mid - ((mid - low)*0.9)];if[cnt2>100000;low:mid;mid:mid + ((high - mid)%2)]]
result:select total_amount, passenger_count, trip_distance from trips where total_amount > estimate
This query selects the 100 records with the largest total_amount. I found it very difficult to express this query efficiently in Q (kdb+’s query language). The query/small program I ended up using was the result of kind assistance from members of the kdb+ users group. It looks quite horrifying (no doubt due to my poor grasp of Q), but I think it speaks for the expressiveness of Q that it is even possible to make this query work at all without inbuilt support.
Reducible high cardinality
SELECT pickup_puma, dropoff_puma, passenger_count, count(0) FROM trips;
SELECT pickup_puma, dropoff_puma, passenger_count, count() FROM trips_mergetree GROUP BY pickup_puma, dropoff_puma, passenger_count
select count 1b by pickup_puma, dropoff_puma, passenger_count from trips;
For all the other group by queries above, LocustDB is able to statically determine that the range of possible values in the group by is relatively small, which makes it possible to compute aggregations by indexing directly into a fixed size array. This query has a much wider range of possible values in the group by and requires an additional hash map pass in about 87% of partitions to reduce the cardinality.
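The two strategies can be contrasted with a small Python sketch. It only illustrates the idea; the function names, the sample data and the way the two columns are combined into one key are assumptions, not LocustDB’s implementation.

```python
def count_dense(keys, key_range):
    """Count occurrences when every key is known to lie in range(key_range)."""
    counts = [0] * key_range
    for key in keys:
        counts[key] += 1          # direct index, no hashing
    return counts

def count_hashed(keys):
    """Fallback for wide key ranges: only keys that actually occur use memory."""
    counts = {}
    for key in keys:
        counts[key] = counts.get(key, 0) + 1
    return counts

# Hypothetical data: passenger_count in 0..9, year stored as an offset 0..1.
passenger_count = [1, 2, 1, 5]
year_offset = [0, 1, 0, 1]
N_YEARS = 2
# Combine both group-by columns into one small integer key.
keys = [p * N_YEARS + y for p, y in zip(passenger_count, year_offset)]
dense = count_dense(keys, 10 * N_YEARS)
sparse = count_hashed(keys)
```

The dense version needs an array as large as the full key range, which is cheap for 20 slots but not for the product of two PUMA columns and a passenger count, hence the hash map.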
Irreducible high cardinality
SELECT trip_id / 5, sum(total_amount) FROM trips;
SELECT intDiv(trip_id, 5), sum(total_amount) FROM trips_mergetree GROUP BY intDiv(trip_id, 5) LIMIT 100
select sum total_amount by trip_id div 5 from trips
The values in the trip_id column are unique integers ranging from 1 to 1.46 billion, which makes this a pathologically difficult query that requires operating on large temporary results. The / 5 is added to make the query a bit easier; without it, LocustDB tries to allocate at least 16B * 1.46 * 10^9 ≈ 23GB of memory and crashes (ClickHouse and kdb+ are not exempt from this problem, but have the advantage of being able to evict unused data from memory and spill over to disk).
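The effect of the / 5 on the size of the temporary result can be checked with a few lines of Python. The 16 bytes per group come from the text above; treating the number of groups as exactly rows / 5 is a simplifying assumption.

```python
ROWS = 1_460_000_000
BYTES_PER_GROUP = 16  # per-group size quoted in the text

def temp_result_bytes(distinct_groups, bytes_per_group=BYTES_PER_GROUP):
    """Lower bound on memory needed to hold one entry per group."""
    return distinct_groups * bytes_per_group

without_division = temp_result_bytes(ROWS)        # one group per trip_id
with_division = temp_result_bytes(ROWS // 5)      # roughly one group per 5 ids
```

Dividing by 5 brings the lower bound down from about 23.4GB to about 4.7GB, which fits comfortably into 64GB of RAM alongside the data.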
SELECT pickup_ntaname, count(0) FROM trips;
SELECT pickup_ntaname, count() FROM trips GROUP BY pickup_ntaname
select count 1b by pickup_ntaname from trips
This is a completely unfair query meant to demonstrate the power of supporting flexible and automatic dictionary encoding. The pickup_ntaname column is stored as varchars by ClickHouse and kdb+, and as dictionary encoded single byte values by LocustDB. It is very common for string data to be amenable to dictionary encoding in this way, even and especially when it is impractical to enumerate all possible values ahead of time (URLs, countries, languages, names of all kinds, error codes, log lines, …).
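For readers unfamiliar with the technique, here is a minimal Python sketch of dictionary encoding with single-byte codes. The neighborhood names are made up and the functions are hypothetical; LocustDB’s actual encoder will differ.

```python
def dictionary_encode(values):
    """Replace each string by a one-byte code; return (dictionary, codes)."""
    dictionary = []        # code -> string
    code_of = {}           # string -> code
    codes = bytearray()
    for value in values:
        code = code_of.get(value)
        if code is None:
            code = len(dictionary)
            if code > 255:
                raise ValueError("more than 256 distinct values, need wider codes")
            code_of[value] = code
            dictionary.append(value)
        codes.append(code)
    return dictionary, codes

def count_by_code(dictionary, codes):
    """Aggregate on the byte codes and translate back to strings at the end."""
    counts = [0] * len(dictionary)
    for code in codes:
        counts[code] += 1
    return dict(zip(dictionary, counts))

names = ["Midtown", "SoHo", "Midtown", "Chelsea", "SoHo", "Midtown"]
dictionary, codes = dictionary_encode(names)
counts = count_by_code(dictionary, codes)
```

The dictionary is built while the data is ingested, so nobody has to enumerate the possible values up front, and the query itself only ever touches one byte per row.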
Results & discussion
Following Mark’s methodology, results are reported as the best time out of 10 runs (actually 10±1 because counting is hard). Variance for all systems was relatively low after the first run, most of the time a small single digit percentage. Results marked with * are not directly comparable to other results. Results marked with † were reading from disk and were run fewer than 10 times.
|#||Query||LocustDB/s (nerfed/s)||ClickHouse/s||kdb+ with index/s|
|3||Count by 2||0.52* (3.4)||3.7||0.74*|
|4||Count by 3||3.0* (6.0*)||5.4||6.4*|
* Not directly comparable to other results † Query reads from/writes to disk.
One note about the results for kdb+: The ingestion scripts I used for kdb+ partition/index the data on the passenger_count columns. This may give it a somewhat unfair advantage over ClickHouse and LocustDB on all queries that group or filter on these columns (queries 2, 3, 4, 5 and 7). I was going to figure out how to remove that partitioning and report those results as well, but didn’t manage before my self-imposed deadline.
On all but one of the results that allow for direct comparison, LocustDB comes out ahead of both ClickHouse and kdb+. I am unsure how exactly I am able to get these speedups. What LocustDB already does very well is generating close-to-optimal query plans that contain very few superfluous operations and are specialized to work directly on column data without any type conversions. E.g. 95% of runtime in query 1 is spent on just 7 machine code instructions that are part of this loop. So far I have not made any attempts at improving the operators used by the query engine through use of SIMD, loop unrolling, elimination of bounds checks, not using Vec::push everywhere, or other micro-optimizations. I think there are still significant gains to be made in that fashion before hitting fundamental limitations of the hardware.
The main outliers are queries 3 and 4. These queries are dominated by converting the pickup_date column from a UNIX timestamp to a calendar year. This is a relatively expensive operation and accounts for 64% and 36% of runtime in queries 3 and 4 respectively. The year is precomputed during the ingestion for kdb+. Query 4 is further burdened by having to compute an integer division (48% for the 3.0s result, 27% for the 6.0s result) as opposed to floating-point rounding. According to Agner Fog, idiv r32 has a latency of 22-29 cycles and a reciprocal throughput of 8-11 cycles on Haswell. I don’t know exactly which instructions ClickHouse/kdb+ use for rounding but they are bound to be significantly faster.
Query 6 receives its lightning speed from this neat little loop which I might elaborate on in a future blog post. It’s still subject to a pathological case for approximately sorted datasets, e.g. select trip_id from trips order by trip_id desc limit 100 takes over 4 seconds (ClickHouse: 1.2s). This is actually quite likely to occur in practice, e.g. when selecting the 100 most recent events from a dataset. I have some ideas on how to fix this edge case but they didn’t fit in the margin and are left as an exercise for the reader.
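To see why approximately sorted input can be a worst case for top-n selection, consider the generic bounded-heap approach sketched below in Python. This is a textbook technique chosen for illustration and an assumption on my part, not the loop LocustDB uses.

```python
import heapq

def top_n_desc(values, n):
    """Return the n largest values (descending) and the number of heap updates."""
    heap = []      # min-heap holding the n largest values seen so far
    updates = 0
    for value in values:
        if len(heap) < n:
            heapq.heappush(heap, value)
            updates += 1
        elif value > heap[0]:
            # New value beats the current n-th largest: replace it.
            heapq.heapreplace(heap, value)
            updates += 1
    return sorted(heap, reverse=True), updates

ascending = list(range(1000))            # e.g. ids or timestamps in insertion order
descending = list(reversed(ascending))
top_asc, updates_asc = top_n_desc(ascending, 10)
top_desc, updates_desc = top_n_desc(descending, 10)
```

On ascending input every single element displaces the current minimum, so the cheap comparison that normally filters out almost all rows never succeeds; on descending input only the first n elements touch the heap.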
On query 8, ClickHouse was allocating a lot of memory, which caused parts of the trip_id column to be evicted from memory before the end of the query, after which it had to be read from disk again. I ran the query a total of 7 times, and 3 of those runs hit the memory limit and aborted. LocustDB still has inefficiencies for merging large result sets and struggled on query 8 as well. Execution time varied widely between 24s and 60s, and memory had to be written to swap on occasion. So at least for now, kdb+ takes the crown with an impressive and consistent 9.4s.
There’s not much to be said about query 9 other than if you are going to build an analytics database in 2018 you should strongly consider support for dictionary encoding. kdb+ seemed unable to keep the pickup_ntaname column cached in memory and I only collected results from a single run because ain’t nobody got time for that.
One curiosity about query 9 is that it is quite a bit faster than query 1 (0.20s vs. 0.25s) despite executing an identical query plan. This confused me for the longest time until I realized that consecutive values in the cab_type column are almost always identical. This introduces a data dependency between subsequent loop iterations that read from/write to memory locations storing the current count for each cab type. My hypothesis is that these data dependencies cause pipeline stalls, but I don’t know this for certain, nor the precise mechanism by which it occurs.
I have encountered multiple claims that in-memory analytics databases are often constrained by memory bandwidth, and I myself held that misconception for longer than I want to admit. While developing LocustDB, I have found it quite difficult to reach fundamental memory bandwidth limitations. Using query 1 as an example, memory bandwidth can be calculated as 1.46 x 10^9B / 0.25s = 5.4GiB/s. This is far below peak memory bandwidth on my system, which is 68GB/s according to Intel spec sheets and 50GiB/s according to pmbw. Query 2 reaches speeds of 13GiB/s. Even if passenger_count were a 64bit integer, total_amount a 64bit float, and query runtime unchanged, bandwidth for query 2 would still only come to 41GiB/s. Fast queries reading from a lot of wide columns could get there, but e.g. query 5 is still only at 6.2GiB/s. The more complex queries have significantly lower read speeds and get nowhere close to the limit. As further evidence, the figures below show the runtime and read speed of select passenger_count, count(0) from trips as the width of the passenger_count column is varied from 1 byte to 8 bytes:
Runtime is largely unaffected by column width, and only degrades slightly at 8 bytes, when read speed maxes out at 43.5GiB/s. SIMD and other optimizations could push queries closer to the limit, as might running on systems with multiple CPUs and/or a large number of cores. On the other hand, adding compression to reduce data volume at the cost of additional cycles would tilt the balance decisively in the opposite direction. Possible exceptions are queries that produce large temporary results that exceed the cache size (e.g. query 8), but there are still many optimizations to be applied before that judgement can be made.
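The read speed quoted for query 1 is easy to reproduce. The Python sketch below uses the row count, the one byte per row and the runtime from the text, and takes the pmbw figure of 50GiB/s as the achievable peak; the function name is made up for the example.

```python
GIB = 2**30

def read_speed_gib_per_s(rows, bytes_per_row, seconds):
    """Bytes scanned per second, expressed in GiB/s."""
    return rows * bytes_per_row / seconds / GIB

ROWS = 1_460_000_000
PEAK_GIB_PER_S = 50        # measured with pmbw, as stated above

query1 = read_speed_gib_per_s(ROWS, bytes_per_row=1, seconds=0.25)
fraction_of_peak = query1 / PEAK_GIB_PER_S
```

This comes out at roughly 5.4GiB/s, or about 11% of what the memory system can deliver.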
This obligatory graph shows how LocustDB’s performance scales with number of threads. The results were collected at 106acb as the geometric mean of LocustDB’s inbuilt benchmark suite run at various thread counts. The suite contains a similar mix of queries to the benchmarks above, and runs on 100M records of the same taxi trip dataset. Before collecting these results I disabled turbo boost in BIOS to make the numbers look better.
|Threads||Geometric Mean Runtime/ms||Normalized Performance|
LocustDB scales well enough up to the number of physical cores and then exposes hyperthreading as a lie. (I kid, getting an additional 33% of performance is nothing to sneer at.)
Learn all about how LocustDB works in Part 3.
Some of the datatypes/column names in the original dataset changed and I had to make a few small adjustments to Mark’s data export script: taxi_data_export.sql
To allow LocustDB to load the dataset into memory, I dropped about 70% of columns/data volume. LocustDB did not support floating point numbers, and I imported floating point columns by multiplying by some fixed amount (e.g. 100, 1000) and then truncating and storing as integers. The ingestion process was defined by this function. The data load took a bit less than 20min; all credit goes to the flate2 crate/miniz and the csv crate.
The import process for ClickHouse and kdb+ is mostly identical to Mark’s original scripts. I had to make some adjustments for changed column datatypes and different hardware setup.
I used these scripts to import the dataset into ClickHouse: trans.py, create_table, import.sh, import_mergetree
I used these scripts to import the dataset into kdb+: load.q, loads.sh
All benchmarks were run under Ubuntu-16.04. I ran sudo cpupower frequency-set -g performance to set frequency scaling to high performance mode, which reduces query runtimes by maybe 30ms. I killed any background programs that dared consume more than 1% of CPU.
The version of LocustDB used for the benchmarks was 9a3ac4 and was run with the command RUSTFLAGS="-Ccodegen-units=1" CARGO_INCREMENTAL=0 cargo run --release --bin repl under rustc 1.28.0-nightly (2a0062974 2018-06-09).
The version of ClickHouse used for the benchmarks was 1.1.54388. The client driver was invoked with --max_memory_usage=60000000000 to allow all cores to be utilized, and allow for sufficient memory for all queries. All other options were left at the default settings.
The version of kdb+ used for the benchmarks was 3.6 2018.06.14, 64bit edition. The binary was invoked with -s 12 to allow all cores to be utilized. All other options were left at the default settings.
When I say raw results, I mean a combined total of two hundred thousand lines of terminal scrollback that includes query result tables, typos, evidence of my poor counting skills and probably other embarrassments. Opening the links below might lock up your browser.
LocustDB (nerfed): locustdb-nerfed-results.txt
kdb+: I forgot to copy the scrollback before closing the terminal and didn’t want to rerun everything. Sue me.
Have you tried middle-out compression to speed up search results? lol
Have you had a look at solutions based on an inverted index, like Druid or Imhotep?
I did evaluate Druid for a specific analytics use case at work one time and found performance to be underwhelming (but it might work better for other workloads and I seem to remember that Druid also offers very strong consistency and durability guarantees).
Inverted indices make for very efficient point lookups/sparse filtering on a single column but introduce indexing overhead and don’t benefit all queries. The idea behind systems like ClickHouse and LocustDB is to make brute force scans so fast that you don’t even need an index. This does sacrifice performance/efficiency on simpler queries. But if your queries are large and comparatively infrequent (as is the case for many analytics workloads) this approach is a very good fit and should generally give superior performance and cost efficiency.
Thanks for your thorough reply!
Clemens – this is a really ambitious, exciting project. Well done for continuing your motivation after the initial hackfest!
Do you have a gut feel for how many records that you would need before a system like this would be justified? In particular, how essential is the distributed component? It sounds like it scales down to a single laptop quite well.
There are a number of design constraints to consider:
1. How many queries are run per second
2. How costly are queries (i.e. for how long do they run, which among other things depends on the number of records they touch)
3. Latency requirements (i.e. you want queries to complete within n seconds)
4. How many records are stored in the system
If any of 1-3 are the bottleneck, you will need to add additional machines/CPUs to increase throughput and/or decrease latency. If a single machine can process all your queries sufficiently quickly, the limiting factor for the number of records is now the size of your RAM (or disk, but this is not currently supported by LocustDB and will lead to lower query speeds).
So then the number of records you can store is (more or less) given by the formula memory_size * compression_ratio / size_per_record.
Memory size is obvious.
Compression ratio is difficult to predict: it depends very much on the actual values in the dataset (and also on how good the compression algorithms are). Typical values might be between 3x and 20x.
The size per record will depend on how many columns your table has and the size of each column.
To give a concrete example, suppose you have 64GB of memory, you want to leave a spare 10GB of memory for running queries, and your table has two dozen columns storing mostly integers and dictionary compressible strings with a total record size of 200B uncompressed and 20B compressed (a compression ratio of 10x). Then the number of records you could store would be roughly 54GB * 10 / 200B = 2.7 billion.
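Plugging the numbers from this example into the formula, as a small Python sketch (the function name is made up, and real capacity will depend on measured compression ratios):

```python
def max_records(memory_bytes, compression_ratio, bytes_per_record):
    """memory_size * compression_ratio / size_per_record, in whole records."""
    return memory_bytes * compression_ratio // bytes_per_record

GB = 10**9
usable_memory = (64 - 10) * GB      # total RAM minus headroom for running queries
compression_ratio = 200 // 20       # 200B uncompressed vs. 20B compressed
records = max_records(usable_memory, compression_ratio, bytes_per_record=200)
```

With a 3x compression ratio instead of 10x, the same machine would hold about 810 million records, which shows how sensitive the estimate is to the data.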
Of course, the numbers for your use case may be very different, and you would have to actually load and benchmark (a portion of) your dataset to get accurate estimates for compression rates and query speeds.
Reminds me a lot of the concepts from SanssouciDB described in detail by Hasso Plattner in his book “In-Memory Data Management”. I thought it might be interesting for you as well, since you haven’t mentioned SAP HANA as a database using the same concepts such as caching, compression, massive parallelism, etc.
SanssouciDB was the research project whose concepts eventually evolved into the DBMS of the commercial HANA platform.
Yes, SAP HANA seems to be quite successful in this space. I’d never heard of SanssouciDB or the book before, will have to check that out!
I’m aware this is an old post and feature sets / versions have changed but from what I can gather ClickHouse wasn’t run with LLVM enabled (compile=1; is disabled by default but is perfectly production ready). In my environment this generally improves performance by 15-50% depending on the query. Note that the first time a query runs there is a cost as it performs code-gen but that is not indicative of real world usage where the lib would pre-exist due to previous runs.
Additionally newer versions of ClickHouse support dictionary encoding (LowCardinality). Would be interesting to see this test repeated with these new features as I suspect the results would be quite different now.
For KDB, casting the column to symbol, (`$) would allow you to do the dictionary aggregation. KDB’s symbol type is what everyone else calls an enum.
Hej, Interesting stuff. Are you aware of the more obscure “APL” like systems like kdb+ or shakti (the newer and IMHO more interesting project by Arthur Whitney)?
Column based, memory mapped files, in memory + on disk database + CEP engine.
I actually have some comparisons to kdb+ in part 2 of this blogpost. This is the first time I’ve heard of shakti; it’s hard to keep up with all the different database systems these days. There doesn’t seem to be much information about it yet, but it looks like a neat project, and it will be interesting to see if it gains traction.