Tranquility 0.7.2 + Spark 1.6.0: Too many Tranquilizer threads in waiting, and the count keeps growing

Hi,

Over a 12-hour uptime of our Spark Streaming job, we observe that the number of Tranquilizer threads (as shown in the Spark application UI / executor thread dump) keeps growing at regular intervals.

They all have the following stack trace:

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$4.apply(Tranquilizer.scala:86)
com.metamx.common.scala.concurrent.package$$anon$1.run(package.scala:37)
java.lang.Thread.run(Thread.java:745)

The memory usage of the Spark executor also grows, and I suspect this is related.

We generally use the Tranquilizer as follows:

Tranquilizer<Map<String, Object>> tqlzr = Tranquilizer.create(objDruidBeam);
tqlzr.start();

for (Map<String, Object> event : events) {
    // send the individual event and Await.result(future)
}

We leave the Tranquilizer open because the stop and close methods appear to close the underlying beam as well.

Please advise. This is seriously hampering the uptime of our Spark Streaming application.

Thanks,
Dave

This is what happened to my Spark Streaming job after a longer uptime:

16/02/11 15:38:21 WARN AbstractChannel: Force-closing a channel whose registration task was not accepted by an event loop: [id: 0x0ecd764c]
java.lang.OutOfMemoryError: unable to create new native thread

The above log is from the executor. All the piled-up threads are gone now, and my application has stopped working.

Hey Dave,

It looks like you're not using Tranquility's built-in Spark adapter, is that right?

One thing to watch out for with Spark is that it can deserialize the same object many times across the cluster. Something you expect to be a single object can therefore end up being instantiated many times. The best workaround I'm aware of is to store such things in statics (in Scala, an "object").

There’s an example here: https://github.com/druid-io/tranquility/blob/master/docs/spark.md (note the BeamInstance is a “val” in an “object”).

I think something similar should work in your case.
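To illustrate the "store it in a static" idea in Java terms, here is a minimal, self-contained sketch. `BeamHolder` and `SharedBeam` are hypothetical names standing in for your Tranquility beam; the point is only that a static field is initialized once per JVM (i.e. once per Spark executor), no matter how many deserialized task closures ask for it.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BeamHolder {
    // Counts how many times the "beam" is constructed, for demonstration.
    static final AtomicInteger constructions = new AtomicInteger();

    // Lazily-initialized, per-JVM singleton (the Java analogue of a val in a Scala object).
    private static final class Holder {
        static final SharedBeam INSTANCE = new SharedBeam();
    }

    static SharedBeam instance() {
        return Holder.INSTANCE;
    }

    // Hypothetical stand-in for a Tranquility Beam/Tranquilizer.
    static final class SharedBeam {
        SharedBeam() {
            constructions.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        // Simulate many tasks on the same executor all fetching the beam.
        for (int i = 0; i < 100; i++) {
            BeamHolder.instance();
        }
        System.out.println("constructions=" + constructions.get());
    }
}
```

Every task that calls `BeamHolder.instance()` gets the same object, so only one beam (and only one set of its background threads) exists per executor JVM.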

Thanks Gian.

We are not using the Spark adapter because it does not allow us to track the number of sent/failed events.

We create the Beam as a Java static variable. Here is some pseudocode:

For each RDD partition:

  1. Create a Tranquilizer via Tranquilizer.create(staticBeamObject).
  2. Send the events one by one, calling Await.result for each event and recording the outcome.
  3. Once everything is sent, leave the beam and the newly created Tranquilizer as-is.

We cannot invoke the flush method at this point, so Await.result is used after each send instead.

Do you see any issue with this?

What could be an alternative approach, given that we want to record the send success rate?

Thanks,

Dave

Hey Dave,

Each Tranquilizer creates its own background send thread. Tranquilizers are thread-safe, though, and intended to be shared, so rather than creating one for each batch it would be better to make a single one a Java static and re-use it. Also, doing an Await.result on each message is less efficient than allowing batching to happen, by sending many messages and only awaiting at the end of the batch.

Check out this code in the tranquility-spark adapter: https://github.com/druid-io/tranquility/blob/v0.7.3/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala

You could do something similar, but save the sent/received counts somewhere rather than just using them for a log message. If the Tranquilizer is a JVM static, this approach won't need a ton of threads and should give you the best throughput.
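A rough sketch of the "send everything, await once at the end, record the outcomes" pattern might look like the following. `FakeSender` is a hypothetical stand-in for a shared Tranquilizer (whose real send method returns a Twitter `Future`); `CompletableFuture` is used here only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class BatchSend {
    // Success/failure counters, shared across partitions on this executor.
    static final AtomicLong sent = new AtomicLong();
    static final AtomicLong failed = new AtomicLong();

    // Shared, static "tranquilizer": one per JVM, reused by every partition.
    static final FakeSender SENDER = new FakeSender();

    static final class FakeSender {
        CompletableFuture<Void> send(String event) {
            // The real Tranquilizer batches internally; this stand-in just
            // completes immediately, failing on an (arbitrarily) invalid event.
            if (event.isEmpty()) {
                CompletableFuture<Void> f = new CompletableFuture<>();
                f.completeExceptionally(new IllegalArgumentException("empty event"));
                return f;
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    static void processPartition(List<String> events) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String event : events) {
            // Record each outcome via a callback, but do NOT block per message...
            futures.add(SENDER.send(event)
                    .whenComplete((ok, err) -> (err == null ? sent : failed).incrementAndGet())
                    .exceptionally(e -> null));
        }
        // ...wait only once, at the end of the batch.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }

    public static void main(String[] args) {
        processPartition(List.of("a", "b", "", "c"));
        System.out.println("sent=" + sent.get() + " failed=" + failed.get());
    }
}
```

Compared with calling Await.result per message, the callbacks still give you exact sent/failed counts while letting the sender batch freely in the background.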

Thanks Gian.

We were able to implement it with something similar to the code you linked, using Tranquility 0.7.3.

- Dave

I notice that the same issue still exists in 0.7.4. I have submitted an issue and a pull request.

On Saturday, February 13, 2016 at 2:37:01 AM UTC+8, Dave S wrote: