HDFS handover problems in realtime indexing

Hi there,
we are running realtime indexing tasks on the Druid Indexing-Service, deployed with several Middlemanagers and one Overlord. All indexing tasks consume data from Kafka over the kafka firehose and store the indexed data into HDFS as the configured deep storage.
This worked well for the last month, now as the load increases on one of the indexing tasks (about 3GB per hourly segment on Druid), we observe a very strange behaviour:

Even though the indexing task is working fine on its own Middlemanager (which is not really high loaded, as the load avg. is at 2 on an r3.2xlarge AWS machine), the handoff to HDFS stopped working.
This means, that - on the Middlemanager - the druid.indexer.task.baseTaskDir fills up with temporary index data that is not handed off to HDFS.

In baseTaskDir/$indexing_job_id/work/persist/$data_source/, the following files are accumulating:

drwxr-xr-x 20 root root 4096 Jul 16 01:45 2015-07-15T22:00:00.000Z_2015-07-15T23:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 03:50 2015-07-16T00:00:00.000Z_2015-07-16T01:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 06:40 2015-07-16T02:00:00.000Z_2015-07-16T03:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 09:29 2015-07-16T03:00:00.000Z_2015-07-16T04:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 08:01 2015-07-16T04:00:00.000Z_2015-07-16T05:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 10:48 2015-07-16T05:00:00.000Z_2015-07-16T06:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 13:14 2015-07-16T06:00:00.000Z_2015-07-16T07:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 14:22 2015-07-16T07:00:00.000Z_2015-07-16T08:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 15:29 2015-07-16T08:00:00.000Z_2015-07-16T09:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 12:01 2015-07-16T09:00:00.000Z_2015-07-16T10:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 16:57 2015-07-16T10:00:00.000Z_2015-07-16T11:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 20:06 2015-07-16T11:00:00.000Z_2015-07-16T12:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 18:35 2015-07-16T12:00:00.000Z_2015-07-16T13:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 14:02 2015-07-16T13:00:00.000Z_2015-07-16T14:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 15:02 2015-07-16T14:00:00.000Z_2015-07-16T15:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 16:02 2015-07-16T15:00:00.000Z_2015-07-16T16:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 17:02 2015-07-16T16:00:00.000Z_2015-07-16T17:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 18:03 2015-07-16T17:00:00.000Z_2015-07-16T18:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 20:06 2015-07-16T18:00:00.000Z_2015-07-16T19:00:00.000Z/
drwxr-xr-x 19 root root 4096 Jul 16 20:03 2015-07-16T19:00:00.000Z_2015-07-16T20:00:00.000Z/
drwxr-xr-x 5 root root 4096 Jul 16 20:11 2015-07-16T20:00:00.000Z_2015-07-16T21:00:00.000Z/

As long as the Middlemanager continues with the indexing task, queries on this task are answered completely. The segments are also visible in the coordinator console, but with 0Bytes of size, as the segments are only stored at the middlemanager and not in the historical nodes. Thus the system is functional, until the middlemanager runs out of disk space for the baseTaskDir.
However, if we restart the task (in order to try to fix the handoff to HDFS), the indexed segments are dropped in the coordinator console and not queryable anymore.

Question 1:
What keeps the druid middlemanager away from pushing the indexed segments to HDFS and advertising them to zookeeper?

Question 2:
After a restart of the middlemanager, the old indexed data (that seems to be gone in the coordinator console) is still available in the basetaskdir. How am I able to push and advertise the missing segments to the historical node via HDFS manually?

Thanks in advance,
Constantin

Are there any exceptions or interesting error messages in the overlord, middle manager or task log?

– Himanshu

No, too badly there are no ERROR or WARNING entries in even one of the logs. I may post the indexing task definition and the configuration for the middlemanager / overlord / coordinator here, but I would be generally interested in getting the indexed data back into the system again. Is this possible at all? The only thing that was not working was the upload to HDFS and the advertising of segments to zookeeper. Can you handle that somewhere outside a regular realtime job?

Hi Constantin, thoughts inline:

Hi there,
we are running realtime indexing tasks on the Druid Indexing-Service, deployed with several Middlemanagers and one Overlord. All indexing tasks consume data from Kafka over the kafka firehose and store the indexed data into HDFS as the configured deep storage.
This worked well for the last month, now as the load increases on one of the indexing tasks (about 3GB per hourly segment on Druid), we observe a very strange behaviour:

Even though the indexing task is working fine on its own Middlemanager (which is not really high loaded, as the load avg. is at 2 on an r3.2xlarge AWS machine), the handoff to HDFS stopped working.
This means, that - on the Middlemanager - the druid.indexer.task.baseTaskDir fills up with temporary index data that is not handed off to HDFS.

In baseTaskDir/$indexing_job_id/work/persist/$data_source/, the following files are accumulating:

drwxr-xr-x 20 root root 4096 Jul 16 01:45 2015-07-15T22:00:00.000Z_2015-07-15T23:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 03:50 2015-07-16T00:00:00.000Z_2015-07-16T01:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 06:40 2015-07-16T02:00:00.000Z_2015-07-16T03:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 09:29 2015-07-16T03:00:00.000Z_2015-07-16T04:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 08:01 2015-07-16T04:00:00.000Z_2015-07-16T05:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 10:48 2015-07-16T05:00:00.000Z_2015-07-16T06:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 13:14 2015-07-16T06:00:00.000Z_2015-07-16T07:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 14:22 2015-07-16T07:00:00.000Z_2015-07-16T08:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 15:29 2015-07-16T08:00:00.000Z_2015-07-16T09:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 12:01 2015-07-16T09:00:00.000Z_2015-07-16T10:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 16:57 2015-07-16T10:00:00.000Z_2015-07-16T11:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 20:06 2015-07-16T11:00:00.000Z_2015-07-16T12:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 18:35 2015-07-16T12:00:00.000Z_2015-07-16T13:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 14:02 2015-07-16T13:00:00.000Z_2015-07-16T14:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 15:02 2015-07-16T14:00:00.000Z_2015-07-16T15:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 16:02 2015-07-16T15:00:00.000Z_2015-07-16T16:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 17:02 2015-07-16T16:00:00.000Z_2015-07-16T17:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 18:03 2015-07-16T17:00:00.000Z_2015-07-16T18:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 20:06 2015-07-16T18:00:00.000Z_2015-07-16T19:00:00.000Z/
drwxr-xr-x 19 root root 4096 Jul 16 20:03 2015-07-16T19:00:00.000Z_2015-07-16T20:00:00.000Z/
drwxr-xr-x 5 root root 4096 Jul 16 20:11 2015-07-16T20:00:00.000Z_2015-07-16T21:00:00.000Z/

As long as the Middlemanager continues with the indexing task, queries on this task are answered completely. The segments are also visible in the coordinator console, but with 0Bytes of size, as the segments are only stored at the middlemanager and not in the historical nodes. Thus the system is functional, until the middlemanager runs out of disk space for the baseTaskDir.
However, if we restart the task (in order to try to fix the handoff to HDFS), the indexed segments are dropped in the coordinator console and not queryable anymore.

Do you have any of the task logs where handoff was not occurring? Events at the end of the log will be interesting.

Question 1:
What keeps the druid middlemanager away from pushing the indexed segments to HDFS and advertising them to zookeeper?

http://druid.io/docs/latest/ingestion/faq.html. There’s a section there about handoff failing.

Question 2:
After a restart of the middlemanager, the old indexed data (that seems to be gone in the coordinator console) is still available in the basetaskdir. How am I able to push and advertise the missing segments to the historical node via HDFS manually?

Restarting a middle manager fails all tasks. We use replication for HA for middle managers, although https://github.com/druid-io/druid/pull/1521 should make restarting them easier.

Hi Constantin, can you share one of the logs of a task that failed to hand off?

Hi,

I don’t think there is an easy way to do handoff manually. Best way would be to take the raw data ( I hope you tee off your raw events to hdfs in addition to sending to druid) and do the batch ingestion.

Since there are no errors on the overlord/MM, One [good] possibility is that index task was able to push data to deep storage and update db (can you check hdfs and db to ensure that the segments in question exist there or not?), but historicals are not being able to load them for some reason (in that case there would be exceptions in some historical node), such as you have reached your historical nodes max capacity or something. In that case also, index task wouldn’t delete them from their local storage even if the “handoff” has really happened.

– Himanshu

Hi Fangjin, hey Himanshu!
Thanks for the reply first. We will go over the logs and check HDFS to see at which point the handoff was stuck and will then show our findings here.

One more question regarding HA on middlemanagers:

Restarting a middle manager fails all tasks. We use replication for HA for middle managers, although https://github.com/druid-io/druid/pull/1521 should make restarting them easier.

