Ingestion speed on large amounts of sparsely timestamped data.

Hi,

We have a system that sends us about 50,000,000 unique events every 15 minutes (all with the same timestamp). That should be considered an after-roll-up value, in Druid terms. We’re investigating the best way to get this data into Druid.

At first, we tried using the “index” task, but found it way too slow: ingesting 1 hour’s worth of data took more than 1 hour.

Then we turned to submitting an “index_realtime” task and pushing data to it over HTTP. That was much better, as we went down to ~30 minutes per hour’s worth of data, but it is still too slow. Surprisingly, sending with more connections in parallel does not seem to offer any significant speedup.
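
For reference, we push batches as JSON arrays to the task’s EventReceiverFirehose endpoint, along these lines (host, port, serviceName and the field names are placeholders from our setup):

    POST http://<peon-host>:<port>/druid/worker/v1/chat/<serviceName>/push-events
    Content-Type: application/json

    [
      {"timestamp": "2016-05-10T12:00:00Z", "id": "abc123", "value": 42},
      {"timestamp": "2016-05-10T12:00:00Z", "id": "abc124", "value": 17}
    ]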

We’re doing the tests on an Intel i7-6700 machine (8 cores) with 32GB of RAM and two SSD drives in RAID0. While the ingestion is taking place, the load average is about 4 and there’s at most ~100MB/s of IO, so there seems to be plenty of headroom left. Is there any way to tune a single “index_realtime” task to use more resources (the Overlord configs, etc. are attached below)? Or is spawning more tasks in parallel the only way? The latter is cumbersome, as a single running task currently uses >10GB of RAM, so we’re pretty short on that.

One other problem we’ve been facing is that sometimes there are errors saying “Tried to write [<some_number>] bytes, which is too much”. As far as we understand, that happens because there are too many rows with the same timestamp. We think that sharding can help us here; is that correct?

Any general advice on handling this type of data?

Configs: https://gist.github.com/KenjiTakahashi/906998772f65cd3e68501ad6255a3c2c (that’s running Druid 0.9.0).

Let me know if we can supply any more information.

Thanks!

Hey Karol,

Running tasks in parallel is the way to get better use of your resources; each task has only a single thread devoted to indexing. If you’re using Tranquility, you can set the parallelism through “task.partitions” (see https://github.com/druid-io/tranquility/blob/master/docs/configuration.md).
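
In a Tranquility Server config that would look something like this (dataSource name and counts are just examples):

    {
      "dataSources": {
        "events": {
          "spec": { ... your usual ingestion spec ... },
          "properties": {
            "task.partitions": "4",
            "task.replicants": "1"
          }
        }
      }
    }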

10GB of RAM per task sounds like a lot; you should be able to get by with a GB or two of heap and a GB or two off-heap. That 8 core, 32GB machine should be able to run 4–8 tasks.
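
If your tasks run through a middleManager, the peon sizing can be capped in its runtime.properties, along these lines (the exact numbers are just a starting point):

    druid.indexer.runner.javaOpts=-server -Xmx2g -XX:MaxDirectMemorySize=3g
    druid.indexer.fork.property.druid.processing.numThreads=1
    druid.indexer.fork.property.druid.processing.buffer.sizeBytes=536870912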

Running more tasks should also help with the “wrote too many bytes” errors; those happen when output columns are too large (no single column can be more than 2GB).

Thanks for the answer, Gian. Sorry for the long delay; we've been investigating based on your response.

It seems that Java is just hungry for RAM [surprise ;-)!]; if we set a lower limit, it is still able to keep up without using that much memory. We were able to run 4 tasks on the machine above and cut the ingestion time to about 6-7 hours per day of data, which is a significant improvement.

Still looking for more optimizations, if anyone has any ideas :-).

Partitioning (aka sharding) indeed helps with the "too much bytes" errors, but it makes HDD space usage a lot less efficient. We think that partitioning on IDs (the only "dimension" we have in this data) would yield better results than partitioning on timestamps (which, as I wrote, are pretty sparse).
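
In case it helps anyone else, what we do now is submit N "index_realtime" tasks for the same interval, each with its own partition number in a linear shardSpec. Roughly, the relevant part of each task spec looks like this:

    "tuningConfig": {
      "type": "realtime",
      "shardSpec": {
        "type": "linear",
        "partitionNum": 0
      }
    }

(the second task gets "partitionNum": 1, and so on).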

We are not using Tranquility yet; we're mostly waiting for the "windowPeriod" removal [also, not enough time to transition our current setups...] :-). For now, we've created our own "poor man's Tranquility", which uses "index_realtime" tasks directly.

I have a couple of implementation questions, if you don't mind. Hope you can help :-).

1. Is there a way to send CSV-formatted data to the "index_realtime" endpoint, instead of JSON? We've tried setting the same parserSpec that works with the "index" task, but it still says it expects JSON.
2. How can we find out which port is assigned to which task? We know this is stored in ZooKeeper, but the IDs in ZooKeeper are different from the task IDs in the Overlord. Looking at the Tranquility code, it seems to use the "runningTasks" endpoint on the Overlord and a "location" property of the task. But when we query it, there's no "location" property. Is there anything else to set up to get this, or are we misunderstanding the code? Right now we've worked around this by setting a separate "serviceName" for each task, but that bloats ZooKeeper a lot.
3. As far as we understand, there's currently no way to finish a task other than by using the "timed" firehose, right? Any plans/ETAs on changing that?

Thanks again!

See inline:

EventReceiverFirehose only supports JSON-formatted data at present.
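
So a CSV row like 2016-05-10T12:00:00Z,abc123,42 would need to be turned into a JSON object on your side before pushing, e.g. (field names are just an example):

    {"timestamp": "2016-05-10T12:00:00Z", "id": "abc123", "value": 42}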

Current master has a way to discover the task port via the Overlord HTTP API, which makes task discovery simpler ( https://github.com/druid-io/druid/pull/2419 ).
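
With that patch, the Overlord's runningTasks response includes a task location; from memory, the entries look roughly like this (the exact shape may differ):

    [
      {
        "id": "index_realtime_events_2016-05-10T12:00:00.000Z_0_0",
        "location": {
          "host": "worker1.example.com",
          "port": 8100
        }
      }
    ]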

We recently added a way to manually specify a shutdown time for the event receiver firehose ( https://github.com/druid-io/druid/pull/2803 ).

We hope to release 0.9.1-rc1 in a week or two; until then, you can build from current master and try it out.

These changes look really good and will simplify our logic quite a bit. Looking forward to the next release :-).