Kafka Indexing Service Overlord Handoff Failing

Hello All,

I’ve got Druid set up using the Kafka indexing service. I’m able to push data into Kafka and see it get picked up by the Druid supervisor’s tasks (I’m using Pivot to verify the data is queryable). However, the handoff to the historical node seems to be failing. I notice on my file system that there is no segments folder, so it would seem the segments just aren’t getting persisted. I tried manually creating the directory with the correct permissions, but still no luck.
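For what it’s worth, here is how I’ve been checking whether anything actually gets published. The coordinator endpoint path and the default port 8081 are what I pulled from the docs, and `<coordinator-host>` is a placeholder:

```
# Nothing ever shows up under the (local) deep storage directory:
ls -la var/druid/segments

# Ask the coordinator which datasources have published segments:
curl http://<coordinator-host>:8081/druid/coordinator/v1/metadata/datasources
```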

I see these errors for every task that has data in its segments:

```
2016-09-27T16:10:05,983 ERROR [accns.staging.aview.events.metrics.cellular-utilization-incremental-persist] io.druid.segment.realtime.appenderator.AppenderatorImpl - dataSource[accns.staging.aview.events.metrics.cellular-utilization] -- incremental persist failed: {class=io.druid.segment.realtime.appenderator.AppenderatorImpl, segment=accns.staging.aview.events.metrics.cellular-utilization_2016-09-27T16:09:00.000Z_2016-09-27T16:10:00.000Z_2016-09-27T16:09:55.806Z, count=0}

2016-09-27T16:10:06,023 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KafkaIndexTask{id=index_kafka_accns.staging.aview.events.metrics.cellular-utilization_2365d7d9bfd526d_iidmknel, type=index_kafka, dataSource=accns.staging.aview.events.metrics.cellular-utilization}]
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Unable to delete file: var/druid/task/index_kafka_accns.staging.aview.events.metrics.cellular-utilization_2365d7d9bfd526d_iidmknel/work/persist/accns.staging.aview.events.metrics.cellular-utilization_2016-09-27T16:09:00.000Z_2016-09-27T16:10:00.000Z_2016-09-27T16:09:55.806Z/0/v8-tmp/.nfs00000000000a044f000006b8
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
	at io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.persist(FiniteAppenderatorDriver.java:234) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:463) ~[druid-kafka-indexing-service-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_92]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_92]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_92]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_92]
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Unable to delete file: var/druid/task/index_kafka_accns.staging.aview.events.metrics.cellular-utilization_2365d7d9bfd526d_iidmknel/work/persist/accns.staging.aview.events.metrics.cellular-utilization_2016-09-27T16:09:00.000Z_2016-09-27T16:10:00.000Z_2016-09-27T16:09:55.806Z/0/v8-tmp/.nfs00000000000a044f000006b8
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_92]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_92]
	at io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.persist(FiniteAppenderatorDriver.java:226) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	... 7 more
Caused by: java.lang.RuntimeException: java.io.IOException: Unable to delete file: var/druid/task/index_kafka_accns.staging.aview.events.metrics.cellular-utilization_2365d7d9bfd526d_iidmknel/work/persist/accns.staging.aview.events.metrics.cellular-utilization_2016-09-27T16:09:00.000Z_2016-09-27T16:10:00.000Z_2016-09-27T16:09:55.806Z/0/v8-tmp/.nfs00000000000a044f000006b8
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
	at io.druid.segment.realtime.appenderator.AppenderatorImpl.persistHydrant(AppenderatorImpl.java:1168) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.realtime.appenderator.AppenderatorImpl.access$700(AppenderatorImpl.java:107) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.realtime.appenderator.AppenderatorImpl$6.doCall(AppenderatorImpl.java:551) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.common.guava.ThreadRenamingCallable.call(ThreadRenamingCallable.java:44) ~[druid-common-0.9.1.1.jar:0.9.1.1]
	... 4 more
Caused by: java.io.IOException: Unable to delete file: var/druid/task/index_kafka_accns.staging.aview.events.metrics.cellular-utilization_2365d7d9bfd526d_iidmknel/work/persist/accns.staging.aview.events.metrics.cellular-utilization_2016-09-27T16:09:00.000Z_2016-09-27T16:10:00.000Z_2016-09-27T16:09:55.806Z/0/v8-tmp/.nfs00000000000a044f000006b8
	at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2279) ~[commons-io-2.4.jar:2.4]
	at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653) ~[commons-io-2.4.jar:2.4]
	at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535) ~[commons-io-2.4.jar:2.4]
	at io.druid.segment.IndexMerger$10.close(IndexMerger.java:618) ~[druid-processing-0.9.1.1.jar:0.9.1.1]
	at com.google.common.io.Closer.close(Closer.java:214) ~[guava-16.0.1.jar:?]
	at io.druid.segment.IndexMerger.makeIndexFiles(IndexMerger.java:1051) ~[druid-processing-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.IndexMerger.merge(IndexMerger.java:423) ~[druid-processing-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.IndexMerger.persist(IndexMerger.java:195) ~[druid-processing-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.IndexMerger.persist(IndexMerger.java:161) ~[druid-processing-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.realtime.appenderator.AppenderatorImpl.persistHydrant(AppenderatorImpl.java:1147) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.realtime.appenderator.AppenderatorImpl.access$700(AppenderatorImpl.java:107) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.segment.realtime.appenderator.AppenderatorImpl$6.doCall(AppenderatorImpl.java:551) ~[druid-server-0.9.1.1.jar:0.9.1.1]
	at io.druid.common.guava.ThreadRenamingCallable.call(ThreadRenamingCallable.java:44) ~[druid-common-0.9.1.1.jar:0.9.1.1]
	... 4 more
```

I’m guessing I have something configured incorrectly, but I’m not sure where to look.

Here is the supervisor spec (fetched from the Overlord via HTTP); the commands I use to submit and fetch it follow the block below. I currently have the task duration set to one minute for testing, but the issue also occurred when it was set to an hour.

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "accns.staging.aview.events.metrics.cellular-utilization",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": ["mac", "carrier", "data_plan", "interface"]
        }
      }
    },
    "metricsSpec": [{
      "type": "longSum",
      "name": "received",
      "fieldName": "received"
    }, {
      "type": "longSum",
      "name": "transmitted",
      "fieldName": "transmitted"
    }],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "MINUTE",
      "queryGranularity": {
        "type": "none"
      },
      "intervals": null
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 75000,
    "maxRowsPerSegment": 5000000,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/opt/aview/tmp/1474990455227-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": null,
      "metricCompression": null
    },
    "buildV9Directly": false,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0
  },
  "ioConfig": {
    "topic": "accns.staging.aview.events.metrics.cellular-utilization",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT60S",
    "consumerProperties": {
      "bootstrap.servers": "staging-app-002.accns.com:9092"
    },
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null
  }
}
```
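For completeness, this is roughly how I submit the spec and fetch it back. The Overlord supervisor endpoint and the default port 8090 are per the Kafka indexing service docs; the hostname and spec filename are placeholders:

```
# Submit (or update) the supervisor spec:
curl -X POST -H 'Content-Type: application/json' \
  -d @cellular-utilization-supervisor.json \
  http://<overlord-host>:8090/druid/indexer/v1/supervisor

# Fetch the running spec back (this is what is pasted above):
curl http://<overlord-host>:8090/druid/indexer/v1/supervisor/accns.staging.aview.events.metrics.cellular-utilization
```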

_common:

```
druid.startup.logging.logProperties=true

druid.zk.service.host=staging-zoo-001.accns.com:2181,staging-zoo-002.accns.com:2181,staging-zoo-003.accns.com:2181
druid.zk.paths.base=/druid

druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://staging-mysql-001:3306/druid_staging
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=#{MYSQL_PASSWORD}

# For local disk (only viable in a cluster if this is a network mount):
druid.storage.type=local
druid.storage.storageDirectory=var/druid/segments

# For local disk (only viable in a cluster if this is a network mount):
druid.indexer.logs.type=file
druid.indexer.logs.directory=var/druid/indexing-logs

druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator

druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor", "io.druid.client.cache.CacheMonitor"]

druid.emitter=logging
druid.emitter.logging.logLevel=info
```

Thanks in advance.

Hey Andre,

I see some ".nfs"-ish named files in your exceptions. Are you running on an NFS mount? Do you need to be? If you don't need to be, try running on a local filesystem.
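If you do try it, the knobs that matter are probably the middleManager task directory (that's where the `var/druid/task/.../work/persist` path in your stack trace comes from) and the deep storage directory in _common. A sketch, with local paths that are just examples:

```
# middleManager runtime.properties - keep task scratch space on local disk:
druid.indexer.task.baseTaskDir=/data/local/druid/task

# _common - local deep storage is fine for a single-machine test,
# but across machines it needs to be a shared mount (or HDFS/S3):
druid.storage.type=local
druid.storage.storageDirectory=/data/local/druid/segments
```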

Hey Gian,

It is an NFS mount; I believe we need it set up this way. We have our Druid cluster running on two separate physical machines. We plan on moving to HDFS eventually, but we're trying to get by with the filesystem at first.

I will test whether this is the source of the issue by bringing down the nodes on one of the physical servers and writing to a different directory. I'll follow up shortly.

Best Regards.

Hmm, this is actually a much more involved test than I thought. We have two physical machines with 3 VMs each. The VMs run Overlord and Coordinator, Historical and MiddleManager, and Broker. I would need to bring up a machine with all the VMs running together to test this out.

Is there anything else I can try, or an alternative to NFS?

Best Regards.

I should also mention that the file it is trying to delete didn't seem to exist: `var/druid/task/index_kafka_accns.staging.aview.events.metrics.cellular-utilization_2365d7d9bfd526d_iidmknel/work/persist/accns.staging.aview.events.metrics.cellular-utilization_2016-09-27T16:09:00.000Z_2016-09-27T16:10:00.000Z_2016-09-27T16:09:55.806Z/0/v8-tmp/.nfs00000000000a044f000006b8`. I checked `var/druid/task/index_kafka_accns.staging.aview.events.metrics.cellular-utilization_2365d7d9bfd526d_iidmknel/work` while the task was running, and all I saw in the directory was `.lock`.

Hey Gian,

I got everything running on one physical machine and one VM, and it did indeed fix the issue. Awesome job catching that detail!

I still don’t know what to do for our production configuration; the minimum requirement is to have Druid running on both physical machines. I’ll talk with the guys here, but if you have any ideas, I’m all ears.

Thanks!

As an update, we ended up using HDFS.
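For anyone who finds this later, the change boiled down to loading the HDFS extension and pointing deep storage (and task logs) at HDFS in _common. Roughly the following; our real extension loadList and paths aren't shown above, so treat the exact values as illustrative, and the Hadoop *.xml configs also need to be on the classpath:

```
druid.extensions.loadList=["druid-kafka-indexing-service", "mysql-metadata-storage", "druid-hdfs-storage"]

# Deep storage on HDFS instead of the NFS-backed local directory:
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments

# Task logs on HDFS as well:
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs
```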

Hey Andre,

Good to know you got it working with that approach.

I'm not sure what was wrong with the NFS setup (there are people running Druid clusters on NFS), but those filenames look like the result of NFS "silly renames": http://nfs.sourceforge.net/
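In short: when a file on an NFS mount is deleted while some process still has it open, the client renames it to a hidden `.nfsXXXX...` file instead of removing it, and only cleans it up once the last open handle goes away. That's why the task's directory cleanup hits a file it can't delete. You can see the effect on any NFS mount with something like this (paths are placeholders):

```
# shell 1: keep a file on the NFS mount open
tail -f /mnt/nfs/scratch/foo.txt

# shell 2: delete it while it's still open, then list the directory
rm /mnt/nfs/scratch/foo.txt
ls -a /mnt/nfs/scratch/
# .  ..  .nfs000000000000abcd   <- the "silly rename" left behind until the file is closed
```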