Intermittent failure of replicated realtime ingestion tasks after upgrade to Druid 0.9.0

We recently upgraded our development Druid cluster to 0.9.0, backed by a CDH 5.3.3 Hadoop cluster. Prior to this upgrade we used a hand-built Druid that downgraded its Jackson version in order to work properly with Hadoop, and things worked fine. As part of the upgrade we switched to the stock Druid release and used the pull-deps tool to fetch our extensions, though we did need to rebuild the Druid HDFS extension against the CDH 5.3.3 jars instead of the stock Hadoop 2.3.0 jars (see https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/druid-user/N-c-P09leJA/g1RwJLZbBQAJ). We also set the “mapreduce.job.user.classpath.first” property to true in our reingestion jobs to get around Jackson versioning issues.
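
In case it helps anyone, this is roughly where we set that property: a trimmed-down excerpt of the tuningConfig section of our Hadoop index task spec (all other fields omitted):

{
  "tuningConfig": {
    "type": "hadoop",
    "jobProperties": {
      "mapreduce.job.user.classpath.first": "true"
    }
  }
}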

Batch reingestion works without issues, but one of our 2x-replicated realtime ingestion tasks frequently fails with this exception:

2016-07-05T00:15:03,160 ERROR [delta_netflow-2016-07-04T23:00:00.000Z-persist-n-merge] io.druid.segment.realtime.plumber.RealtimePlumber - Failed to persist merged index[delta_netflow]: {class=io.druid.segment.realtime.plumber.RealtimePlumber, exceptionType=class org.apache.hadoop.ipc.RemoteException, exceptionMessage=No lease on /druid/delta_netflow/20160704T230000.000Z_20160705T000000.000Z/2016-07-04T23_00_05.235Z/0/index.zip (inode 18375798): File does not exist. Holder DFSClient_NONMAPREDUCE_-1070421989_1 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3358)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3446)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3416)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:675)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.complete(AuthorizationProviderProxyClientProtocol.java:219)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:520)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
, interval=2016-07-04T23:00:00.000Z/2016-07-05T00:00:00.000Z}

org.apache.hadoop.ipc.RemoteException: No lease on /druid/delta_netflow/20160704T230000.000Z_20160705T000000.000Z/2016-07-04T23_00_05.235Z/0/index.zip (inode 18375798): File does not exist. Holder DFSClient_NONMAPREDUCE_-1070421989_1 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3358)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3446)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3416)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:675)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.complete(AuthorizationProviderProxyClientProtocol.java:219)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:520)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
    at org.apache.hadoop.ipc.Client.call(Client.java:1411) ~[?:?]
    at org.apache.hadoop.ipc.Client.call(Client.java:1364) ~[?:?]
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) ~[?:?]
    at com.sun.proxy.$Proxy63.complete(Unknown Source) ~[?:?]
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:435) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_67]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_67]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_67]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_67]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) ~[?:?]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~[?:?]
    at com.sun.proxy.$Proxy64.complete(Unknown Source) ~[?:?]
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2164) ~[?:?]
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2148) ~[?:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[?:?]
    at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:241) ~[?:1.7.0_67]
    at java.util.zip.ZipOutputStream.close(ZipOutputStream.java:360) ~[?:1.7.0_67]
    at com.metamx.common.CompressionUtils.zip(CompressionUtils.java:104) ~[java-util-0.27.7.jar:?]
    at io.druid.storage.hdfs.HdfsDataSegmentPusher.push(HdfsDataSegmentPusher.java:92) ~[?:?]
    at io.druid.segment.realtime.plumber.RealtimePlumber$4.doRun(RealtimePlumber.java:550) [druid-server-0.9.0.jar:0.9.0]
    at io.druid.common.guava.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:42) [druid-common-0.9.0.jar:0.9.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_67]

The realtime ingestion task is marked as FAILED in the coordinator console, and this happens maybe 6-10 times per data source in a 24-hour period. We haven’t observed any case where both replica tasks fail, and the data appears to be properly stored in HDFS.

We tried compiling against the stock Hadoop 2.6.0 jars and the exact same problem still occurs. Has anyone else successfully gotten Druid 0.9+ working with a non-Hadoop-2.3.0 cluster? Thanks!

Hey TJ,

I bet this is related to the fact that the two replica tasks both try to publish the same segment to the same location. Probably one of them succeeds and the other fails, which would explain why you haven’t noticed any missing data. It might work better for the HDFS pusher to create a tmp file first (index.zip.uuid?) and then rename it into place.

I filed an issue for this: https://github.com/druid-io/druid/issues/3219
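
To sketch the idea (just an illustration of the temp-file-then-rename approach, not what HdfsDataSegmentPusher actually does today; the class and method names here are hypothetical), each replica would write to a uniquely named temp file and then rename it into the final location, so whichever replica renames first wins and the other can treat an already-existing index.zip as success:

import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TmpThenRenamePusher
{
  public static void pushZip(FileSystem fs, Path finalPath, byte[] zipContents) throws IOException
  {
    // e.g. .../index.zip.<uuid>, unique per task attempt, so replicas never write to the same path
    Path tmpPath = new Path(finalPath.getParent(), finalPath.getName() + "." + UUID.randomUUID());

    try (OutputStream out = fs.create(tmpPath, false)) {
      out.write(zipContents);
    }

    // rename() on HDFS fails if the destination already exists; if another replica
    // has already published the segment, treat that as success and clean up the temp file.
    if (!fs.rename(tmpPath, finalPath)) {
      fs.delete(tmpPath, false);
      if (!fs.exists(finalPath)) {
        throw new IOException("Failed to publish segment to " + finalPath);
      }
    }
  }
}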

Other than the annoyance of tasks showing up as FAILED, is this causing any other problems? Your actual data should be fine, given that one replica should always end up actually writing the segment.

Right, it looks like there aren’t any other problems, other than the task being marked as failed. Thanks!
–T