Feeding Kafka data into Druid with Tranquility

I noticed that the demo code for the Tranquility library seems to assume that you will have Samza or Storm in front of your Kafka feed. We don’t have either; we’re just feeding event data straight into Druid from Kafka. As I understand it, the Tranquility library just acts as an Overlord client, so it should be possible to do something like this:

// Create the service
Service<List<Map<String, Object>>, Integer> druidService = DruidBeams.buildJavaService();

// Pull a batch of events from Kafka and push it to Druid, forever
while (true) {
    List<Map<String, Object>> listOfEvents = getEventsFromKafka();
    Future<Integer> numSentFuture = druidService.apply(listOfEvents);
}

Sounds simple enough, but I have a couple of concerns about this process. First, we can potentially ingest hundreds of thousands of events per minute from Kafka, so indexing might take a long time. I’ve indexed about 10 million records with a batch task, and it took hours unless I used Hadoop. How fast will realtime indexing be able to process data?

Second, the realtime indexing task example just seems to turn a middle manager into a realtime node. Why is this any different from a normal realtime node? Can we use Hadoop index tasks to process realtime data in batches from Tranquility and avoid the need for a middle manager node? It seems like it would be somewhat more reliable than just another realtime node.

Hey Taylor, yeah, that would work. You could also use the BeamPacketizer class from Tranquility if it makes your life a little simpler (you can send one message at a time and it does the batching for you).
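To make the "send one message at a time, batch behind the scenes" idea concrete, here is a minimal generic sketch. SimpleBatcher is a hypothetical class written for illustration only; it is not BeamPacketizer and does not mirror its API. The sink passed in stands in for whatever actually ships a batch (for example, a call to druidService.apply).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only; this is NOT the BeamPacketizer API.
final class SimpleBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> buffer = new ArrayList<>();

    SimpleBatcher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Accept a single message; hand a full batch to the sink once enough have piled up.
    synchronized void send(T message) {
        buffer.add(message);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hand whatever is buffered to the sink (call this on shutdown or on a timer).
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        sink.accept(batch);
    }
}
```

A real batcher would also flush on a timer so a slow trickle of messages does not sit in the buffer indefinitely; that is left out here to keep the sketch short.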

Each realtime task should be able to ingest 1,000–10,000 messages/sec depending on the complexity of your data. If you need a higher ingestion rate, you can ask Tranquility for more “partitions” and it will spawn more tasks. Each middle manager can potentially run multiple tasks, depending on what you set its capacity to be. I think budgeting 1–3 CPUs (depending on query load) and 5–6GB of memory per task is about right. About half the memory should be used for the Java heap, and the other half left free for off-heap use.
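As a rough worked example of those numbers, here is a back-of-the-envelope sizing sketch. The class and method names are hypothetical (they are not part of Tranquility or Druid), and the input rate of 600,000 events per minute and the replication factor of 2 are assumptions chosen purely for illustration.

```java
// Back-of-the-envelope sizing; all names are hypothetical, not Tranquility or Druid APIs.
final class IngestSizing {

    // Partitions needed so that no task exceeds its per-task throughput (rounds up, minimum 1).
    static int partitionsNeeded(long eventsPerSecond, long perTaskEventsPerSecond) {
        if (perTaskEventsPerSecond <= 0) {
            throw new IllegalArgumentException("perTaskEventsPerSecond must be positive");
        }
        long needed = (eventsPerSecond + perTaskEventsPerSecond - 1) / perTaskEventsPerSecond;
        return (int) Math.max(1, needed);
    }

    // Each partition runs once per replica.
    static int totalTasks(int partitions, int replicas) {
        return partitions * replicas;
    }

    // Total memory to budget across all tasks.
    static long memoryGb(int tasks, long gbPerTask) {
        return tasks * gbPerTask;
    }

    public static void main(String[] args) {
        long eventsPerSecond = 600_000 / 60;                        // assumed load: 600k events/min
        int partitions = partitionsNeeded(eventsPerSecond, 1_000); // conservative end of 1,000-10,000/sec
        int tasks = totalTasks(partitions, 2);                      // assumed: 2 replicas
        System.out.println(partitions + " partitions, " + tasks + " tasks, ~"
                + memoryGb(tasks, 6) + " GB of memory at 6 GB per task");
    }
}
```

Using the low end of the per-task range gives a pessimistic estimate; if your events are simple, the real number of partitions needed could be several times smaller.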

Tranquility always spawns realtime tasks; you can’t use it with Hadoop. The difference between what Tranquility does and what standalone realtime nodes do is that Tranquility is push-based, supports replication, and can make it easier to manage parallelism and schema rollover. With standalone realtime nodes, you create a slightly different specFile on each node and update them all when you want to reconfigure; with Tranquility, you have a single config shared across all clients.

Could you explain how a realtime task differs from a Hadoop task aside from the fact that one uses Hadoop and one doesn’t? The documentation isn’t really all that clear; the example given is a Kafka-based realtime indexing task but Tranquility won’t do that (I think) because it’s actually sending data from Kafka to the Overlord node.

I haven’t heard back from anyone yet, but I’ve been poking around in the source code. As far as I know, you can’t upload data directly to any other kind of task except realtime. The normal and Hadoop indexing jobs seem to require that the files exist on the filesystem already. There’s also the fact that the Hadoop indexer requires fixed sets of data; MapReduce can’t work if the data set is changing during its run. To use it, you’d need to batch up index jobs into fixed time periods or amounts, and you’d lose the ability to see that data in true realtime.

So, I guess it makes sense that the realtime indexing job would be required for, well, realtime data. The part I haven’t figured out on my own is how the realtime job handles segment merging. If my segmentGranularity is set to HOUR, a realtime node won’t hand a segment off to deep storage until that segment has completely fallen out of the current window. For instance, if the realtime node starts capturing data at 9:00, the segment won’t be handed off to deep storage until 10:00.

The realtime indexing tasks won’t have this luxury, so how does the segment they’re working on get updated?

Hi Taylor, apologies for the delayed response. The realtime index task used with Tranquility requires events to be pushed over HTTP, which Tranquility helps to manage. The Hadoop index task acts as a driver and kicks off a job on an existing Hadoop cluster. Druid uses MVCC for segments, and the indexing service additionally provides locking over a certain time interval (so realtime and batch tasks don’t clobber each other). Segments are immutable once created, so to update data in a segment, you have to run a batch index task to rebuild the segment for that interval.
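A small sketch may help show what versioned, immutable segments mean in practice: a rebuilt segment does not modify the old one, it simply carries a newer version for the same interval and takes precedence. The classes below are hypothetical and simplified; they assume segments cover exactly the same interval and that versions are timestamp strings where a greater string wins. Druid's real timeline logic also handles partially overlapping intervals, which this ignores.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of versioned segments; not Druid's actual classes.
final class SegmentVersions {

    static final class Segment {
        final String interval;
        final String version;
        final String producedBy;

        Segment(String interval, String version, String producedBy) {
            this.interval = interval;
            this.version = version;
            this.producedBy = producedBy;
        }
    }

    // For each interval, keep only the segment with the greatest version string.
    static Map<String, Segment> visible(List<Segment> segments) {
        Map<String, Segment> winners = new TreeMap<>();
        for (Segment s : segments) {
            winners.merge(s.interval, s, (current, candidate) ->
                    candidate.version.compareTo(current.version) > 0 ? candidate : current);
        }
        return winners;
    }

    public static void main(String[] args) {
        String hour = "2015-01-01T09:00:00Z/2015-01-01T10:00:00Z";
        List<Segment> published = Arrays.asList(
                new Segment(hour, "2015-01-01T09:00:00Z", "realtime task"),
                new Segment(hour, "2015-01-02T03:00:00Z", "batch re-index"));
        Segment winner = visible(published).get(hour);
        System.out.println(winner.producedBy + " serves " + hour);
    }
}
```

The older segment is never edited in place; it is just no longer the one that gets queried once a newer version for the interval exists.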

Hey Taylor,

You’re right: the realtime tasks are the only ones that can accept data directly.

I don’t think I understand what you mean by “how the realtime job handles segment merging.” Can you go into a little more detail on what you’re wanting to learn?

Let’s say I have two realtime tasks that have hourly segmentGranularity. If I make a Tranquility client feed data to them, the middle managers are going to have to write segments as the data comes in. They can’t wait until the full hour’s worth of data has arrived. They also have to somehow coordinate with the rest of the system so multiple realtime tasks can operate on the same segment.

Segments are immutable so the realtime tasks can’t be simply appending to the same segment. But somehow it’s possible for the two realtime tasks to make that data available immediately. How does this work?

The only explanation I had was that the realtime tasks periodically merge (with copy-on-write since segments are immutable) in-memory data into a single segment in deep storage. From your responses, I take it that isn’t right and I’m missing something important. The documentation is pretty light on how realtime tasks actually work, so I’m really curious.

Ah okay. It’s actually only possible for one task to write to each segment. If you have multiple realtime tasks, either they’re replicas (in which case they’re writing two copies of the “same” segment, and only one of them will eventually end up in deep storage) or they’re not replicas (in which case they’re writing to different segments). It’s possible to have multiple different segments for each segmentGranularity interval; they’ll just have different partition numbers.
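Here is a small sketch of that idea, assuming hourly segmentGranularity. The SegmentSlots class and its "bucket#partition" label are hypothetical, made up for illustration; they are not Druid's real segment identifier format. The point is only that a segment is pinned down by its time bucket plus a partition number, so replicas share a slot while non-replica tasks get distinct ones.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration; the "bucket#partition" label is not Druid's real segment id format.
final class SegmentSlots {

    // Start of the hourly bucket that contains the event.
    static Instant hourBucket(Instant eventTime) {
        return eventTime.truncatedTo(ChronoUnit.HOURS);
    }

    // A segment "slot" is the hourly bucket plus the partition number.
    static String slot(Instant eventTime, int partitionNum) {
        return hourBucket(eventTime) + "#" + partitionNum;
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2015-01-01T09:42:10Z");

        // Two replica tasks for partition 0 write copies of the same segment.
        String replicaA = slot(event, 0);
        String replicaB = slot(event, 0);

        // A non-replica task for partition 1 writes a different segment in the same hour.
        String otherPartition = slot(event, 1);

        System.out.println("replicas share a slot: " + replicaA.equals(replicaB));
        System.out.println("partitions differ: " + !replicaA.equals(otherPartition));
    }
}
```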