Spark Streaming can't ingest data into Druid through Tranquility

Some exceptions occur when Spark Streaming ingests data into Druid through Tranquility.
They are described below:

16/02/24 15:11:15 WARN beam.ClusteredBeam: Emitting alert: [anomaly] Failed to propagate events: overlord/new_rcv_log_test
{
  "eventCount" : 1,
  "timestamp" : "2016-02-24T15:10:00.000Z",
  "beams" : "HashPartitionBeam(DruidBeam(interval = 2016-02-24T15:10:00.000Z/2016-02-24T15:15:00.000Z, partition = 0, tasks = [index_realtime_new_rcv_log_test_2016-02-24T15:10:00.000Z_0_0/new_rcv_log_test-10-0000-0000]), DruidBeam(interval = 2016-02-24T15:10:00.000Z/2016-02-24T15:15:00.000Z, partition = 1, tasks = [index_realtime_new_rcv_log_test_2016-02-24T15:10:00.000Z_1_0/new_rcv_log_test-10-0001-0000]))"
}
com.twitter.finagle.FailedFastException: Endpoint firehose:overlord:new_rcv_log_test-10-0001-0000 is marked down. For more details see: https://twitter.github.io/finagle/guide/FAQ.html#why-do-clients-see-com-twitter-finagle-failedfastexception-s
	at com.twitter.finagle.NoStacktrace(Unknown Source)
16/02/24 15:11:15 INFO core.LoggingEmitter: Event [{"feed":"alerts","timestamp":"2016-02-24T15:11:15.711Z","service":"tranquility","host":"localhost","severity":"anomaly","description":"Failed to propagate events: overlord/new_rcv_log_test","data":{"exceptionType":"com.twitter.finagle.FailedFastException","exceptionStackTrace":"com.twitter.finagle.FailedFastException: Endpoint firehose:overlord:new_rcv_log_test-10-0001-0000 is marked down. For more details see: https://twitter.github.io/finagle/guide/FAQ.html#why-do-clients-see-com-twitter-finagle-failedfastexception-s\n\tat com.twitter.finagle.NoStacktrace(Unknown Source)\n","timestamp":"2016-02-24T15:10:00.000Z","beams":"HashPartitionBeam(DruidBeam(interval = 2016-02-24T15:10:00.000Z/2016-02-24T15:15:00.000Z, partition = 0, tasks = [index_realtime_new_rcv_log_test_2016-02-24T15:10:00.000Z_0_0/new_rcv_log_test-10-0000-0000]), DruidBeam(interval = 2016-02-24T15:10:00.000Z/2016-02-24T15:15:00.000Z, partition = 1, tasks = [index_realtime_new_rcv_log_test_2016-02-24T15:10:00.000Z_1_0/new_rcv_log_test-10-0001-0000]))","eventCount":1,"exceptionMessage":"Endpoint firehose:overlord:new_rcv_log_test-10-0001-0000 is marked down. For more details see: https://twitter.github.io/finagle/guide/FAQ.html#why-do-clients-see-com-twitter-finagle-failedfastexception-s"}}]
16/02/24 15:11:19 WARN beam.ClusteredBeam: Emitting alert: [anomaly] Failed to propagate events: overlord/new_rcv_log_test
{
  "eventCount" : 1,
  "timestamp" : "2016-02-24T15:10:00.000Z",
  "beams" : "HashPartitionBeam(DruidBeam(interval = 2016-02-24T15:10:00.000Z/2016-02-24T15:15:00.000Z, partition = 0, tasks = [index_realtime_new_rcv_log_test_2016-02-24T15:10:00.000Z_0_0/new_rcv_log_test-10-0000-0000]), DruidBeam(interval = 2016-02-24T15:10:00.000Z/2016-02-24T15:15:00.000Z, partition = 1, tasks = [index_realtime_new_rcv_log_test_2016-02-24T15:10:00.000Z_1_0/new_rcv_log_test-10-0001-0000]))"
}
com.twitter.finagle.FailedFastException: Endpoint firehose:overlord:new_rcv_log_test-10-0001-0000 is marked down. For more details see: https://twitter.github.io/finagle/guide/FAQ.html#why-do-clients-see-com-twitter-finagle-failedfastexception-s
	at com.twitter.finagle.NoStacktrace(Unknown Source)

The task shows as running successfully in the overlord UI, but no data is ingested into Druid.


And here are some logs from the successful task:

2016-02-24T15:10:23,245 INFO [task-runner-0] io.druid.data.input.FirehoseFactory - Firehose created, will shut down at: 2016-02-24T15:21:00.000Z
2016-02-24T15:11:00,073 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - Server Disappeared[DruidServerMetadata{name='ip-10-3-6-123:8101', host='ip-10-3-6-123:8101', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
2016-02-24T15:11:00,376 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - Server Disappeared[DruidServerMetadata{name='ip-10-3-6-123:8100', host='ip-10-3-6-123:8100', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
2016-02-24T15:15:43,627 INFO [ServerInventoryView-0] io.druid.curator.inventory.CuratorInventoryManager - Created new InventoryCacheListener for /druid/segments/ip-10-3-6-123:8100
2016-02-24T15:15:43,627 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='ip-10-3-6-123:8100', host='ip-10-3-6-123:8100', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
2016-02-24T15:15:43,672 INFO [ServerInventoryView-0] io.druid.curator.inventory.CuratorInventoryManager - Created new InventoryCacheListener for /druid/segments/ip-10-3-6-123:8101
2016-02-24T15:15:43,673 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='ip-10-3-6-123:8101', host='ip-10-3-6-123:8101', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
2016-02-24T15:16:00,003 INFO [new_rcv_log_test-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Starting merge and push.
2016-02-24T15:16:00,004 INFO [new_rcv_log_test-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] segments. Attempting to hand off segments that start before [1970-01-01T00:00:00.000Z].
2016-02-24T15:16:00,004 INFO [new_rcv_log_test-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks to persist and merge
2016-02-24T15:20:39,797 INFO [ServerInventoryView-0] io.druid.curator.inventory.CuratorInventoryManager - Created new InventoryCacheListener for /druid/segments/ip-10-3-6-123:8104
2016-02-24T15:20:39,798 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='ip-10-3-6-123:8104', host='ip-10-3-6-123:8104', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
2016-02-24T15:20:40,778 INFO [ServerInventoryView-0] io.druid.curator.inventory.CuratorInventoryManager - Created new InventoryCacheListener for /druid/segments/ip-10-3-6-123:8105
2016-02-24T15:20:40,778 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='ip-10-3-6-123:8105', host='ip-10-3-6-123:8105', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
2016-02-24T15:21:00,000 INFO [timed-shutoff-firehose-0] io.druid.data.input.FirehoseFactory - Closing delegate firehose.
2016-02-24T15:21:00,000 INFO [timed-shutoff-firehose-0] io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory - Firehose closing.
2016-02-24T15:21:00,001 INFO [timed-shutoff-firehose-0] io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider - Unregistering chat handler[firehose:overlord:new_rcv_log_test-10-0000-0000]
2016-02-24T15:21:00,001 INFO [timed-shutoff-firehose-0] io.druid.curator.discovery.CuratorServiceAnnouncer - Unannouncing service[DruidNode{serviceName='firehose:overlord:new_rcv_log_test-10-0000-0000', host='ip-10-3-6-123', port=8102}]
2016-02-24T15:21:00,004 INFO [new_rcv_log_test-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Starting merge and push.
2016-02-24T15:21:00,004 INFO [new_rcv_log_test-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] segments. Attempting to hand off segments that start before [1970-01-01T00:00:00.000Z].
2016-02-24T15:21:00,004 INFO [new_rcv_log_test-overseer-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks to persist and merge
2016-02-24T15:21:00,180 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - Server Disappeared[DruidServerMetadata{name='ip-10-3-6-123:8103', host='ip-10-3-6-123:8103', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
2016-02-24T15:21:00,463 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Submitting persist runnable for dataSource[new_rcv_log_test]
2016-02-24T15:21:00,464 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Shutting down...
2016-02-24T15:21:00,466 INFO [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Removing task directory: /data/persistent/task/index_realtime_new_rcv_log_test_2016-02-24T15:10:00.000Z_0_0/work
2016-02-24T15:21:00,475 INFO [task-runner-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
"id" : "index_realtime_new_rcv_log_test_2016-02-24T15:10:00.000Z_0_0",
"status" : "SUCCESS",
"duration" : 637573
}

The DruidBeam I created is as below:

DruidBeams
 .builder((eventMap: Map[String, Any]) => new DateTime(eventMap("timestamp").asInstanceOf[Long] * 1000))
 .curator(curator)
 .discoveryPath(discoveryPath)
 .location(DruidLocation.create(indexService, dataSource))
 .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
 .tuning(
  ClusteredBeamTuning(
   segmentGranularity = Granularity.FIVE_MINUTE,
   windowPeriod = new Period("PT2m"),
   partitions = 1,
   replicants = 1
  )
 ).timestampSpec(new TimestampSpec("timestamp", "posix", null))
 .beamMergeFn(beams => new HashPartitionBeam(beams.toIndexedSeq))
 .buildBeam()
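
For reference, a simplified sketch of how a beam like this is typically driven from Spark Streaming. This is an illustration only, not the exact job: beam stands for the result of buildBeam() above and dstream for a DStream[Map[String, Any]]; Beam.propagate returns a Twitter Future carrying the number of events Tranquility actually accepted.

import com.metamx.tranquility.beam.Beam
import com.twitter.util.Await
import org.apache.spark.streaming.dstream.DStream

def send(beam: Beam[Map[String, Any]], dstream: DStream[Map[String, Any]]): Unit = {
  dstream.foreachRDD { rdd =>
    // Collecting to the driver keeps this sketch simple; the beam itself is not
    // serializable, so per-executor sends normally go through the tranquility-spark
    // BeamFactory helper instead.
    val events = rdd.collect().toSeq
    if (events.nonEmpty) {
      val sent = Await.result(beam.propagate(events))
      // Events whose timestamps fall outside the window are dropped silently and
      // are not counted here, so an accepted count lower than events.size is the
      // first hint that timestamps are the problem.
      println(s"attempted ${events.size} events, accepted $sent")
    }
  }
}

Comparing the attempted and accepted counts is the quickest way to tell whether events are being dropped before they ever reach the indexing task.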

The timestamp of the events I generate is the current time, so they should be within the time window.

var event = Map[String, Any]()

result += "timestamp" -> (new DateTime(DateTimeZone.UTC)).getMillis / 1000
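
As a rough sanity check, the posix value produced above can be compared against the window. This is only a sketch using Joda-Time; the exact window rule is an assumption (with segmentGranularity = FIVE_MINUTE and windowPeriod = PT2m, an event is taken to be acceptable only if its timestamp falls inside the current five-minute bucket widened by two minutes on each side).

import org.joda.time.{DateTime, DateTimeZone, Period}

// Assumed window rule: current 5-minute bucket, widened by windowPeriod on each side.
val windowPeriod = new Period("PT2M")
val now = new DateTime(DateTimeZone.UTC)
val eventSeconds = now.getMillis / 1000                      // the "posix" value sent above
val eventTime = new DateTime(eventSeconds * 1000L, DateTimeZone.UTC)

// Floor the current time to its five-minute bucket (the segment interval).
val bucketStart = now.withMillisOfSecond(0).withSecondOfMinute(0)
  .withMinuteOfHour(now.getMinuteOfHour / 5 * 5)
val earliest = bucketStart.minus(windowPeriod)
val latest = bucketStart.plusMinutes(5).plus(windowPeriod)

val inWindow = !eventTime.isBefore(earliest) && eventTime.isBefore(latest)
println(s"event=$eventTime window=[$earliest, $latest) inWindow=$inWindow")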

I have no idea why the data can't be ingested into Druid. Could somebody help me find the reason? It has confused me for a long time.

Thanks,

Tao

Hey luotao,

The "FailedFastException" in that log snippet is not very useful; there should be a more useful exception thrown earlier in the logs, associated with the very first time you see "Failed to propagate events". That earlier exception will show the actual thing that went wrong: possibly a connection timeout, a service discovery problem, a connection refused, etc.

Btw, in the next release we are disabling FailedFastExceptions by default, as they are not really useful for us. That will make all of your exceptions actually useful. If you like, you could pull this patch now: https://github.com/druid-io/tranquility/pull/123

Hi, Gian:

Thanks for your help. I set up a new FinagleRegistry and disabled FailFast. The cause was that somebody had set the wrong worker IP for the MiddleManager. After I changed it, there are no exceptions now, but still no data is ingested into Druid.

Is the timestamp of the events outside the time window? How can I find the related logs to confirm this?

The event timestamp I set is the current time.

var event = Map[String, Any]()

result += "timestamp" -> (new DateTime(DateTimeZone.UTC)).getMillis / 1000


And the timestampSpec I set is: new TimestampSpec("timestamp", "posix", null)
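
To double-check what the format expects (as I understand it, "posix" is seconds since the epoch, "millis" is milliseconds, and "auto" detects ISO strings or millis, so the getMillis / 1000 value above should match "posix"), the values can be eyeballed with a standalone snippet:

import org.joda.time.{DateTime, DateTimeZone}

val now = new DateTime(DateTimeZone.UTC)
val posixSeconds = now.getMillis / 1000   // e.g. 1456326600 for 2016-02-24T15:10:00Z -> "posix"
val epochMillis  = now.getMillis          // e.g. 1456326600000 -> "millis"
println(s"posix=$posixSeconds millis=$epochMillis iso=$now")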

Is the setting right?

Thanks,

Tao




On Thursday, February 25, 2016 at 10:32:26 AM UTC+8, Gian Merlino wrote:

The attachments are the Spark Streaming log and the task log.

On Thursday, February 25, 2016 at 10:32:26 AM UTC+8, Gian Merlino wrote:

spark.txt (42.7 KB)

task.txt (172 KB)

Has the problem been resolved?

On Thursday, February 25, 2016 at 12:04:08 PM UTC+8, luo…@conew.com wrote:

Hi Tao,

May I have your contact information? I have some problems with the Spark Streaming integration... thanks.

On Wednesday, February 24, 2016 at 11:43:47 PM UTC+8, luo…@conew.com wrote: