LinkedIn link GitHub link

Introduction to Apache Druid

Today we will cover a time series solution that’s used by the giants - Apache Druid.

Meet the Celt

After a series of smaller technologies, we tackle today a much bigger contender. We probably should not even call it that, because Druid is now a standard piece of stack for many big names around the world. Despite it being that popular, it’s still a little overshadowed by elder siblings, and we believe that it does not get its rightful place in the spotlight. Today we will make our case for Apache Druid. Hope you will grow to love it as well!

One Flew Over the Stonehenge

Ok, so what exactly is Druid? This time around the answer will be pretty straightforward - it’s a time series analytics database. There. Now, if this is all it is, then why all the fuss? We already have at least 20 of those around, right? Well, there are a few things that make Druid stand out:

  • it’s more oriented towards analytics than storage (think OLAP, not OLTP)
  • it’s resilient to the point of ridicule (exactly-once delivery from sources, data replication, data immutability, components are standalone microservices, etc.)
  • it can scale up individual modules (e.g. just the ones serving queries or ingesting data)
  • it’s very fast (does not have to “fully” process data before serving it, data in columnar format with pre-generated reverse indexes)
  • it allows a seamless switch between data versions
  • it provides a set of very efficient algorithms for working with approximate metrics
  • it ingests data from existing sources in a “pull” fashion

Those were the things that we would count as pros, but there are also some cons, or rather considerations:

  • it’s not designed to handle updates efficiently, so don’t expect miracles if trying to use it as a regular RDBS
  • your data needs to be associated with a timestamp
  • you might be forced to use only approximate metrics when dealing with high-cardinality columns
  • you will still need to keep a copy of your data (although compressed beyond comprehension)
  • joins are very inefficient, best to not use them at all

As we can see, Druid is a bit of a prima donna. It can perform some amazing things, but we must be very careful as to what we ask for. All those caveats are justified. It will soon become clear why do we need to have them when we will visit the architecture overview.

Data Under the Knife

To be able to fully comprehend what happens under Druid’s hood, we need to first understand its data model. Druid needs to ingest data from external sources (streaming or batch). It copies, processes and stores everything that it pulls. The main function of this particular piece of technology is aggregating large quantities of data and enabling fast and meaningful queries. According to study: In the tests, Druid outperformed Presto from 10X to 59X (a 90% to 98% speed improvement) and Hive by over 100X. As the authors declare, it enables us to ingest millions of events/sec, retain years of data, and provide sub-second queries. That’s pretty fast and quite a lot.

Obviously, it’s impossible to reach those parameters if we would just try weaving streams of data or dig through HDFS. Although we might think that aggregating data from external sources makes it “less of a DB”, Druid is very intertwined with its data. It’s the way the data is prepared that allows for anything else to happen. It will all become clear soon (we hope).

First, let’s introduce the concept of segment. A segment is a chunk of data that has been ingested from one data source within a set amount of time (configurable). We can imagine it can hold data similar to the following:

 Timestamp            | Name         | Ship Name       | # Cheated Death 
----------------------|--------------|-----------------|-----------------
 2021-01-01T01:01:10Z | Sam Bellamy  | Whydah          |        2          
 2021-01-01T01:01:20Z | Edward Teach | Queen Anne's R. |        4          
 2021-01-01T01:01:30Z | John Rackham | Kingston        |        3  

Here we have 3 types of columns - timestamp, dimension, and metric. There is only one primary index in Apache Druid and that is the “Timestamp” column. It is possible to introduce secondary indexes by utilizing dimension columns, but more often it’s easier just to fine-tune the granularity of the segment itself.

Dimension columns hold the bulk of our data (“Name” and “Ship Name”). As opposed to traditional RDBS, all of those are kept by default as strings. It is still possible to add numeric types (and newer versions of Druid assign long types happily in the schema), for example when adding secondary indexes, but this should be done just when we are sure that it will help our performance.

An important question would be - why do we want to keep everything as string? Would it not be very inefficient? Well, not necessarily. Internally, Druid won’t keep the data in such plain form, but first, proceed to map all distinct values into integers and keep only columns of integers and then for each value create a bitmap that shows in which rows we can find those specific values. Since an individual value won’t be usually present in too many rows, the bitmaps themselves can be compressed like any regular sparse vector. Now we have two types of queries covered - we still keep our timestamp ordering, so ranges are trivial, and we can do quick searches using bitmap AND/OR operations without scanning through all the rows.

Some of our more vigilant readers might start to raise objections right about now. How is all this possible if we want to append data in real-time? Wouldn’t we need to recalculate all this stuff each time we add a new row? Only the “Timestamp” column is unique, right? Well, you would be right to protest. We cannot do this if we append the data. All the things that we mentioned are being done only to immutable data. Druid keeps “uncommitted” segments in a completely different place and they are being queried more traditionally. We will cover it a bit further when introducing the distinction between a Historical and a MiddleManager.

One more thing that is also worth mentioning at this point is that keeping everything as a string is a double-edged sword. In cases where our data is of high cardinality (a lot of distinct values - telephone numbers, URLs, etc.) performance can degrade very quickly. To avoid this we can rely on sketches. Those are techniques that trade some information for more compact storage (e.g. count-distinct and quantile computation).

This brings us back to our original topic because we still have the metric column type left. Metric columns are completely optional and their existence usually depends on whether one wants to keep every distinct row or we can rollup some information and preserve only approximate values. At first, we might be reluctant to throw away our precious data, but for some types of queries an approximate result is as good as an exact one (do you care if you have 100001 or 100002 users per hour?). By doing a rollup and defining some good metrics we can reduce the size of a segment file and make Druid respond even faster.

Now that we understand the shape of the data it’s a bit easier to understand how Druid can fulfill all the aforementioned requirements. Still, there are a lot of things missing from the picture. We have the most important concepts mostly covered, but now let’s take a tour of how is this all structured to provide an end-to-end functionality.

Mysteries of the Architecture

There are quite a few moving parts in a full Druid deployment. All of them can be configured and scaled independently. Instead of going through each component in a list and trying to remember everything let’s start from the recommended server split - the Master/Query/Data architecture.

We already touched data a bit and mentioned two components there - the Historical and MiddleManager. Those two work in conjunction to ingest and store data. They sit on top of what we call deep store. This is the place that holds all of our immutable, already processed information. It’s completely external to Druid, and we can have different implementations - local file system, HDFS, AWS S3, MS Azure, or GCP. Data that we use is replicated in segments across Historical nodes and is ready to be queried. Before it gets there, however, it must be ingested and that’s the role of MiddleManager. A MiddleManager node spawns a number of Peons, which are just JVM-based tasks. Since Druid can consume data from real-time sources or from persistent storage it may be possible that during the ingestion we still do not possess all the data necessary to create a full segment. During this collection phase, MiddleManager takes over part of Historical responsibilities and queries data “as it is”. The segment size is limited, so if we choose the right time granularity, this “unstable” part of data should be always relatively small and fast to query. Both in MiddleManager and in Historical all the data sits always in memory, so there is no need for any disk access (apart from a situation in which a node is restored from the deep store after failure). Once the segment is complete, it gets “committed”, which means written to deep store and loaded into a Historical.

There is one more part of persistence that comes into place here - the metadata store. It’s a separate entity from the deep store. Its purpose is mainly to keep track of all the segments that are currently committed to deep store, which of those are in use, and in what versions. Metadata store is just a simple relational database, like MySQL or PostgreSQL.

In a normal situation, Historicals, MiddleManagers, and Peons are all the components that come into play when dealing with data, but there is one more alternative option. It is possible to exchange MiddleManagers+Peons for an Indexer. Since those are still experimental, we won’t cover them, but you can read more here.

Ok, we have this part covered, but now a few new questions most likely arise. How is it possible to orchestrate all those Historicals and MiddleManagers? How do we know which segment should be held by which Historical or even how many of those should there be? How do we assure that a new node is started in case one goes down?

The answer to the last question is probably the simplest - Druid relies heavily on Apache Zookeeper. This is pretty much an industry standard at the moment, but if you never had the chance to get to know it better, then maybe it’s a good opportunity to read a bit about ephemeral znodes. Zookeeper allows us to keep a consistent cluster state by avoiding split-brain scenarios and facilitates node discovery.

As for the rest of the questions, the answer to all that lies in the Master group. Here we have two types of services - Coordinators and Overlords.

The aptly named Coordinators do just that - they coordinate segment distribution. This type of service does a periodical check of cluster state and decides which segments should be loaded into which Historical node. In some sense, Historicals are “dumb” about the content that they serve. Coordinators assure that each segment is replicated a specified number of times and that the data is evenly distributed (also regarding capacity). All the communication between Historical and Coordinators happens on a dedicated load queue over Zookeeper to assure that no information is lost due to network/server issues. Over each run Coordinator will try to compact existing small segments, so even if we do not choose the right time granularity, it might not be the end of the world, because it can be optimized on this level. Clients never directly reference Coordinators in any way, this is a completely orthogonal mechanism, which means that it can be scaled up and down independently, according to our needs.

