On Friday, we announced DNS analytics for all Cloudflare customers. Because of our scale (by the time you’ve finished reading this, Cloudflare DNS will have handled millions of DNS queries), we had to be creative in our implementation. In this post, we’ll describe the systems that make up DNS Analytics, which help us comb through trillions of these logs each month.
How logs come in from the edge
Cloudflare already has a data pipeline for HTTP logs, and we wanted to reuse as much of that system as we could for the new DNS analytics. Every time one of our edge services gets an HTTP request, it generates a structured log message in the Cap’n Proto format and sends it to a local multiplexer service. Given the volume of the data, we chose not to record the full DNS message payload, only the telemetry data we are interested in, such as response code, size, or query name, which allows us to keep only ~150 bytes per message on average. Each message is then fused with processing metadata, such as timing information and exceptions triggered during query processing. The benefit of fusing data and metadata at the edge is that we can spread the computational cost across our thousands of edge servers, and log only what we absolutely need.
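To make the shape of such a compact telemetry record concrete, here is a minimal sketch in Go. The field names and types are illustrative assumptions, not Cloudflare's actual schema (which is defined in Cap'n Proto); the point is that a handful of small scalar fields plus a query name easily stays around ~150 bytes.

```go
package main

import "fmt"

// DNSLogMessage is a hypothetical sketch of a compact DNS telemetry
// record; field names are invented for illustration, not Cloudflare's
// real Cap'n Proto schema.
type DNSLogMessage struct {
	Timestamp    int64  // query arrival time, epoch nanoseconds
	ZoneID       uint32 // zone the query belongs to
	QueryName    string // e.g. "example.com."
	QueryType    uint16 // e.g. 1 (A), 28 (AAAA)
	ResponseCode uint8  // DNS RCODE, e.g. 0 (NOERROR), 3 (NXDOMAIN)
	ResponseSize uint16 // answer size in bytes
	ColoID       uint16 // data center that served the query
	ProcessingNs uint32 // processing-time metadata fused at the edge
}

func main() {
	m := DNSLogMessage{
		Timestamp: 1489500000000000000, ZoneID: 42,
		QueryName: "example.com.", QueryType: 1,
		ResponseCode: 0, ResponseSize: 59,
		ColoID: 7, ProcessingNs: 250000,
	}
	fmt.Printf("%+v\n", m)
}
```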
The multiplexer service (known as “log forwarder”) runs on each edge node, assembling log messages from multiple services and transporting them to our warehouse over a TLS-secured channel for processing. A counterpart service in the warehouse receives the logs and demultiplexes them into several Apache Kafka clusters. Over the years, Apache Kafka has proven to be an invaluable buffer between producers and downstream consumers, preventing data loss when consumers fail over or require maintenance. Since version 0.10, Kafka allows rack-aware allocation of replicas, which improves resilience against rack or site failure, giving us fault-tolerant storage of unprocessed messages.
Having a queue with structured logs has allowed us to investigate issues retrospectively without requiring access to production nodes, but it has proven to be quite laborious at times. In the early days of the project we would skim the queue to find offsets for the rough timespan we needed, and then extract the data into HDFS in Parquet format for offline analysis.
The HTTP analytics service was built around stream processors generating aggregations, so we planned to leverage Apache Spark to stream the logs to HDFS automatically. As Parquet doesn’t natively support indexes or arranging the data in a way that avoids full table scans, it’s impractical for on-line analysis or serving reports over an API. There are extensions like parquet-index that create indexes over the data, but not on-the-fly. Given this, the initial plan was to only show aggregated reports to customers, and keep the raw data for internal troubleshooting.
The problem with aggregated summaries is that they only work on columns with low cardinality (a small number of unique values). With aggregation, each column in a given time frame explodes to a number of rows equal to the number of unique entries, so it’s viable to aggregate on something like response code, which only has 12 possible values, but not on query name. Domain names vary widely in popularity, so if, for example, a popular domain name gets asked 1,000 times a minute, one could expect a 1,000x row reduction from per-minute aggregation; in practice, however, it doesn’t work out that way.
Due to how DNS caching works, resolvers answer identical queries from cache without going to the authoritative server for the duration of the TTL, and the TTL tends to be longer than a minute. So, while authoritative servers see the same request many times, our data is skewed towards non-cacheable queries such as typos or random-prefix subdomain attacks. In practice, we see anywhere between 0x and 60x row reduction when aggregating by query name, so storing aggregations at multiple resolutions almost negates the row reduction. Aggregations are also kept for multiple resolution and key combinations, so aggregating on a high-cardinality column can even result in more rows than the original data.
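The arithmetic behind this can be sketched in a few lines of Go. The numbers are illustrative, not measured: the reduction factor of a per-minute aggregation is simply the raw row count divided by the number of distinct key values seen in that minute, so it collapses towards 1x as the key's cardinality approaches the row count.

```go
package main

import "fmt"

// reductionFactor estimates the row reduction from per-minute
// aggregation: raw rows per minute divided by the number of aggregated
// rows, which equals the number of distinct key values in that minute.
func reductionFactor(rowsPerMinute, distinctKeys int) float64 {
	return float64(rowsPerMinute) / float64(distinctKeys)
}

func main() {
	// Ideal case from the text: one popular name asked 1,000 times a minute.
	fmt.Println(reductionFactor(1000, 1)) // 1000x

	// Realistic (hypothetical) case: caching shields the authoritative
	// server from repeats, so most names it sees are unique typos or
	// random-prefix attack queries, and the reduction nearly vanishes.
	fmt.Println(reductionFactor(1000, 900))
}
```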
For these reasons we started by only aggregating logs at the zone level, which was useful enough for trends, but too coarse for root-cause analysis. For example, in one case we were investigating short bursts of unavailability in one of our data centers. Having unaggregated data allowed us to narrow the issue down to the specific DNS queries experiencing latency spikes, and then correlate those queries with a misconfigured firewall rule. Cases like these would be much harder to investigate with only aggregated data, since the issue affected only a tiny percentage of requests that would be lost in the aggregations.
So we started looking into several OLAP (Online Analytical Processing) systems. The first system we looked into was Druid. We were really impressed with the capabilities and how the front-end (Pivot and formerly Caravel) is able to slice and dice the data, allowing us to generate reports with arbitrary dimensions. Druid has already been deployed in similar environments with over 100B events/day, so we were confident it could work, but after testing on sampled data we couldn’t justify the hardware costs of hundreds of nodes. Around the same time Yandex open-sourced their OLAP system, ClickHouse.
And then it Clicked
ClickHouse has a much simpler system design - all the nodes in a cluster have equal functionality and use only ZooKeeper for coordination. We built a small cluster of several nodes to start kicking the tires, and found the performance to be quite impressive and true to the results advertised in the performance comparisons of analytical DBMS, so we proceeded with building a proof of concept. The first obstacle was a lack of tooling and the small size of the community, so we delved into the ClickHouse design to understand how it works.
ClickHouse doesn’t support ingestion from Kafka directly, as it’s only a database, so we wrote an adapter service in Go. It read Cap’n Proto encoded messages from Kafka, converted them into TSV, and inserted them into ClickHouse in batches over the HTTP interface. Later, we rewrote the service to use a Go library speaking the native ClickHouse interface to boost performance, and we have since contributed some performance improvements back to the project. One thing we learned during the ingestion performance evaluation is that ClickHouse ingestion performance depends heavily on batch size - the number of rows you insert at once. To understand why, we looked further into how ClickHouse stores data.
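The TSV conversion step of such an adapter might look like the sketch below. The column set is an invented example, not our real schema, and a real service would POST many of these lines in one body with a query like `INSERT INTO dns_logs FORMAT TabSeparated`; the sketch only shows the per-row rendering, including the backslash escaping that ClickHouse's TabSeparated format expects for string values.

```go
package main

import (
	"fmt"
	"strings"
)

// tsvRow renders one decoded log message as a tab-separated line for
// ClickHouse's HTTP interface. The columns (timestamp, zone, query
// name, response code, size) are illustrative, not the real schema.
func tsvRow(ts int64, zoneID uint32, qname string, rcode uint8, size uint16) string {
	// In TabSeparated format, backslashes, tabs and newlines inside
	// string values must be backslash-escaped.
	esc := strings.NewReplacer(`\`, `\\`, "\t", `\t`, "\n", `\n`)
	return fmt.Sprintf("%d\t%d\t%s\t%d\t%d",
		ts, zoneID, esc.Replace(qname), rcode, size)
}

func main() {
	fmt.Println(tsvRow(1489500000, 42, "example.com.", 0, 59))
}
```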
The most common table engine ClickHouse uses for storage is the MergeTree family. It is conceptually similar to the LSM algorithm used in Google’s BigTable or Apache Cassandra, but it avoids an intermediate memory table and writes directly to disk. This gives it excellent write throughput, as each inserted batch is only sorted by “primary key”, compressed, and written to disk to form a segment. The absence of a memory table or any notion of “freshness” of the data also means that it is append-only; data modification or deletion isn’t supported. The only way to delete data currently is by calendar month, as segments never overlap a month boundary. The ClickHouse team is actively working on making this configurable.

On the other hand, this makes writing and segment merging conflict-free, so ingestion throughput scales linearly with the number of parallel inserts until the I/O or cores are saturated. It also means, however, that MergeTree is not fit for tiny batches, which is why we rely on Kafka and inserter services for buffering. ClickHouse constantly merges segments in the background, so many small parts will be merged and rewritten multiple times (increasing write amplification), and too many unmerged parts will trigger aggressive throttling of insertions until the merging progresses. We have found that a few insertions per table per second work best as a tradeoff between real-time ingestion and ingestion performance.
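The buffering side of this tradeoff can be sketched as a simple batcher. This is a minimal illustration, not our inserter service: it flushes a batch once it reaches a size threshold, and a production version would also flush on a timer so each table sees only a few inserts per second even under low traffic.

```go
package main

import "fmt"

// batcher accumulates rows and hands back a full batch once maxRows is
// reached, so the caller issues one large INSERT instead of many tiny
// ones. (A real inserter would also flush on a timer.)
type batcher struct {
	maxRows int
	rows    []string
}

// add buffers one row and returns a non-nil batch exactly when the
// buffer fills; the caller is expected to insert that batch.
func (b *batcher) add(row string) []string {
	b.rows = append(b.rows, row)
	if len(b.rows) >= b.maxRows {
		batch := b.rows
		b.rows = nil
		return batch
	}
	return nil
}

func main() {
	b := &batcher{maxRows: 3}
	inserts := 0
	for i := 0; i < 10; i++ {
		if batch := b.add(fmt.Sprintf("row-%d", i)); batch != nil {
			inserts++ // one INSERT per batch instead of one per row
		}
	}
	fmt.Println("inserts:", inserts, "pending:", len(b.rows))
}
```

With 10 rows and a batch size of 3, the database sees 3 inserts with 1 row left pending for the next flush, rather than 10 single-row inserts each producing its own on-disk part.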
The key to table read performance is indexing and the arrangement of the data on disk. No matter how fast processing is, when the engine needs to scan terabytes of data from disk and use only a fraction of it, it’s going to take time. ClickHouse is a columnar store, so each segment contains a file for each column, with sorted values for each row. This way, whole columns not present in the query can be skipped, and multiple cells can be processed in parallel with vectorized execution. To avoid full scans, each segment also has a sparse index file. Since all columns are sorted by the “primary key”, the index file only contains the captured values of every Nth row, so it can be kept in memory even for very large tables. For example, the default setting is to make a mark of every 8,192th row, so only about 122,070 marks are required to sparsely index a table with 1 billion rows, which easily fits in memory. See primary keys in ClickHouse for a deeper dive into how it works.
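The mark arithmetic is a one-line ceiling division, sketched here with ClickHouse's default granularity of 8,192 rows per mark:

```go
package main

import "fmt"

// marksNeeded computes how many sparse-index marks cover a table when
// one mark is written for every granularity-th row (ClickHouse's
// index_granularity setting, 8192 by default).
func marksNeeded(rows, granularity int64) int64 {
	return (rows + granularity - 1) / granularity // ceiling division
}

func main() {
	// ≈122,070 marks cover a billion rows (ceiling division gives
	// 122,071); at a few bytes per mark, the whole index stays
	// comfortably in memory.
	fmt.Println(marksNeeded(1_000_000_000, 8192))
}
```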
When querying the table using primary key columns, the index returns approximate ranges of rows to consider. Ideally the ranges should be wide and contiguous. For example, when the typical usage is to generate reports for individual zones, placing the zone in the first position of the primary key will result in rows sorted by zone in each column, making disk reads for individual zones contiguous, whereas sorting primarily by timestamp would not. The rows can be sorted in only one way, so the primary key must be chosen carefully with the typical query load in mind. In our case, we optimized read queries for individual zones and keep a separate table with sampled data for exploratory queries. The lesson learned is that instead of trying to optimize one index for all purposes and splitting the difference, we made several specialized tables.
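A toy demonstration of why the sort order matters, using invented rows: sorting primarily by timestamp scatters each zone's rows into multiple disjoint ranges, while sorting primarily by zone packs them into one contiguous run that can be read in a single sweep.

```go
package main

import (
	"fmt"
	"sort"
)

type row struct {
	zone string
	ts   int64
}

// contiguousRuns counts how many separate ranges of rows must be read
// to fetch every row belonging to the given zone.
func contiguousRuns(rows []row, zone string) int {
	runs := 0
	inRun := false
	for _, r := range rows {
		if r.zone == zone && !inRun {
			runs++
		}
		inRun = r.zone == zone
	}
	return runs
}

func main() {
	rows := []row{{"b.com", 1}, {"a.com", 2}, {"b.com", 3}, {"a.com", 4}}

	// Sorted primarily by timestamp: a zone's rows are scattered.
	sort.Slice(rows, func(i, j int) bool { return rows[i].ts < rows[j].ts })
	fmt.Println("by timestamp:", contiguousRuns(rows, "a.com"), "runs")

	// Sorted primarily by zone: a zone's rows are contiguous.
	sort.Slice(rows, func(i, j int) bool {
		if rows[i].zone != rows[j].zone {
			return rows[i].zone < rows[j].zone
		}
		return rows[i].ts < rows[j].ts
	})
	fmt.Println("by zone:", contiguousRuns(rows, "a.com"), "runs")
}
```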
One such specialisation is tables with aggregations over zones. Queries across all rows are significantly more expensive, as there is no opportunity to exclude data from scanning. This makes it less practical for analysts to compute basic aggregations over long periods of time, so we decided to use materialized views to incrementally compute predefined aggregations, such as counters, uniques, and quantiles. The materialized views leverage the sort phase on batch insertion to do productive work: computing aggregations. So, after a newly inserted segment is sorted, it also produces a table with rows representing dimensions and columns representing aggregation function state.

The difference between aggregation state and a final result is that we can generate reports at an arbitrary time resolution without actually storing precomputed data in multiple resolutions. In some cases the state and the result can be the same - for basic counters, for example, hourly counts can be produced by summing per-minute counts - but it doesn’t make sense to sum unique visitors or latency quantiles. This is where aggregation state is much more useful, as it allows meaningful merging of more complicated states, such as a HyperLogLog (HLL) bitmap, to produce hourly unique-visitor estimates from minutely aggregations. The downside is that storing state can be much more expensive than final values - the aforementioned HLL state tends to be 20-100 bytes per row when compressed, while a counter is only 8 bytes (1 byte compressed on average). These tables are used to quickly visualise general trends across zones or sites, and also by our API service, which uses them opportunistically for simple queries. Having both incrementally aggregated and unaggregated data in the same place allowed us to simplify the architecture by deprecating stream processing altogether.
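The counter-versus-uniques distinction can be made concrete with a small sketch. Here an exact set of visitor IPs stands in for ClickHouse's compact HLL state (a real HLL gives an estimate in far less memory, which is the whole point of storing state): counts merge by simple summation, while uniques must merge the underlying state, because summing per-minute unique counts double-counts returning visitors.

```go
package main

import "fmt"

// mergeCounts merges basic counters: hourly = sum of per-minute counts.
func mergeCounts(perMinute []int) int {
	total := 0
	for _, c := range perMinute {
		total += c
	}
	return total
}

// mergeUniqueState merges per-minute unique-visitor state by set union.
// An exact set stands in here for an HLL bitmap; both support this kind
// of merge, which plain final counts do not.
func mergeUniqueState(perMinute []map[string]bool) int {
	merged := map[string]bool{}
	for _, s := range perMinute {
		for v := range s {
			merged[v] = true
		}
	}
	return len(merged)
}

func main() {
	minute1 := map[string]bool{"1.2.3.4": true, "5.6.7.8": true}
	minute2 := map[string]bool{"1.2.3.4": true} // a returning visitor

	// Summing final per-minute uniques overcounts: 2 + 1 = 3.
	fmt.Println("naive sum:", len(minute1)+len(minute2))
	// Merging state gives the true hourly uniques: 2.
	fmt.Println("merged state:", mergeUniqueState([]map[string]bool{minute1, minute2}))
	// Plain counters merge fine by summation.
	fmt.Println("merged counts:", mergeCounts([]int{2, 1}))
}
```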
Infrastructure and data integrity
We started with RAID-10 composed of 12 × 6 TB spinning disks on each node, but reevaluated after the first inevitable disk failures. In the second iteration we migrated to RAID-0, for two reasons: first, it wasn’t possible to hot-swap just the faulty disks, and second, the array rebuild took tens of hours, degrading I/O performance. It was significantly faster to replace a faulty node and use internal replication to populate it with data over the network (2x10GbE) than to wait for an array to finish rebuilding. To compensate for the higher probability of node failure, we switched to 3-way replication, allocated replicas of each shard to different racks, and started planning for replication to a separate data warehouse.
Another disk failure highlighted a problem with the filesystem we used. Initially we used XFS, but it started to lock up during replication from two peers at the same time, breaking replication of segments before it completed. The issue manifested as a lot of I/O activity with little increase in disk usage, as broken parts were deleted, so we gradually migrated to ext4, which didn’t have the same issue.
At the time we relied solely on Pandas and ClickHouse’s HTTP interface for ad-hoc analyses, but we wanted to make it more accessible for both analysis and monitoring. Since we knew Caravel - now renamed to Superset - from the experiments with Druid, we started working on an integration with ClickHouse.
Superset is a data visualisation platform designed to be intuitive, allowing analysts to interactively slice and dice the data without writing a single line of SQL. It was initially built and open-sourced by Airbnb for Druid, but over time it has gained support for SQL-based backends through SQLAlchemy, an abstraction and ORM for dozens of database dialects. So we wrote and open-sourced a ClickHouse dialect and, finally, a native Superset integration that was merged a few days ago.
Superset has served us well for ad-hoc visualisations, but it is still not polished enough for our monitoring use case. At Cloudflare we’re heavy users of Grafana for visualisation of all our metrics, so we wrote and open-sourced a Grafana integration as well.
It has allowed us to seamlessly extend our existing monitoring dashboards with the new analytical data. We liked it so much that we wanted to give you, our users, the same ability to explore the analytics data, so we built a Grafana app to visualise data from Cloudflare DNS Analytics and made it available in your Cloudflare dashboard analytics. Over time we’re going to add new data sources, dimensions, and other useful ways to visualise your data from Cloudflare. Let us know what you’d like to see next.
Does solving these kinds of technical and operational challenges excite you? Cloudflare is always hiring for talented specialists and generalists within our Engineering, Technical Operations and other teams.