Tranquility not emitting events to Druid

Hi,

We are trying to set up a Druid cluster with 3 nodes on m4.xlarge machines [8 GB, 2 cores]:

druid1 running the overlord and coordinator processes,

druid2 running the historical and middleManager processes,

druid3 running the broker and Tranquility processes.

All the events we emit to Kafka are simply being rejected by Tranquility, while the same events are consumed fine on a single-node standalone cluster. The overlord server doesn't have any logs for the job; it only shows status FAILED. Attached is a screenshot of the overlord console, and below are the payload and status:

Status:


{"task":"index_realtime_booking_data_2016-05-12T06:00:00.000Z_0_0","status":{"id":"index_realtime_booking_data_2016-05-12T06:00:00.000Z_0_0","status":"FAILED","duration":-1}}

Payload

{"task":"index_realtime_booking_data_2016-05-12T06:00:00.000Z_0_0","payload":{"id":"index_realtime_booking_data_2016-05-12T06:00:00.000Z_0_0","resource":{"availabilityGroup":"booking_data-06-0000","requiredCapacity":1},"spec":{"dataSchema":{"dataSource":"booking_data","parser":{"type":"map","parseSpec":{"format":"json","timestampSpec":{"column":"timestamp","format":"millis","missingValue":null},"dimensionsSpec":{"dimensionExclusions":["lookupKey","routingKey","timestamp"],"spatialDimensions":[]}}},"metricsSpec":[{"type":"count","name":"count"}],"granularitySpec":{"type":"uniform","segmentGranularity":"HOUR","queryGranularity":{"type":"none"},"intervals":null}},"ioConfig":{"type":"realtime","firehose":{"type":"clipped","delegate":{"type":"timed","delegate":{"type":"receiver","serviceName":"firehose:druid:overlord:booking_data-06-0000-0000","bufferSize":100000},"shutoffTime":"2016-05-12T07:25:00.000Z"},"interval":"2016-05-12T06:00:00.000Z/2016-05-12T07:00:00.000Z"},"firehoseV2":null},"tuningConfig":{"type":"realtime","maxRowsInMemory":100000,"intermediatePersistPeriod":"PT20M","windowPeriod":"PT20M","basePersistDirectory":"/home/vikasrana/druid-0.9.0/var/tmp/1463034512955-0","versioningPolicy":{"type":"intervalStart"},"rejectionPolicy":{"type":"none"},"maxPendingPersists":0,"shardSpec":{"type":"linear","partitionNum":0},"indexSpec":{"bitmap":{"type":"concise"},"dimensionCompression":null,"metricCompression":null},"buildV9Directly":false,"persistThreadPriority":0,"mergeThreadPriority":0,"reportParseExceptions":false}},"context":null,"groupId":"index_realtime_booking_data","dataSource":"booking_data"}}

2016-05-12 12:18:14,867 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {booking_sample3={receivedCount=5, sentCount=0, failedCount=5}} pending messages in 2ms and committed offsets in 4ms.

Druid _common runtime config:

druid.extensions.loadList=["druid-kafka-eight", "druid-s3-extensions", "druid-histogram", "druid-datasketches", "druid-namespace-lookup", "mysql-metadata-storage"]

druid.startup.logging.logProperties=true

druid.zk.service.host=zk1,zk2,zk3

druid.zk.paths.base=/druid

druid.metadata.storage.type=mysql

druid.metadata.storage.connector.connectURI=jdbc:mysql://xyz.com:3306/druid

druid.metadata.storage.connector.user=dr

druid.metadata.storage.connector.password=druid

druid.storage.type=s3

druid.storage.bucket=s3://XXXXXX/dev/druid

druid.storage.baseKey=

druid.s3.accessKey=YYYYYY

druid.s3.secretKey=CCCCCCCCCCC

druid.indexer.logs.type=file

druid.indexer.logs.directory=var/druid/indexing-logs

druid.selectors.indexing.serviceName=druid:overlord

druid.selectors.coordinator.serviceName=druid:prod:coordinator

druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]

druid.emitter=logging

druid.emitter.logging.logLevel=info

overlord runtime config:

druid.host=druid1.com

druid.service=druid/overlord

druid.port=8090

druid.indexer.queue.startDelay=PT30S

druid.indexer.runner.type=remote

druid.indexer.storage.type=metadata

coordinator runtime config:

druid.host=druid1.com

druid.service=druid/coordinator

druid.port=8081

druid.coordinator.startDelay=PT30S

druid.coordinator.period=PT30S

broker runtime config:

druid.host=druid3.com

druid.service=druid/broker

druid.port=8082

druid.broker.http.numConnections=5

druid.server.http.numThreads=15

druid.processing.buffer.sizeBytes=536870912

druid.processing.numThreads=7

druid.broker.cache.useCache=true

druid.broker.cache.populateCache=true

druid.cache.type=local

druid.cache.sizeInBytes=2000000000

historical runtime config:

druid.host=druid2.com

druid.service=druid/historical

druid.port=8083

druid.server.http.numThreads=10

druid.processing.buffer.sizeBytes=536870912

druid.processing.numThreads=7

druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":130000000000}]

druid.server.maxSize=130000000000

middleManager runtime config:

druid.host=druid2.com

druid.service=druid/middleManager

druid.port=8091

druid.worker.capacity=3

druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

druid.indexer.task.baseTaskDir=var/druid/task

druid.server.http.numThreads=15

druid.processing.buffer.sizeBytes=536870912

druid.processing.numThreads=2

druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp

druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"]

Hi Lovenish,
Do you see any exceptions in the task logs?

Hey Lovenish,

In your configuration, druid.selectors.indexing.serviceName=druid:overlord but your overlord is configured to: druid.service=druid/overlord. These two must match for service discovery to succeed. I would try setting these to the same thing and seeing if that helps. Note that the druid.selectors.indexing.serviceName in your Tranquility configuration must also match. Also, your druid.selectors.coordinator.serviceName similarly does not match the service name of your coordinator.
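For instance, aligned on the slash-style names (the values below simply mirror the configs you posted; treat them as illustrative):

In _common:
druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator

In the overlord runtime config:
druid.service=druid/overlord

In the coordinator runtime config:
druid.service=druid/coordinator

The druid.selectors.indexing.serviceName property in Tranquility's own configuration (e.g. in the properties block of kafka.json) should then also be druid/overlord.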

Hi David,

We figured this may be causing the issue, so we changed the service name to druid/overlord in all the configs. After this, Tranquility is able to process messages, but now we are facing another issue: after running for 8-10 hours, Tranquility starts giving the following error:

com.twitter.finagle.NoBrokersAvailableException: No hosts are available for disco!firehose:druid:overlord:booking_data_closure_final-013-0000-0000, Dtab.base=, Dtab.local=
at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]
2016-05-16 14:02:50,544 [Hashed wheel timer #1] INFO c.metamx.emitter.core.LoggingEmitter - Event [{"feed":"alerts","timestamp":"2016-05-16T14:02:50.543Z","service":"tranquility","host":"localhost","severity":"anomaly","description":"Failed to propagate events: druid:overlord/booking_data_closure_final","data":{"exceptionType":"com.twitter.finagle.NoBrokersAvailableException","exceptionStackTrace":"com.twitter.finagle.NoBrokersAvailableException: No hosts are available for disco!firehose:druid:overlord:booking_data_closure_final-013-0000-0000, Dtab.base=, Dtab.local=\n\tat com.twitter.finagle.NoStacktrace(Unknown Source)\n","timestamp":"2016-05-16T13:00:00.000Z","beams":"MergingPartitioningBeam(DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 0, tasks = [index_realtime_booking_data_closure_final_2016-05-16T13:00:00.000Z_0_0/booking_data_closure_final-013-0000-0000]))","eventCount":2000,"exceptionMessage":"No hosts are available for disco!firehose:druid:overlord:booking_data_closure_final-013-0000-0000, Dtab.base=, Dtab.local="}}]
2016-05-16 14:03:03,625 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {lovenish={receivedCount=2005, sentCount=0, droppedCount=2005, unparseableCount=0}} pending messages in 83829ms and committed offsets in 2ms.
2016-05-16 14:04:05,383 [Hashed wheel timer #1] WARN c.m.tranquility.beam.ClusteredBeam - Emitting alert: [anomaly] Failed to propagate events: druid:overlord/booking_data_closure_final
{
"eventCount" : 1,
"timestamp" : "2016-05-16T13:00:00.000Z",
"beams" : "MergingPartitioningBeam(DruidBeam(interval = 2016-05-16T13:00:00.000Z/2016-05-16T14:00:00.000Z, partition = 0, tasks = [index_realtime_booking_data_closure_final_2016-05-16T13:00:00.000Z_0_0/booking_data_closure_final-013-0000-0000]))"
}

Also, when we checked we saw many pending tasks in the coordinator UI.

And Tranquility always moves the Kafka offset forward, even if messages fail to be processed by Druid. Is there a way to reprocess these failed events via Tranquility?

Hi Lovenish,
Pending tasks means the tasks are waiting for a free slot on a middleManager.

The number of task slots on a middleManager is defined by druid.worker.capacity; make sure you have enough configured capacity to run your tasks.

Also, in case handoff is not working for some reason, the realtime index tasks will wait until handoff is complete. Make sure handoff is working correctly in your setup and that realtime tasks finish after (segmentGranularity + windowPeriod).
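As a rough sizing sketch (numbers are illustrative, not a recommendation): with HOUR segment granularity, each datasource typically has one realtime task running and a second one overlapping around segment boundaries, so for a couple of datasources plus some headroom for batch jobs you might set, on druid2:

druid.worker.capacity=6

Keep in mind each task slot runs as a separate peon JVM sized by druid.indexer.runner.javaOpts (-Xmx2g in your config), so capacity times heap has to fit in the machine's memory.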

Hi Nishant,

Thanks for the reply. Our issue got solved after increasing the worker capacity. We have one more question, if anyone can help: Tranquility always moves the Kafka offset forward, even if messages fail to be processed by Druid. Is there any way to reprocess these failed events via Tranquility?

Hey Lovenish,

Tranquility will “give up” on messages after a timeout has elapsed and move the kafka offsets forwards. It does this because it needs to drop anything older than your windowPeriod anyway, so there’s nothing useful it can do with older messages (see https://github.com/druid-io/tranquility/blob/master/docs/overview.md#segment-granularity-and-window-period).
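For reference, the windowPeriod for tranquility-kafka lives in the tuningConfig of each dataSource in kafka.json. A trimmed sketch (dataSchema left empty for brevity; values are illustrative, based on the configs you posted):

{
  "dataSources" : {
    "booking_data" : {
      "spec" : {
        "dataSchema" : { },
        "tuningConfig" : { "type" : "realtime", "windowPeriod" : "PT20M", "intermediatePersistPeriod" : "PT20M", "maxRowsInMemory" : 100000 }
      },
      "properties" : { "task.partitions" : "1", "task.replicants" : "1", "topicPattern" : "booking_sample3" }
    }
  },
  "properties" : {
    "zookeeper.connect" : "zk1,zk2,zk3",
    "kafka.zookeeper.connect" : "zk1,zk2,zk3",
    "kafka.group.id" : "tranquility-booking",
    "druid.selectors.indexing.serviceName" : "druid/overlord"
  }
}

Messages whose timestamps fall outside the current segment plus windowPeriod are dropped rather than sent, so a larger windowPeriod buys more tolerance for late data at the cost of tasks staying open longer.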

For processing older messages you have a couple of options:

  1. Batch ingestion (http://druid.io/docs/latest/ingestion/batch-ingestion.html). You could do this from data saved from Kafka to HDFS or S3.

  2. Starting in the next release (0.9.1) we will offer a new, experimental Kafka ingestion option that can ingest messages from any time frame and will not give up on older messages. You could try this out as an alternative to tranquility-kafka and see if it works for you. There’ll be information on how to use it in the release notes. Or, if you want to build from source, you can check out master right now.
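If you want to peek at the experimental Kafka indexing service now, here is a rough sketch of a supervisor spec as it looks on current master (field names may still change before the 0.9.1 release; it needs the druid-kafka-indexing-service extension and Kafka 0.9+ brokers, and is POSTed to the overlord at /druid/indexer/v1/supervisor):

{
  "type" : "kafka",
  "dataSchema" : {
    "dataSource" : "booking_data",
    "parser" : { "type" : "string", "parseSpec" : { "format" : "json",
      "timestampSpec" : { "column" : "timestamp", "format" : "millis" },
      "dimensionsSpec" : { "dimensions" : [] } } },
    "metricsSpec" : [ { "type" : "count", "name" : "count" } ],
    "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "NONE" }
  },
  "tuningConfig" : { "type" : "kafka", "maxRowsInMemory" : 100000 },
  "ioConfig" : {
    "topic" : "booking_sample3",
    "consumerProperties" : { "bootstrap.servers" : "your-kafka-broker:9092" },
    "taskCount" : 1,
    "replicas" : 1,
    "taskDuration" : "PT1H"
  }
}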

Thanks for suggesting batch ingestion as the way to process missed/dropped events. We are experimenting with it, but the overlord task first waits on a lock and then fails after 10-15 minutes.

2016-05-20T14:17:26,725 INFO [TaskQueue-Manager] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_hadoop_booking_data_closure_final_2016-05-20T14:06:24.279Z]: LockTryAcquireAction{interval=2016-05-20T00:00:00.000Z/2016-05-21T00:00:00.000Z}

Batch spec JSON:

Running as:

curl -L -H'Content-Type: application/json' -XPOST --data-binary @batch.json http://druid1.overload.com:8090/druid/indexer/v1/task/

{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "booking_data_closure_final",
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "metadata.timestamp",
            "format" : "millis"
          },
          "dimensionsSpec" : {
            "dimensions" : [],
            "dimensionExclusions" : ["data.output_fields","data.input_fields","data.tax_breakup","data.discount_constraints","metadata.timestamp"],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "hour",
        "queryGranularity" : "none",
        "intervals" : [ "2016-05-20/2016-05-21" ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "/home/vikasrana/sample_input"
      }
    },
    "tuningConfig" : {
      "type" : "hadoop"
    }
  },
  "hadoopDependencyCoordinates" : ["org.apache.hadoop:hadoop-client:2.7.1"]
}

We copied the Hadoop config files under the _common dir, and we have a Hadoop 2.7.1 cluster.

Can you post the task log?

Hi Fangjin,

Could you please let us know how to get the task logs? We are not able to get the logs from the coordinator UI.

We are only seeing the lock message below in the overlord logs.

2016-05-20T14:17:26,725 INFO [TaskQueue-Manager] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_hadoop_booking_data_closure_final_2016-05-20T14:06:24.279Z]: LockTryAcquireAction{interval=2016-05-20T00:00:00.000Z/2016-05-21T00:00:00.000Z}

Hi,

Could you please let us know how to enable task logging in debug mode? We are only getting the service log for each of the processes: overlord, coordinator, middleManager, and broker.

The task log can be found at:

http://overlord_ip:port/console.html

By default the overlord ui is on port 8090.
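You can also pull a specific task's log straight from the overlord API (substitute your actual task id and overlord host):

curl http://overlord_ip:8090/druid/indexer/v1/task/<task_id>/log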

Since you are running a hybrid batch/streaming setup, one other thing to make sure of is that your batch loads and your realtime loads are not overlapping. If they are, one of them will get blocked on lock acquisition.

Generally you want to think of hybrid ingestion in terms of “last X time is owned by realtime, anything earlier is owned by batch”. The “X” is determined by segmentGranularity + windowPeriod. So if your segmentGranularity is HOUR and your windowPeriod is 10 minutes, you don’t want to run any batch jobs for the most recent two hours.
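As a concrete illustration (times made up): with segmentGranularity=HOUR and windowPeriod=PT10M, if it is currently 14:25 then realtime still owns everything from 13:00 onward, so a batch spec submitted now should stop its intervals at 13:00 or earlier, e.g.

"intervals" : [ "2016-05-20T00:00:00.000Z/2016-05-20T13:00:00.000Z" ]

That keeps the Hadoop task's lock from colliding with the locks held by the realtime index tasks for the still-open hours.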

Hi,

Thanks for the reply.

On the overlord UI we are getting “No log was found for this task. The task may not exist, or it may not have begun running yet.” for the log.

Below is the status:

{"task": "index_hadoop_booking_data_closure_final_2016-05-31T12:51:29.878Z","status": {"id": "index_hadoop_booking_data_closure_final_2016-05-31T12:51:29.878Z","status": "FAILED","duration": 0}}

Could you please help us here? We have been trying for the last couple of days to get past this problem.

Also, we do get Druid task logs for the tasks run via Tranquility; only batch ingestion with Hadoop is failing to start. Everything is running fine except batch ingestion. Is there another way to do batch ingestion?

Thanks,
Vikas

Hey Lovenish,

Do you see any exceptions in the overlord log about this task? Do you see any messages in the middleManager log about it trying to run this task? Also could you double check whether or not you have task log archiving enabled? The configs would be something like one of these three, starting with “druid.indexer.logs”:

For local disk (only viable in a cluster if this is a network mount):

druid.indexer.logs.type=file

druid.indexer.logs.directory=var/druid/indexing-logs

For HDFS:

druid.indexer.logs.type=hdfs

druid.indexer.logs.directory=/druid/indexing-logs

For S3:

druid.indexer.logs.type=s3

druid.indexer.logs.s3Bucket=your-bucket

druid.indexer.logs.s3Prefix=druid/indexing-logs