Hadoop index tasks with Azure Blob Storage

Hi all,

We are in the process of evaluating Druid and we have a cluster of 4 machines (1 Coordinator/Overlord/ZooKeeper, 1 Broker, 2 data nodes with Historical/MiddleManager/Tranquility).

Our requirement is to use Azure Blob Storage as our Deep Storage and this works well for basic realtime ingestion.

I know the Azure extensions are community extensions rather than core, and I am not sure this is the right place to post these questions, but it would be great if we could get some help, as we are stuck.

We are having trouble with two things:

Issue #1: Re-indexing segments that are already present in our deep storage.

We are trying to roll up our data from queryGranularity=MINUTE to queryGranularity=HOUR in order to compress our existing data.

But when we try to run this as a Hadoop index task (using the Hadoop client packaged with the imply.io distribution), we get the following error:

2016-05-27T10:09:02,762 INFO [LocalJobRunner Map Task Executor #0] io.druid.indexer.hadoop.DatasourceRecordReader - Getting storage path for segment [moosend_events_2016-05-27T09:55:00.000Z_2016-05-27T10:00:00.000Z_2016-05-27T09:57:40.825Z]

2016-05-27T10:09:02,762 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - Starting flush of map output

2016-05-27T10:09:02,764 INFO [Thread-94] org.apache.hadoop.mapred.LocalJobRunner - map task executor complete.

2016-05-27T10:09:02,764 WARN [Thread-94] org.apache.hadoop.mapred.LocalJobRunner - job_local1451395154_0001

java.lang.Exception: com.metamx.common.IAE: Cannot figure out loadSpec {"type":"azure","containerName":"data","blobPath":"events/2016-05-27T09:55:00.000Z_2016-05-27T10:00:00.000Z/2016-05-27T09:57:40.825Z/1/index.zip"}

at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) ~[hadoop-mapreduce-client-common-2.3.0.jar:?]

at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) [hadoop-mapreduce-client-common-2.3.0.jar:?]

Caused by: com.metamx.common.IAE: Cannot figure out loadSpec {"type":"azure","containerName":"data","blobPath":"moosend_events/2016-05-27T09:55:00.000Z_2016-05-27T10:00:00.000Z/2016-05-27T09:57:40.825Z/1/index.zip"}

at io.druid.indexer.JobHelper.getURIFromSegment(JobHelper.java:715) ~[druid-indexing-hadoop-0.9.0.jar:0.9.0]

at io.druid.indexer.hadoop.DatasourceRecordReader$1.apply(DatasourceRecordReader.java:82) ~[druid-indexing-hadoop-0.9.0.jar:0.9.0]

at io.druid.indexer.hadoop.DatasourceRecordReader$1.apply(DatasourceRecordReader.java:76) ~[druid-indexing-hadoop-0.9.0.jar:0.9.0]

at com.google.common.collect.Lists$TransformingRandomAccessList$1.transform(Lists.java:582) ~[guava-16.0.1.jar:?]

at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[guava-16.0.1.jar:?]

at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[guava-16.0.1.jar:?]

at com.metamx.common.guava.BaseSequence.makeYielder(BaseSequence.java:104) ~[java-util-0.27.7.jar:?]

at com.metamx.common.guava.BaseSequence.toYielder(BaseSequence.java:81) ~[java-util-0.27.7.jar:?]

at com.metamx.common.guava.ConcatSequence.toYielder(ConcatSequence.java:58) ~[java-util-0.27.7.jar:?]

at io.druid.segment.realtime.firehose.IngestSegmentFirehose.(IngestSegmentFirehose.java:171) ~[druid-server-0.9.0.jar:0.9.0]

at io.druid.indexer.hadoop.DatasourceRecordReader.initialize(DatasourceRecordReader.java:114) ~[druid-indexing-hadoop-0.9.0.jar:0.9.0]

at org.apache.hadoop.mapreduce.lib.input.DelegatingRecordReader.initialize(DelegatingRecordReader.java:84) ~[hadoop-mapreduce-client-core-2.3.0.jar:?]

at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:525) ~[hadoop-mapreduce-client-core-2.3.0.jar:?]

at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) ~[hadoop-mapreduce-client-core-2.3.0.jar:?]

at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340) ~[hadoop-mapreduce-client-core-2.3.0.jar:?]

at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) ~[hadoop-mapreduce-client-common-2.3.0.jar:?]

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) ~[?:1.7.0_101]

at java.util.concurrent.FutureTask.run(FutureTask.java:262) ~[?:1.7.0_101]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ~[?:1.7.0_101]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ~[?:1.7.0_101]

at java.lang.Thread.run(Thread.java:745) ~[?:1.7.0_101]

2016-05-27T10:09:03,515 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_local1451395154_0001 running in uber mode : false

2016-05-27T10:09:03,517 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - map 0% reduce 0%

2016-05-27T10:09:03,520 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_local1451395154_0001 failed with state FAILED due to: NA

2016-05-27T10:09:03,529 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Counters: 0

2016-05-27T10:09:03,529 ERROR [task-runner-0-priority-0] io.druid.indexer.DeterminePartitionsJob - Job failed: job_local1451395154_0001

2016-05-27T10:09:03,529 INFO [task-runner-0-priority-0] io.druid.indexer.JobHelper - Deleting path[/datadrive/druid/hadoop-tmp/events/2016-05-27T100854.262Z/b5ed9bbf7390496baad3fc86f165bb3f]

This is our task config:

{
  "type" : "index_hadoop",
  "spec" : {
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "dataSource",
        "ingestionSpec" : {
          "dataSource": "events",
          "intervals": ["2016-06-26/2016-06-27"]
        }
      }
    },
    "dataSchema" : {
      "dataSource" : "events",
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "HOUR",
        "intervals" : ["2016-05-10/2016-05-30"]
      },
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : {
            "dimensions": [],
            "dimensionExclusions": ["timestamp"]
          },
          "timestampSpec" : {
            "format" : "auto",
            "column" : "timestamp"
          }
        }
      },
      "metricsSpec": [{ "type": "count", "name": "count" }]
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "dimension",
        "targetPartitionSize" : 5000000,
        "partitionDimension" : "userId"
      },
      "jobProperties" : {}
    }
  }
}

We have the following properties in common.properties:

druid.extensions.directory=dist/druid/extensions

druid.extensions.hadoopDependenciesDir=dist/druid/hadoop-dependencies

druid.extensions.loadList=["mysql-metadata-storage", "druid-azure-extensions"]

In dist/druid/extensions, we have created a directory druid-azure-extensions and placed azure-storage-4.2.0.jar and druid-azure-extensions-0.9.0.jar in it.

This enables Druid to use Azure as its deep storage and it works fine.

However, the Hadoop job does not seem to work. We have also placed azure-storage-4.2.0.jar and druid-azure-extensions-0.9.0.jar inside dist/druid/hadoop-dependencies, but we still get the above error.

Do we have to change anything in our configuration, or does the dataSource ioConfig -> inputSpec generally not work with Azure?

Issue #2

We would like to run the hybrid batch/streaming indexing model suggested here: http://druid.io/docs/latest/tutorials/ingestion.html#hybrid-batch-streaming. So we are saving each event to an AppendBlob (one AppendBlob per minute) in Azure Storage, following the y=XXXX/m=XX/d=XX/H=XX/M=XX layout suggested for the "granularity" inputSpec for batch index tasks.

However, the Azure extension docs (http://druid.io/docs/latest/development/extensions-contrib/azure.html) only describe a StaticAzureBlobStoreFirehose, which requires hardcoding the blob paths.

Is the “granularity” inputSpec supported in Azure, and if not, is there a way to get the segments in an interval without hardcoding the segment paths?

Thank you for the support and apologies if this is not the right place for the questions.

Petros

Hey Petros,

The Azure extension is not officially supported but I’ll try my best to answer your questions.

About issue #1, it looks like the Azure deep storage extension was never really hooked up to the Hadoop indexer, and that is where the "Cannot figure out loadSpec" error comes from. If you can tolerate running the reindexing on your Druid machines rather than on a Hadoop cluster, you might be able to use the IngestSegmentFirehose (http://druid.io/docs/latest/ingestion/firehose.html#ingestsegmentfirehose) with a basic "index" task (http://druid.io/docs/latest/ingestion/tasks.html#index-task). This should work in principle, although that mechanism does not scale as well as Hadoop-based indexing, so you might run into issues if you have a lot of data.
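In case it helps, here is a rough sketch of what such a reindexing task could look like, reusing the dataSource, parser, granularities, and metrics from the spec you posted. This is illustrative only (I have not run it against Azure deep storage), and exact fields can vary between Druid versions:

```json
{
  "type" : "index",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "events",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : { "dimensions": [], "dimensionExclusions": ["timestamp"] },
          "timestampSpec" : { "format" : "auto", "column" : "timestamp" }
        }
      },
      "metricsSpec" : [{ "type" : "count", "name" : "count" }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "HOUR",
        "intervals" : ["2016-05-10/2016-05-30"]
      }
    },
    "ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "ingestSegment",
        "dataSource" : "events",
        "interval" : "2016-05-10/2016-05-30"
      }
    },
    "tuningConfig" : { "type" : "index" }
  }
}
```

The key part is the "ingestSegment" firehose, which pulls rows back out of existing segments through the same segment-loading path the rest of the cluster uses (which the Azure extension does hook into), rather than through the Hadoop JobHelper code that is failing for you.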

About issue #2, the "granularity" inputSpec is a Hadoop indexing feature, so you will run into the same Azure-extension-not-hooked-up-to-Hadoop issue as in #1. As with #1, if you don't have that much data, you can probably get away with the basic non-Hadoop batch index mechanism.
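Until the "granularity" inputSpec works with Azure, one workaround is to generate the blob list for an interval yourself when you submit the task, since the static firehose can take multiple blobs. A sketch of the ioConfig, using the "blobs" field from the extensions-contrib Azure docs (the container name and file names here are made up to match your y=/m=/d=/H=/M= layout):

```json
"ioConfig" : {
  "type" : "index",
  "firehose" : {
    "type" : "static-azure-blobstore",
    "blobs" : [
      { "container" : "data", "path" : "/events/y=2016/m=05/d=27/H=10/M=00/events.json" },
      { "container" : "data", "path" : "/events/y=2016/m=05/d=27/H=10/M=01/events.json" }
    ]
  }
}
```

A small script that expands an interval into one blob entry per minute would keep this out of hardcoded task specs.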

I saw the Azure storage extension mentioned in http://druid.io/docs/latest/development/extensions-contrib/azure.html, but not in the cluster configuration guide at http://druid.io/docs/latest/tutorials/cluster.html.

Has anyone tried it and gotten it working?