Overlord unavailability causes entire-cluster instability and data loss in Druid v0.9.0

Hi druid team,

We are seeing the task-action client receive a connection reset by peer from the overlord while it is trying to submit a task action.

When the overlord has connectivity issues and closes the connection, a "Connection reset by peer" exception is thrown and I see the following in the task log:

2018-11-20T14:00:00,461 WARN [task-runner-0-priority-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Exception submitting action for task[index_realtime_vidmessage_2018-11-20T21:00:00.000Z_45_1]
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_181]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_181]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_181]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_181]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[?:1.8.0_181]
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2018-11-20T14:00:00,468 INFO [task-runner-0-priority-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Will try again in [PT59.359S].

Is this a known issue that can cause the entire cluster to become unstable? On the producer side, where a Kafka consumer pushes data to Druid, we see this error on all the producer nodes:

com.twitter.finagle.GlobalRequestTimeoutException: exceeded 1.minutes+30.seconds to disco!firehose:druid:overlord:vidmessage-021-0045-0001 while waiting for a response for the request, including retries (if applicable)
    at com.twitter.finagle.NoStacktrace(Unknown Source)

This connection reset happens on only one partition out of the 248 partitions we have set up, yet it causes all 248 partitions (of different granularities) to lose data partially. By partial data loss I mean that roughly 25% of the data gets ingested into Druid and the rest is dropped. We have 2 overlord instances. Shouldn't the peon's index task retry against the other overlord if one overlord fails?

Hey Mohan,

In general I would expect the peon / task to retry if there is a problem talking to the overlord. I wouldn’t expect the damage to be more severe than perhaps a brief hiccup in ingestion. I would expect ingestion to pause briefly while the retries are going on but then resume afterwards. If this happens often you could try setting druidBeam.firehoseRetryPeriod or druidBeam.indexRetryPeriod higher (see https://github.com/druid-io/tranquility/blob/master/docs/configuration.md).
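For reference, if you're running tranquility-server or tranquility-kafka, those retry periods go in the per-dataSource "properties" block of the config file. A rough sketch (structure as in configuration.md; the PT15M values and host name are just example placeholders, not recommendations):

{
  "dataSources" : {
    "vidmessage" : {
      "spec" : { "...": "your existing ingestion spec, unchanged" },
      "properties" : {
        "druidBeam.firehoseRetryPeriod" : "PT15M",
        "druidBeam.indexRetryPeriod" : "PT15M"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "your.zk.host:2181"
  }
}

If you're using the Tranquility core API directly instead, I believe the equivalent knobs live on DruidBeamConfig.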

Also, there’s a log message there: “RemoteTaskActionClient - Will try again in [PT59.359S].” Does the task try again after that minute? What happens then?

Btw, Druid 0.9.0 is about 2.5 years old - I don’t recall the specifics of what was improved in this area since then, but there have been lots of improvements in ingestion, and it’s possible that later versions handle things better here!

Another btw: if you are using Kafka, I’d encourage you to check out later versions of Druid + its Kafka indexing service, which have nicer operational properties. See here for some info on how it works: https://imply.io/post/exactly-once-streaming-ingestion

Hi Gian,

Thanks for the reply. On the producer side I increased both retry periods you mentioned (firehose and index), but it didn't help. To reproduce the issue, I modified the v0.9.0 Druid code to deliberately throw an IOException. The peon retries connecting to the overlord after the timeout, and if that call fails again, the next retry delay grows exponentially (from 59s to 142s); it just keeps waiting instead of retrying a different overlord (we have multiple overlord instances). One instance going bad affects ingestion on all the other partitions as well, which is weird.

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_181]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_181]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_181]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_181]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[?:1.8.0_181]
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[druid-services-0.9.0-selfcontained.jar:0.9.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2018-11-20T14:33:42,058 INFO [task-runner-0-priority-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Will try again in [PT580.178S].
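From the delays I see (PT59.359S on the first retry, PT580.178S later), the retry wait looks like exponential backoff with some jitter, capped at roughly ten minutes. A simplified sketch of that pattern (my own illustration with made-up constants, not the actual Druid code):

// Illustration only: exponential backoff with jitter, capped at a maximum wait.
// Not Druid's real retry code; the constants are made up to match the log.
import java.util.Random;

public class BackoffSketch
{
  private static final long BASE_MILLIS = 60_000;   // first wait around one minute
  private static final long MAX_MILLIS = 600_000;   // capped around ten minutes
  private static final Random RANDOM = new Random();

  static long retryDelayMillis(int attempt)
  {
    long exponential = Math.min(MAX_MILLIS, BASE_MILLIS * (1L << attempt));
    double fuzz = 1.0 + (RANDOM.nextDouble() - 0.5) * 0.4;   // +/- 20% jitter
    return Math.min(MAX_MILLIS, (long) (exponential * fuzz));
  }

  public static void main(String[] args)
  {
    // Prints a growing series roughly like the delays in the task log
    // (~59s, ~142s, ... capped near 600s).
    for (int attempt = 0; attempt < 6; attempt++) {
      System.out.println("attempt " + attempt + " -> " + retryDelayMillis(attempt) / 1000 + "s");
    }
  }
}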

We are already using v0.12.x with direct Kafka indexing for our other projects. The current project on v0.9.0 is a big one to migrate, so I was hoping there might be an easy fix that the community could help with.

Best,

Mohan

Hey Mohan,

> The peon retries connecting to the overlord after the timeout, and if that call fails again, the next retry delay grows exponentially (from 59s to 142s); it just keeps waiting instead of retrying a different overlord (we have multiple overlord instances).

Only the leader overlord is eligible to receive task actions, so it makes sense that it waits and retries the same one. It can't use the others (they are standbys).

> One instance going bad affects ingestion on all the other partitions as well, which is weird.

This is probably due to the relatively small buffers used by Tranquility senders. If any partition gets stuck then the others will get stuck soon too, since the Tranquility sender will be waiting for the stuck partition to come back and won’t be sending on other partitions in the meantime. Druid’s native Kafka indexing handles this better: the partitions aren’t linked in the same way.

Back to the original connection reset. It's possible this is happening because the overlord is… overloaded. I'd try taking some jstacks ("jstack -l <pid>") of the overlord while the connection resets are happening and see if you see anything interesting. Perhaps it will lead to a resolution.

Gian

Hi Gian,

Thanks for the update. I changed the code so that task actions to the overlord are handled on a separate future/thread, in the hope that they eventually succeed, while a mock response is returned in the meantime so that real-time indexing can proceed. If the overlord remains unavailable until the end, I fail that partition so that the replica partition succeeds in writing its data to HDFS. But again, this is only a stopgap until we move to v0.12.x.
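Roughly, the workaround looks like the sketch below (simplified; the class and method names here are placeholders, not the real Druid classes):

// Simplified sketch of the stopgap (placeholder names, not the real Druid classes):
// submit the task action asynchronously and hand back a mock response so that
// real-time indexing can keep going. If the overlord never answers, the task is
// failed at the end so that the replica partition's data is the one handed off.
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncActionWorkaround
{
  private final ExecutorService exec = Executors.newSingleThreadExecutor();

  // actionCall wraps the original (blocking, retrying) submit to the overlord;
  // mockResponse is a placeholder result that lets indexing proceed.
  <T> T submitWithMockFallback(Callable<T> actionCall, T mockResponse, long waitMillis)
  {
    CompletableFuture<T> pending = CompletableFuture.supplyAsync(
        () -> {
          try {
            return actionCall.call();
          }
          catch (Exception e) {
            throw new RuntimeException(e);
          }
        },
        exec
    );

    try {
      // Give the overlord a short window; if it answers in time, use the real response.
      return pending.get(waitMillis, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException e) {
      // Overlord still unreachable: return the mock and let the submission keep
      // retrying in the background. If it still has not succeeded by the time the
      // task finishes, the task is marked failed (checked elsewhere, not shown here).
      return mockResponse;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}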

I will also try jstack to see if anything interesting explains why only that one partition gets stuck on overlord connectivity.

Best,

Mohan