So many tasks pending when using Tranquility

Hi,
When I use Spark Streaming to ingest data into Druid through Tranquility, there are many tasks pending.

And there are some exceptions like this one:

java.io.IOException: Unable to push events to task: index_realtime_new_rcv_log_2015-12-01T05:55:00.000Z_4_0 (status = TaskRunning)
	at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:160)
	at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:146)
	at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$6.apply(Future.scala:863)
	at com.twitter.util.Try$.apply(Try.scala:13)
	at com.twitter.util.Future$.apply(Future.scala:90)
	at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863)
	at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863)
	at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:824)
	at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:823)
	at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:100)
	at com.twitter.util.Promise$Transformer.k(Promise.scala:100)
	at com.twitter.util.Promise$Transformer.apply(Promise.scala:110)
	at com.twitter.util.Promise$Transformer.apply(Promise.scala:91)
	at com.twitter.util.Promise$$anon$2.run(Promise.scala:345)
	at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:186)
	at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
	at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:212)
	at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:86)
	at com.twitter.util.Promise.runq(Promise.scala:331)
	at com.twitter.util.Promise.updateIfEmpty(Promise.scala:642)
	at com.twitter.util.Promise.update(Promise.scala:615)
	at com.twitter.util.Promise.setValue(Promise.scala:591)
	at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:76)
	at com.twitter.finagle.transport.ChannelTransport.handleUpstream(ChannelTransport.scala:45)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
	at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
	at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:194)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459)
	at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
	at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
	at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142)
	at com.twitter.finagle.channel.ChannelStatsHandler.messageReceived(ChannelStatsHandler.scala:86)
	at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
	at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142)
	at com.twitter.finagle.channel.ChannelRequestStatsHandler.messageReceived(ChannelRequestStatsHandler.scala:35)
	at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.twitter.finagle.NoBrokersAvailableException: No hosts are available for druid:firehose:new_rcv_log-55-0004-0000

Here is part of the Spark Streaming code that ingests the data:

import java.util.Date
import com.twitter.util.Await

// dstream is the input DStream of (key, value) records; the value is a JSON event string.
dstream.foreachRDD(rdd => {
  rdd.foreachPartition { partitionOfRecords =>
    // One Curator client plus Tranquility beam service per partition.
    val (curator, druidService) = druidSink.value.getDruidBeam()
    var messages = Seq[Map[String, Any]]()
    partitionOfRecords.foreach(record => {
      messages :+= jsonStrToDruidMap(record._2)
      // Flush once the batch reaches the configured size (_events).
      if (messages.length >= _events) {
        val startTime = (new Date()).getTime
        Await.result(druidService(messages))
        val endTime = (new Date()).getTime
        loggingInfo(s"It cost ${endTime - startTime}ms to send ${messages.length} events")
        messages = Seq[Map[String, Any]]()
      }
    })
    // Flush whatever remains at the end of the partition.
    if (messages.nonEmpty) {
      val startTime = (new Date()).getTime
      val num = Await.result(druidService(messages))
      val endTime = (new Date()).getTime
      loggingInfo(s"Sent $num events successfully")
      loggingInfo(s"It cost ${endTime - startTime}ms to send ${messages.length} events")
      loggingInfo(s"${messages.head}")
      messages = Seq[Map[String, Any]]()
    }
    // Tear down the beam service and the ZooKeeper connection.
    Await.result(druidService.close())
    curator.close()
  }
})
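
For context, getDruidBeam() in our sink builds the Curator client and the Tranquility service roughly the way the Tranquility README shows. A minimal sketch of that setup (the ZooKeeper address, service names, dimensions, aggregator, and granularities below are illustrative placeholders, not our exact configuration):

import com.metamx.common.Granularity
import com.metamx.tranquility.beam.ClusteredBeamTuning
import com.metamx.tranquility.druid.{DruidBeams, DruidLocation, DruidRollup, SpecificDruidDimensions}
import io.druid.granularity.QueryGranularity
import io.druid.query.aggregation.CountAggregatorFactory
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.joda.time.{DateTime, Period}

// ZooKeeper client used by Tranquility for task discovery and coordination.
val curator = CuratorFrameworkFactory.newClient(
  "zk1.example.com:2181",                              // placeholder connect string
  new BoundedExponentialBackoffRetry(100, 30000, 30)
)
curator.start()

// The returned service is a Finagle Service: give it a Seq of event maps, get back the count sent.
val druidService = DruidBeams
  .builder((eventMap: Map[String, Any]) => new DateTime(eventMap("timestamp")))
  .curator(curator)
  .discoveryPath("/druid/discovery")
  .location(DruidLocation("overlord", "druid:firehose:%s", "new_rcv_log")) // overlord service name + firehose pattern
  .rollup(DruidRollup(
    SpecificDruidDimensions(IndexedSeq("dim1", "dim2")), // placeholder dimensions
    Seq(new CountAggregatorFactory("count")),
    QueryGranularity.MINUTE
  ))
  .tuning(ClusteredBeamTuning(
    segmentGranularity = Granularity.FIVE_MINUTE,        // guessed from the task intervals above
    warmingPeriod = new Period("PT0M"),
    windowPeriod = new Period("PT10M"),
    partitions = 5,
    replicants = 1
  ))
  .buildService()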

And why isn't this realtime data being persisted and loaded onto a historical node?

On Tuesday, December 1, 2015 at 1:59:13 PM UTC+8, luo…@conew.com wrote:

Hello,

Are you running the overlord in remote mode? If yes, it seems you do not have enough middle manager capacity, which is why tasks are in the pending state. The exception you see is transient: it happens either when your task is running but has not yet started receiving data (in that case the exception should go away within a minute or so), or when your task is not being run by any middle manager while Tranquility is trying to push data to it. See the druid.worker.capacity setting for your middle managers.
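
For reference, that is configured in each middle manager's runtime.properties (the value below is only an example; size it to your hardware):

# middleManager runtime.properties
druid.worker.capacity=3

Each running realtime index task consumes one worker slot.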

Are your running tasks not shutting down at all after interval end time + windowPeriod + 5 minutes? If they are not, you should look at the task logs for any potential issues, or make sure the segments created by the task are loaded by a historical. If a segment is not loaded by any historical, the task will not shut down. Please go through this FAQ - Druid | Documentation


Parag


Hi, Parag:

Are you running overlord in remote mode?

Yes, it is. I run it in a test environment; the druid.worker.capacity I set is only 1 for each middle manager node, and there are 6 middle manager nodes. The exception log keeps appearing. Is the middle managers' processing capacity not enough?

Are your running tasks not shutting down at all after interval end time + windowPeriod + 5 minutes?

Yes, they are not shutting down at all. Also, what is the 5 minutes about? Could I change it, for example to interval end time + windowPeriod + 1 minute? By the way, once the realtime data is loaded by a historical, will the folders for these tasks in ZooKeeper be removed automatically?

On Wednesday, December 2, 2015 at 12:15:00 AM UTC+8, Parag Jain wrote:

There are some exception logs on the middle manager node:

2015-12-01T06:06:24,561 INFO [pool-5-thread-1] io.druid.indexing.overlord.ForkingTaskRunner - Removing temporary directory: /data/persistent/task/index_realtime_new_rcv_log_2015-12-01T03:25:00.000Z_3_0/482149f9-5f3a-4957-91d5-e50cd3373142
2015-12-01T06:06:24,562 ERROR [WorkerTaskMonitor-0] io.druid.indexing.worker.WorkerTaskMonitor - I can't build there. Failed to run task: {class=io.druid.indexing.worker.WorkerTaskMonitor, exceptionType=class java.util.concurrent.ExecutionException, exceptionMessage=java.lang.RuntimeException: java.io.IOException: Stream closed, task=index_realtime_new_rcv_log_2015-12-01T03:25:00.000Z_3_0}
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Stream closed
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.7.0_91]
	at java.util.concurrent.FutureTask.get(FutureTask.java:188) ~[?:1.7.0_91]
	at io.druid.indexing.worker.WorkerTaskMonitor$1$1.run(WorkerTaskMonitor.java:144) [druid-indexing-service-0.8.1-rc2.jar:0.8.1-rc2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [?:1.7.0_91]
	at java.util.concurrent.FutureTask.run(FutureTask.java:262) [?:1.7.0_91]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_91]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_91]
	at java.lang.Thread.run(Thread.java:745) [?:1.7.0_91]
Caused by: java.lang.RuntimeException: java.io.IOException: Stream closed
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
	at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:262) ~[druid-indexing-service-0.8.1-rc2.jar:0.8.1-rc2]
	at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:118) ~[druid-indexing-service-0.8.1-rc2.jar:0.8.1-rc2]
	... 4 more
Caused by: java.io.IOException: Stream closed
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162) ~[?:1.7.0_91]
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:280) ~[?:1.7.0_91]
	at java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[?:1.7.0_91]
	at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.7.0_91]
	at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
	at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:235) ~[druid-indexing-service-0.8.1-rc2.jar:0.8.1-rc2]
	at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:118) ~[druid-indexing-service-0.8.1-rc2.jar:0.8.1-rc2]
	... 4 more

On Wednesday, December 2, 2015 at 12:15:00 AM UTC+8, Parag Jain wrote:

Hello,

Ideally you should have at least (num partitions * num replicants) middle managers so that all your tasks for a particular segment interval can run concurrently, and all of the replicant tasks are ready to accept the events that Tranquility sends to them.

Are your running tasks creating any segments at all, i.e. are they receiving any data? If they are creating segments and publishing that information to the metadata store, then the coordinator should pick them up and ask a historical to load the segments; after the historical announces that it has loaded a segment, the task will un-announce that same segment and shut down. Un-announcing the segment deletes the task's segment announcement from ZooKeeper.

The 5 minutes is the firehose grace period in Tranquility. You can change it by setting your own DruidBeamConfig when creating the DruidService that Tranquility sends data through. However, leaving it at 5 minutes should not cause any issues.
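
For example, with the Scala API (a sketch, assuming the DruidBeamConfig case class of that era; field names may differ in your Tranquility version):

import com.metamx.tranquility.druid.DruidBeamConfig
import org.joda.time.Period

// Override only the grace period; the other fields keep their defaults.
val beamConfig = DruidBeamConfig(firehoseGracePeriod = new Period("PT1M"))

// Then pass it into the usual builder chain, e.g.
//   DruidBeams.builder(timestamper). ... .druidBeamConfig(beamConfig).buildService()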

For the latest exception you posted, I am not sure what the problem could be, but this discussion may help - https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!searchin/druid-user/i$20can$27t$20build$20there/druid-user/VxmzJ8EWTCE/sIsnCAONCAAJ

One clarification - ideally you should have at least: total worker slots on middle managers = (num partitions * num replicants)
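
To make that concrete with the numbers from this thread (my reading of them, so please double-check): the failing task index_realtime_new_rcv_log_2015-12-01T05:55:00.000Z_4_0 implies at least 5 partitions (partition numbers start at 0), so each segment interval needs 5 partitions * 1 replicant = 5 worker slots. Your 6 middle managers with druid.worker.capacity=1 give only 6 slots in total, and since each task lives for segmentGranularity + windowPeriod + grace period, tasks for consecutive intervals overlap and can demand more than 6 slots at once, which would explain the pending tasks.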

Thanks for your detailed explanation.
I have some other questions for you:

I found that the “rejectionPolicy” is “none”. Should I change it to “serverTime”, so that data outside the windowPeriod is ignored and, after segment interval + windowPeriod + firehoseGracePeriod, these segments are loaded onto historical nodes?

Should I set appropriate “maxRowsInMemory” and “intermediatePersistPeriod” values to improve the efficiency of segment generation? I ask because merging the in-memory data into the on-disk segment can take a lot of time.

On Thursday, December 3, 2015 at 8:34:06 AM UTC+8, pja…@yahoo-inc.com wrote:

And if it is convenient for you, could you help reply to this thread: https://groups.google.com/forum/#!topic/druid-user/pvG4ubRTC1Q
:slight_smile: Thanks for your help

On Thursday, December 3, 2015 at 8:34:06 AM UTC+8, pja…@yahoo-inc.com wrote:

You need not set the “rejectionPolicy” if you are using Tranquility, as Tranquility takes care of dropping events that fall outside the window period on the client side.
So those events do not even reach Druid, and setting a rejectionPolicy won't have any effect.

Thanks

Rohit

Thanks, Rohit.
And should I set appropriate “maxRowsInMemory” and “intermediatePersistPeriod” values to improve the efficiency of segment generation? Because merging the in-memory data into the on-disk segment can take a lot of time.

On Thursday, December 3, 2015 at 4:43:11 PM UTC+8, rohit kochar wrote:

Hello,

I am not completely sure what numThreads is used for in the case of the MiddleManager (so let's wait for someone to reply on that thread). Regarding memory: Druid nodes memory-map the segments they are serving, so yes, the remaining memory can be used to memory-map the segments. If you have enough memory, then potentially all queries can be served directly from segments loaded in main memory, with no paging.

If you are seeing a lot of GC because too many events are received before an intermediate persist happens, or if final segment creation takes a long time to merge, then you can play around with the “maxRowsInMemory” and “intermediatePersistPeriod” settings to see if they improve your performance.
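
In the Scala builder these map onto DruidTuning, roughly like this (a sketch; the values shown are the commonly cited defaults, and exact field names may differ by Tranquility version):

import com.metamx.tranquility.druid.DruidTuning
import org.joda.time.Period

// Persist to disk after 75k in-memory rows, or every 10 minutes, whichever comes first.
val druidTuning = DruidTuning(
  maxRowsInMemory = 75000,
  intermediatePersistPeriod = new Period("PT10M"),
  maxPendingPersists = 0
)

// Passed into the usual builder chain, e.g.
//   DruidBeams.builder(timestamper). ... .druidTuning(druidTuning).buildService()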

Note - the suggested final segment size is in the range 300 MB to 700 MB.

Parag

Thank you very much, Parag.

On Friday, December 4, 2015 at 2:30:42 AM UTC+8, pja…@yahoo-inc.com wrote: