Corrupted files in deep storage

Hello,
I have a Druid cluster with druid.storage.type=local and storageDirectory pointing to an NFS path. I have two data servers running historical, middleManager, and broker nodes, and two master servers running coordinator and overlord nodes.
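For reference, the deep storage part of my common.runtime.properties looks roughly like this (the directory is the same NFS mount that appears in the loadSpec of the failing segment below):

druid.storage.type=local
druid.storage.storageDirectory=/data/druid/content/deep-storage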

Ingestion is done with a Kafka supervisor with taskCount=2 and replicas=2.
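The ioConfig of the supervisor spec is roughly the following (the topic name and consumer properties are just placeholders here; taskDuration is the setting I come back to further down):

"ioConfig": {
  "topic": "content20stats",
  "consumerProperties": { "bootstrap.servers": "kafka01:9092" },
  "taskCount": 2,
  "replicas": 2,
  "taskDuration": "PT1H"
}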

Every once in a while I receive an error from the historical nodes complaining that they can't load a segment because the zip file is corrupted (see the log below).

I am wondering whether this is a bug, or whether I am forced to use another kind of deep storage, since NFS does not guarantee that there are no concurrent writes to the same file from different replicas. Could it be that there is no locking mechanism to prevent two replicas from writing to the same file on NFS?

Here is the error that I find in the historical logs:

2016-09-21T23:00:04,582 ERROR [ZkCoordinator-0] io.druid.server.coordination.ZkCoordinator - Failed to load segment for dataSource: {class=io.druid.server.coordination.ZkCoordinator, exceptionType=class io.druid.segment.loading.SegmentLoadingException, exceptionMessage=Exception loading segment[content20stats_2016-09-12T00:00:00.000Z_2016-09-13T00:00:00.000Z_2016-09-12T00:00:00.136Z_1], segment=DataSegment{size=377662, shardSpec=NumberedShardSpec{partitionNum=1, partitions=0}, metrics=[count, impressions, clicks, boximpressions, totstaytime, fblike, fbshare, fbcomment, twcount, searchres], dimensions=[a_attrs, a_boxes_ctr_id, a_boxes_id, n_boximpression, n_breakpoint, n_click, n_doc_type, n_fbcomment, n_fblike, n_fbshare, n_gplus, n_impression, n_info, n_mappa, n_searchno, n_staytime, n_twcount, s_area, s_box, s_cat1, s_cat2, s_cat3, s_dest_id, s_doc_id, s_domain, s_link_type, s_pag_id, s_page, s_ref_host, s_ref_path, s_search, s_ua], version='2016-09-12T00:00:00.136Z', loadSpec={type=local, path=/data/druid/content/deep-storage/content20stats/2016-09-12T00:00:00.000Z_2016-09-13T00:00:00.000Z/2016-09-12T00:00:00.136Z/1/index.zip}, interval=2016-09-12T00:00:00.000Z/2016-09-13T00:00:00.000Z, dataSource='content20stats', binaryVersion='9'}}
io.druid.segment.loading.SegmentLoadingException: Exception loading segment[content20stats_2016-09-12T00:00:00.000Z_2016-09-13T00:00:00.000Z_2016-09-12T00:00:00.136Z_1]
    at io.druid.server.coordination.ZkCoordinator.loadSegment(ZkCoordinator.java:309) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.server.coordination.ZkCoordinator.addSegment(ZkCoordinator.java:350) [druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.server.coordination.SegmentChangeRequestLoad.go(SegmentChangeRequestLoad.java:44) [druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.server.coordination.ZkCoordinator$1.childEvent(ZkCoordinator.java:152) [druid-server-0.9.1.1.jar:0.9.1.1]
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:522) [curator-recipes-2.10.0.jar:?]
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:516) [curator-recipes-2.10.0.jar:?]
    at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93) [curator-framework-2.10.0.jar:?]
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) [guava-16.0.1.jar:?]
    at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85) [curator-framework-2.10.0.jar:?]
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:514) [curator-recipes-2.10.0.jar:?]
    at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35) [curator-recipes-2.10.0.jar:?]
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$9.run(PathChildrenCache.java:772) [curator-recipes-2.10.0.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_101]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_101]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_101]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_101]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.RuntimeException: java.util.zip.ZipException: invalid entry size (expected 376503 but got 376409 bytes)
    at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
    at com.metamx.common.CompressionUtils.unzip(CompressionUtils.java:146) ~[java-util-0.27.9.jar:?]
    at io.druid.segment.loading.LocalDataSegmentPuller.getSegmentFiles(LocalDataSegmentPuller.java:162) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.segment.loading.LocalLoadSpec.loadSegment(LocalLoadSpec.java:64) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.segment.loading.SegmentLoaderLocalCacheManager.getSegmentFiles(SegmentLoaderLocalCacheManager.java:143) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.segment.loading.SegmentLoaderLocalCacheManager.getSegment(SegmentLoaderLocalCacheManager.java:95) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.server.coordination.ServerManager.loadSegment(ServerManager.java:152) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.server.coordination.ZkCoordinator.loadSegment(ZkCoordinator.java:305) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    ... 18 more
Caused by: java.util.zip.ZipException: invalid entry size (expected 376503 but got 376409 bytes)
    at java.util.zip.ZipInputStream.readEnd(ZipInputStream.java:384) ~[?:1.8.0_101]
    at java.util.zip.ZipInputStream.read(ZipInputStream.java:196) ~[?:1.8.0_101]
    at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.8.0_101]
    at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
    at com.google.common.io.ByteSink.writeFrom(ByteSink.java:139) ~[guava-16.0.1.jar:?]
    at com.metamx.common.CompressionUtils.unzip(CompressionUtils.java:248) ~[java-util-0.27.9.jar:?]
    at com.metamx.common.CompressionUtils$1.call(CompressionUtils.java:138) ~[java-util-0.27.9.jar:?]
    at com.metamx.common.CompressionUtils$1.call(CompressionUtils.java:134) ~[java-util-0.27.9.jar:?]
    at com.metamx.common.RetryUtils.retry(RetryUtils.java:60) ~[java-util-0.27.9.jar:?]
    at com.metamx.common.RetryUtils.retry(RetryUtils.java:78) ~[java-util-0.27.9.jar:?]
    at com.metamx.common.CompressionUtils.unzip(CompressionUtils.java:132) ~[java-util-0.27.9.jar:?]
    at io.druid.segment.loading.LocalDataSegmentPuller.getSegmentFiles(LocalDataSegmentPuller.java:162) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.segment.loading.LocalLoadSpec.loadSegment(LocalLoadSpec.java:64) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.segment.loading.SegmentLoaderLocalCacheManager.getSegmentFiles(SegmentLoaderLocalCacheManager.java:143) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.segment.loading.SegmentLoaderLocalCacheManager.getSegment(SegmentLoaderLocalCacheManager.java:95) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.server.coordination.ServerManager.loadSegment(ServerManager.java:152) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at io.druid.server.coordination.ZkCoordinator.loadSegment(ZkCoordinator.java:305) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    ... 18 more

Regards,
Tommaso

Hey Tommaso,

Your guess sounds likely. As a workaround, you could try using another type of deep storage, or setting replicas = 1 on the supervisor to prevent concurrent writes.
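In the supervisor spec that's the replicas field of the ioConfig, i.e. roughly:

"ioConfig": {
  "taskCount": 2,
  "replicas": 1
}

(keeping topic, consumerProperties, taskDuration, etc. as they are)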

In the long run I think this would fix the problem: https://github.com/druid-io/druid/issues/3493

Thanks Gian. I have set replicas = 1 and everything seems to be working fine.

Now that I think about it, maybe I didn't need more than one replica. I have two data servers. If one fails, the overlord should become aware of it after a timeout and spawn a new task for the given partition on the other server.
I will probably lose only the data that the first peon (the one that was running on the crashed server) has already fetched from Kafka but has not yet pushed to deep storage. With taskDuration=PT1H, I will lose at most an hour of data.

Is this assumption correct?

Regards,
Tommaso

With the Kafka indexing service, you actually won't lose any data permanently. If a server fails, Druid will relaunch its tasks on another server, and those tasks will re-read the same data from Kafka. With replicas = 1, your only risk is that the data will be unavailable for querying for some time, until it can be re-read from Kafka.