This means, that if a realtime indexing job is executed on several middlemanagers in parallel (without sharding), there is an overwrite / ignore policy between the middlemanagers? I’m curious, because I thought about such a setup too, but dropped it because I didn’t want to risk duplicate datapoints in the indexed data.

Best regards,

Constantin

Hi,

Considering that you are using kafka firehose, You can setup either sharding or replication. If you are already setup to shard, unfortunately, you can’t get replication (this is something we are actively working on and will fix in future versions of kafka firehose).
However, if you are not doing sharding then you can setup for replication and there will be no duplication. Druid takes care of choosing only one of the replicated segments when preparing query results.

Please see http://druid.io/docs/latest/ingestion/overview.html#ingest-from-apache-kafka for better understanding.

– Himanshu

Hi there!
Thanks for your reply Himanshu! And sorry for the last week - unfortunately we were busy with other stuff.

Anyway, we figured out how to get around the problem for us:
As you already specified in the documentation about the Design of Segments we exceeded the recommended segmentsize (300-700 MB) by far (we had about 2-3 GB per segment) so we decreased the segment granularity from “hour” to “fifteen_minute” for our indexing job. Segments are now between 500-750 MB and the issue seems to be fixed with that. Anyway, searching the logs has not brought any presentable output.

“granularitySpec”: {
“type”: “uniform”,
“segmentGranularity”: “fifteen_minute”,
“queryGranularity”: “minute”
}

Back to your last post, Himanshu:

Considering that you are using kafka firehose, You can setup either sharding or replication. If you are already setup to shard, unfortunately, you can’t get replication (this is something we are actively working on and will fix in future versions of kafka firehose).
However,
if you are not doing sharding then you can setup for replication and there will be no duplication. Druid takes care of choosing only one of the replicated segments when preparing query results.

Just to be clear about this: Replication means, that we just start the same indexing task twice (on another middlemanager)? Searching about replication, I have not found a specific value to set in the indexing task, so having two times the exact same indexing task is the way to go - and would be exactly what you mean by “Druid takes care of choosing only one of the replicated segments when preparing query results”?

Thanks again for your great support!

Hi,

For sharding/replication, pls see http://druid.io/docs/0.8.0/ingestion/realtime-ingestion.html#scale-and-redundancy .

Basically 2 segments with same datasource, interval and partitionNumber are assumed to have “same data” and druid will only use one of them while preparing query results.

– Himanshu

Perfect, thank you!

Hi, Himanshu

We met a similar problem. We did observe that index task was able to push data to deep storage and update db. And we found that historicals were busy loading segments(under replicated due to load rule update or historical restart)when this problem occurred. The coordinator seems much busier. It seems to stop emitting metrics. As a result we know little about the loading process. We can do nothing but wait until the load process is done, which may take hours. What’s more, realtime index tasks can not quit because handoff is not over from overlord’s point of view, we have to kill early index tasks when druid cluster is running out of task slots.

The recently generated segments are expected to be loaded with a higher priority (compared to under replicated), right ?

Thanks in advance,
Wong3

在 2015年7月19日星期日 UTC+8上午4:41:57,Himanshu写道:

Hi,

I don’t think there is an easy way to do handoff manually. Best way would be to take the raw data ( I hope you tee off your raw events to hdfs in addition to sending to druid) and do the batch ingestion.

Since there are no errors on the overlord/MM, One [good] possibility is that index task was able to push data to deep storage and update db (can you check hdfs and db to ensure that the segments in question exist there or not?), but historicals are not being able to load them for some reason (in that case there would be exceptions in some historical node), such as you have reached your historical nodes max capacity or something. In that case also, index task wouldn’t delete them from their local storage even if the “handoff” has really happened.

– Himanshu

Hi Constantin, can you share one of the logs of a task that failed to hand off?

No, too badly there are no ERROR or WARNING entries in even one of the logs. I may post the indexing task definition and the configuration for the middlemanager / overlord / coordinator here, but I would be generally interested in getting the indexed data back into the system again. Is this possible at all? The only thing that was not working was the upload to HDFS and the advertising of segments to zookeeper. Can you handle that somewhere outside a regular realtime job?

Are there any exceptions or interesting error messages in the overlord, middle manager or task log?

– Himanshu