Overlords are to MiddleManagers what Coordinators are to Historicals. Their responsibility is to manage the ingestion process, but what they actually do depends on the chosen mode. When using local mode, Overlords themselves are responsible for creating Peons (so essentially for spawning ingestion tasks). In remote mode, which will be probably the most common case for a production deployment, MiddleManagers are managing their Peons on their own and an Overlord job is just to control MiddleManager population and dispatch ingestion requests.

Great, now we have a fully functional database that can pull data from various sources. It’s highly efficient, resilient, and we probably would like to retrieve some data from it at this point. This brings us to the last group - Query.

A Query group will consist of some Broker services and optionally some Router services. Stuff is not overly complicated here. A Broker receives queries from clients, forwards them to Data nodes (Historicals and/or MiddleManagers), and then aggregates and sends back the results. Since Historicals announce on Zookeeper which segments are available, there is no need for a broker to contact a Coordinator.

Router service is an optional process, but in any non-trivial deployment, it will be something very desired. They decouple clients from concrete Broker addresses and serve as a gateway to the whole Druid cluster.

That’s it. This is how Druid works internally, and now we are ready to run some real examples.

First Sacrifice

Running a standalone deployment is fairly easy. Just download the binaries from here, unpack, go into the bin directory and run ./start-micro-quickstart. Keep in mind that Druid is built for JDK 8, so you might want to temporarily switch if you have a newer version. If you want to try running anyway with JDK 9+, then please first execute export DRUID_SKIP_JAVA_CHECK=1.

After a few seconds, you should be able to go to http://localhost:8888 and if you were lucky enough to not have anything else bound on this port, then you should see the console. In case you have something already occupying default ports, you need to execute export DRUID_SKIP_PORT_CHECK=1, then go to /conf/druid/single-server/micro-quickstart directory and go through all sub-directories checking runtime.properties files for the offending druid.plaintextPort entry.

If you go to the Services tab, you will be able to see all the processes that we covered before. We have one service of each kind. One thing worth noticing is that the one running with type router is bound to port 8888. This is the port of the console - as we mentioned, our Router is the entry point of the whole cluster.

Please switch now to the Load data tab, click on Example data, and then Load example on the right side (Wikipedia Edits should be preselected).

We will end up in a new view with some data visible and the Connect tab in the Connect and parse raw data group selected. You can familiarize yourself with the data format, but it’s not particularly important. What we can do now is to click Edit spec under Verify and submit. This small JSON is the configuration of our ingestion. There’s not that much here yet, but we can already see that we have segmentGranularity chosen to be day. The type index_parallel option is relevant only to native batch uploads and it specifies if the ingestion task can be multithreaded.

Ok, let’s try ingesting something. First, go back to the Connect tab. You should see a Next: Parse data button in the lower right. It corresponds to the next tab after Connect - Parse data. We will go from left to right through all the necessary steps. Please click the button now.

You should see a presentation of how we will parse the final data set. There is an interesting option here on the right, called Add column flattening. Druid does not support nested schema and if we would try to save some complex objects as strings then we would end up with a high cardinality column. If the data has some regular format (which is usually the case) then we can extract individual values and flatten out the structure. This should help to speed things up because we are always limited by the cardinality of individual columns, not of column sets.

Let’s click the Next: Parse time button. Here we can choose which column we want to use as our primary index. As we explained before, it always needs to be a timestamp. This will be used not only as a basis for queries but also to split up data into segments. Since we don’t have much choice, we will just go through with the default column. Press Next: Transform now.

Here we can see another minor feature of Druid. What we can do at the moment we ingest the data is to create derivative columns from the data that we pulled in. Please click on Add column transform and type in added + delta in the Expression field. Let’s name the output as addedAndDelta and click Apply. If you scroll right through the columns in the main view, you should see a preview of our new column (it might be slow to populate at first). Now, please click the Next: Filter button.

In this section, we can see a mechanism that allows us to filter out rows of data. It’s always more efficient to reduce the data size at the point of ingestion instead of transforming everything, so if there is a possibility to reduce your dataset (and in most cases we have more data than we need), then you should definitely do it.

We can create filters by two methods here. First, let’s try adding it by JSON. Copy and paste the following to the Filter text box:

{
  "type": "selector",
  "dimension": "isRobot",
  "value": "false"
}

If you click Apply there should be considerably fewer results in the main window now. The second method to create a filter is by clicking on Add column filter and then choosing the appropriate type, dimension and value. Let’s proceed with the current filter setup by clicking Next: Configure schema.

This might be one of the most important steps. First, we can see that Druid has assigned some types to our columns. If you have the same version as we, you will probably see only longs and strings. We can change those by clicking on the header and modifying the options to the right. Since numeric types have high cardinality it makes sense to keep them as longs, but if you have only a few possible numbers in a given column, you might want to change even this to string. Druid does not support a separate type for boolean values, since those can be efficiently represented using a string with only two possible values. Among other options, we can choose to disable the creation of a bitmap index. Be very careful with this one. It basically would mean that Druid has to perform a full table search each time you want to query for this particular column. It might make sense if you have columns in which there are almost no duplicate values, but in such a case you probably should also avoid building queries against those particular columns.

Let’s go back to the main options, by clicking Cancel if you have column options open. We said that this is one of the most important steps. Why? Well, because we can apply Rollup here.

First, please disable the Explicitly specify dimensions list option. It will allow Druid to automatically detect which columns should be the dimensions (fields that you will group and filter on). The rule here is very simple - every string column is a dimension.

Next, please enable the Rollup option. You will see that all your long columns (except timestamp) have changed their color. Those will be treated as metrics, which means that we will aggregate by those fields when performing rollup.

Below we can choose the Query granularity. If you change the values in this dropdown, the timestamp column will get truncated accordingly. What will happen when we finally apply the rollup, is that we will lose the possibility to distinguish our data on a timescale lower than what we chose here. Please select minute and click Next: Partition.

Here we can see the options that allow us to control our segment size. Depending on how much data comes from our sources, we might want to choose higher or lower granularity for segments. In case we choose to have high granularity and there is not enough data, we will end up with a lot of very small files, which will reduce our performance. If we choose too low granularity and there is too much data, we will bloat our file, and query time will drop as well. For our test data set, we can safely go with the default options. Please click Next: Tune.

On this screen, we can see a multitude of options that can control the way our services work. For now, we don’t have anything that we would like to change, so please click Next: Publish.

The current screen gives us several options to control error logging. We don’t have anything to do here either. Please click Next: Edit spec.

Finally, we are at the end of the process. We can look at our spec definition in JSON here. If we would choose not to use GUI, we would have to create this file ourselves. Please, feel free to click Submit any time.

You should be thrown into the Tasks view, where you can see the ingestion process running. In a short time, it should finish (hopefully without issues). We will choose the Segments tab from the main navbar now. Here we can see that we have one new segment which should be of size just over 4MB.

Please go now to Query from the main navbar, paste SELECT * FROM wikipedia into the upper-right panel, and click on RUN. What you will see is the data after rollup was applied. The timestamp column is truncated to minute level and in each minute entries with the same dimensions are squeezed into one row, with metrics adjusted accordingly.

As we can see, it’s fairly easy to fine-tune a level of detail that suits our needs best. This can happen both on the segment and row level. In any case, Druid will optimize the data set and it can even try to automatically reshuffle segments if those are too fragmented.

Drilling Through the Heart

The last thing that we will try is a little bounce-back to one of our previous posts. In this entry, we occupied ourselves with Apache Drill. If you still did not check it out, then please go take a quick look.

As of today, the current Druid version (0.22.1) is not supported by its dedicated plugin in Apache Drill (1.19.0). The reason being, that up to version 0.17.0 Druid included two types of queries - either Select or Scan. Since the latter was superior regarding memory and performance the former was removed. druid plugin still relies on Select and as such it’s not possible to use it, which is a shame. Since Druid itself is still a bit of a fresh thing (as indicated by its versions number) we hope this will get fixed in the future. In cases like that developers usually prioritize the features that have the most impact on the community, so the more interest is generated in both of those projects, the better the chances that the plugin will get repaired.

Leaving Avalon

It is time to wrap up our journey with Druid. I hope you enjoyed it as much as we did. From our perspective, it’s definitely worth checking out this piece of technology. Chances are that you will have to in the future anyway since its adoption is pretty widespread at this point. Maybe you already are using Druid? In any case, thank you for your time and we hope that you will continue to check out our tutorials!