Hi there,
we are running realtime indexing tasks on the Druid Indexing-Service, deployed with several Middlemanagers and one Overlord. All indexing tasks consume data from Kafka over the kafka firehose and store the indexed data into HDFS as the configured deep storage.
This worked well for the last month, now as the load increases on one of the indexing tasks (about 3GB per hourly segment on Druid), we observe a very strange behaviour:

Even though the indexing task is working fine on its own Middlemanager (which is not really high loaded, as the load avg. is at 2 on an r3.2xlarge AWS machine), the handoff to HDFS stopped working.
This means, that - on the Middlemanager - the druid.indexer.task.baseTaskDir fills up with temporary index data that is not handed off to HDFS.

In baseTaskDir/$indexing_job_id/work/persist/$data_source/, the following files are accumulating:

drwxr-xr-x 20 root root 4096 Jul 16 01:45 2015-07-15T22:00:00.000Z_2015-07-15T23:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 03:50 2015-07-16T00:00:00.000Z_2015-07-16T01:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 06:40 2015-07-16T02:00:00.000Z_2015-07-16T03:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 09:29 2015-07-16T03:00:00.000Z_2015-07-16T04:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 08:01 2015-07-16T04:00:00.000Z_2015-07-16T05:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 10:48 2015-07-16T05:00:00.000Z_2015-07-16T06:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 13:14 2015-07-16T06:00:00.000Z_2015-07-16T07:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 14:22 2015-07-16T07:00:00.000Z_2015-07-16T08:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 15:29 2015-07-16T08:00:00.000Z_2015-07-16T09:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 12:01 2015-07-16T09:00:00.000Z_2015-07-16T10:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 16:57 2015-07-16T10:00:00.000Z_2015-07-16T11:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 20:06 2015-07-16T11:00:00.000Z_2015-07-16T12:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 18:35 2015-07-16T12:00:00.000Z_2015-07-16T13:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 14:02 2015-07-16T13:00:00.000Z_2015-07-16T14:00:00.000Z/
drwxr-xr-x 26 root root 4096 Jul 16 15:02 2015-07-16T14:00:00.000Z_2015-07-16T15:00:00.000Z/
drwxr-xr-x 25 root root 4096 Jul 16 16:02 2015-07-16T15:00:00.000Z_2015-07-16T16:00:00.000Z/
drwxr-xr-x 24 root root 4096 Jul 16 17:02 2015-07-16T16:00:00.000Z_2015-07-16T17:00:00.000Z/
drwxr-xr-x 23 root root 4096 Jul 16 18:03 2015-07-16T17:00:00.000Z_2015-07-16T18:00:00.000Z/
drwxr-xr-x 22 root root 4096 Jul 16 20:06 2015-07-16T18:00:00.000Z_2015-07-16T19:00:00.000Z/
drwxr-xr-x 19 root root 4096 Jul 16 20:03 2015-07-16T19:00:00.000Z_2015-07-16T20:00:00.000Z/
drwxr-xr-x 5 root root 4096 Jul 16 20:11 2015-07-16T20:00:00.000Z_2015-07-16T21:00:00.000Z/

As long as the Middlemanager continues with the indexing task, queries on this task are answered completely. The segments are also visible in the coordinator console, but with 0Bytes of size, as the segments are only stored at the middlemanager and not in the historical nodes. Thus the system is functional, until the middlemanager runs out of disk space for the baseTaskDir.
However, if we restart the task (in order to try to fix the handoff to HDFS), the indexed segments are dropped in the coordinator console and not queryable anymore.

Question 1:
What keeps the druid middlemanager away from pushing the indexed segments to HDFS and advertising them to zookeeper?

Question 2:
After a restart of the middlemanager, the old indexed data (that seems to be gone in the coordinator console) is still available in the basetaskdir. How am I able to push and advertise the missing segments to the historical node via HDFS manually?

Thanks in advance,
Constantin

You received this message because you are subscribed to the Google Groups “Druid User” group.

To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+...@googlegroups.com.

To post to this group, send email to druid...@googlegroups.com.

To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/fc90831b-7dce-45fa-b263-ae57692249d4%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

You received this message because you are subscribed to the Google Groups “Druid User” group.

To unsubscribe from this group and stop receiving emails from it, send an email to druid...@googlegroups.com.

To post to this group, send email to druid...@googlegroups.com.

To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/4c5243a6-b26a-43fa-8fb8-48f8d7894c1c%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

在 2015年7月19日星期日 UTC+8上午4:41:57,Himanshu写道: