Ingesting HDFS segments between datasources

Hi Druid Users and Developers,
I have HDFS segments, already pushed by Druid, stored in a datasource named source_datasource, and I want to ingest them into a datasource named target_datasource. I prepared this JSON for the task:
{
  "type" : "index_hadoop",
  "hadoopDependencyCoordinates" : ["org.apache.hadoop:hadoop-client:2.4.0"],
  "spec" : {
    "dataSchema": {
      "dataSource": "target_datasource",
      "parser": {
        "type": "???",
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [],
          "spatialDimensions": []
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "rows"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "SIX_HOUR",
        "queryGranularity": "NONE",
        "intervals": [ "2016-10-11/2016-10-12" ]
      }
    },
    "ioConfig": {
      "type" : "hadoop",
      "inputSpec" : {
        "type": "dataSource",
        "ingestionSpec" : {
          "dataSource" : "source_datasource",
          "intervals": [ "2016-10-11/2016-10-12" ],
          "segments" : ["2016-10-11T06:00:00.000/2016-10-11T12:00:00.000"]
        }
      }
    },
    "tuningConfig" : {
      "type" : "hadoop"
    }
  }
}
**My questions:**
1. What should I write as the parser type in dataSchema?
2. The segment strings are the values of the id column in the druid_segments table in MySQL. Correct?
3. If the task executes correctly, it will overwrite the existing segments in target_datasource, but only those that overlap the segments specified in ioConfig. Other segments that fall within the intervals of granularitySpec but do not overlap the segments specified in ioConfig will stay intact. Right?
4. And I want to curl it to the overlord via **curl -H "Content-Type: application/json" -X POST -d @above.json http://localhost:19083/druid/indexer/v1/task**? (The type in above.json is index_hadoop.)

  1. What should I write as the parser type in dataSchema?
    Parser type doesn't matter; you can set it to "type": "noop" if you'd like.
  2. The segment strings are the values of the id column in the druid_segments table in MySQL. Correct?
    No, the list for 'segments' is a list of segment descriptors, which can be obtained by calling GET /druid/coordinator/v1/metadata/datasources/{dataSource}/segments?full or, if you have a specific interval you want the descriptors for, by calling POST on that endpoint and passing a list of intervals in the body, for example: ["2016-01-01T00:00:00.000Z/2017-01-01T00:00:00.000Z"]. In SQL, this is the 'payload' column in the segments table. See the curl sketch after this list.
    Here is an example of what a segment descriptor looks like:
    {"dataSource":"canned-3","interval":"2016-05-12T00:00:00.000Z/2016-05-13T00:00:00.000Z","version":"2016-10-14T18:04:08.546Z","loadSpec":{"type":"local","path":"/druid/localStorage/canned-3/2016-05-12T00:00:00.000Z_2016-05-13T00:00:00.000Z/2016-10-14T18:04:08.546Z/0/index.zip"},"dimensions":"feed,service","metrics":"count,sumValue,maxValue,minValue","shardSpec":{"type":"none"},"binaryVersion":9,"size":3896,"identifier":"canned-3_2016-05-12T00:00:00.000Z_2016-05-13T00:00:00.000Z_2016-10-14T18:04:08.546Z"}
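For example, the descriptors can be fetched like this (a sketch: localhost:8081 stands in for your coordinator, and the datasource and interval are the ones from the spec above):

# GET the full descriptors for every segment of the source datasource
curl "http://localhost:8081/druid/coordinator/v1/metadata/datasources/source_datasource/segments?full"

# Or POST a list of intervals to get descriptors for those intervals only
curl -H "Content-Type: application/json" -X POST \
  -d '["2016-10-11T06:00:00.000Z/2016-10-11T12:00:00.000Z"]' \
  "http://localhost:8081/druid/coordinator/v1/metadata/datasources/source_datasource/segments?full"

Each element of the returned array is a descriptor object that can be pasted directly into the 'segments' list of the ingestionSpec.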

If the task executes correctly, it will overwrite the existing segments in target_datasource, but only those that overlap the segments specified in ioConfig. Other segments that fall within the intervals of granularitySpec but do not overlap the segments specified in ioConfig will stay intact. Right?
Yes, that is correct. The existing segments are not actually overwritten but are overshadowed by the newer version generated by the task.
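If you want to watch the overshadowing happen, the metadata store from question 2 shows both versions side by side, and the coordinator eventually marks the overshadowed segments unused (a sketch, assuming the default druid_segments table; adjust the MySQL connection details for your setup):

# Old and new segments appear with different 'version' values; overshadowed
# ones are eventually flipped to used = 0 by the coordinator.
mysql -u <user> -p <druid_db> -e \
  "SELECT id, version, used FROM druid_segments WHERE dataSource = 'target_datasource' ORDER BY version;"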

And I want to curl it to the overlord via **curl -H "Content-Type: application/json" -X POST -d @above.json http://localhost:19083/druid/indexer/v1/task**? (The type in above.json is index_hadoop.)

Yup, that should work.
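For what it's worth, that POST returns a small JSON object containing the task id, which can then be polled on the same overlord (a sketch; substitute the id that the submission returned):

# Poll the overlord for the task's status (RUNNING, SUCCESS, or FAILED)
curl http://localhost:19083/druid/indexer/v1/task/<taskId>/status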

Thanks, David, for answering.
So to finish this post: in the payload column, a value can be an object like the one above. And the segment descriptor is the value corresponding to "interval"? So in the example you gave it would be "2016-05-12T00:00:00.000Z/2016-05-13T00:00:00.000Z", and it would go into:

"segments" : ["2016-05-12T00:00:00.000Z/2016-05-13T00:00:00.000Z"]

?

No, the segment descriptor is the whole object; it is the value that goes into "segments". So your reindex spec might look like this (taken from the example at http://druid.io/docs/latest/ingestion/update-existing-data.html):

"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "dataSource",
    "ingestionSpec" : {
      "dataSource": "canned-3",
      "intervals": ["2016-05-12T00:00:00.000Z/2016-05-13T00:00:00.000Z"],
      "segments": [{"dataSource":"canned-3","

interval":“2016-05-12T00:00:00.000Z/2016-05-13T00:00:00.000Z”,“version”:“2016-10-14T18:04:08.546Z”,“loadSpec”:{“type”:“local”,“path”:"/druid/localStorage/canned-3/2016-05-12T00:00:00.000Z_2016-05-13T00:00:00.000Z/2016-10-14T18:04:08.546Z/0/index.zip"},“dimensions”:“feed,service”,“metrics”:“count,sumValue,maxValue,minValue”,“shardSpec”:{“type”:“none”},“binaryVersion”:9,“size”:3896,“identifier”:“canned-3_2016-05-12T00:00:00.000Z_2016-05-13T00:00:00.000Z_2016-10-14T18:04:08.546Z”}]

    }
  },
  ...
}

The 'segments' field does not need to be specified, but, as described on the page linked above, providing it acts as a safety check: if the segments for the interval are changed by another task while this one is running, the task fails instead of reindexing unexpected data.
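For comparison, here is a sketch of the same ioConfig with the optional 'segments' safety check dropped, which reindexes whatever segments exist for the interval at the time the task runs:

"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "dataSource",
    "ingestionSpec" : {
      "dataSource": "canned-3",
      "intervals": ["2016-05-12T00:00:00.000Z/2016-05-13T00:00:00.000Z"]
    }
  },
  ...
}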

Hi David,
I already have the pushed segment in the prod_deep_sme_backup_events datasource in HDFS, and I want this segment ingested into another datasource, here named prod_deep_sme_events.
I use the following JSON:
{
  "type" : "index_hadoop",
  "hadoopDependencyCoordinates" : ["org.apache.hadoop:hadoop-client:2.4.0"],
  "spec" : {
    "dataSchema": {
      "dataSource": "prod_deep_sme_events",
      "metricsSpec": [
        {
          "type": "count",
          "name": "rows"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "SIX_HOUR",
        "queryGranularity": "NONE",
        "intervals": [ "2017-01-16T00:00:00.000Z/2017-01-16T06:00:00.000Z" ]
      }
    },
    "ioConfig": {
      "type" : "hadoop",
      "inputSpec" : {
        "type": "dataSource",
        "ingestionSpec" : {
          "dataSource" : "prod_deep_sme_backup_events",
          "intervals": [ "2017-01-16T00:00:00.000Z/2017-01-16T06:00:00.000Z" ],
          "segments" : [{"dataSource":"prod_deep_sme_backup_events","interval":"2017-01-16T00:00:00.000Z/2017-01-16T06:00:00.000Z","version":"2017-01-16T00:00:00.000Z","loadSpec":{"type":"hdfs","path":"/user/druid/prod_deep_sme_backup_events/20170116T000000.000Z_20170116T060000.000Z/2017-01-16T00_00_00.000Z/0/index.zip"},"dimensions":"clientId,data.audio.autoplay,data.audio.controls,data.audio.currentsrc,data.audio.currenttime,data.audio.defaultmuted,data.audio.defaultplaybackrate,data.audio.duration,data.audio.ended,data.audio.error,data.audio.id,data.audio.loop,data.audio.muted,data.audio.paused,data.audio.playbackrate,data.audio.playingtime,data.audio.preload,data.audio.seeking,data.audio.src,data.audio.volume,data.dmascript.releasedate,data.dmascript.version,data.event.localtime.day,data.event.localtime.hour,data.event.localtime.minute,data.event.localtime.month,data.event.localtime.monthno,data.event.localtime.utcoffset,data.event.localtime.week,data.event.localtime.weekday,data.event.localtime.weekdayno,data.event.localtime.year,data.event.pingseq,data.event.seq,data.event.type,data.page.domain,data.page.hash,data.page.host,data.page.hostname,data.page.href,data.page.loadtime,data.page.path,data.page.protocol,data.page.query.page,data.page.query.promo,data.page.query.ref,data.page.query.src,data.page.query.utm_campaign,data.page.query.utm_content,data.page.query.utm_medium,data.page.query.utm_source,data.page.query.utm_term,data.page.search,data.page.title,data.referrer.domain,data.referrer.hash,data.referrer.host,data.referrer.hostname,data.referrer.href,data.referrer.path,data.referrer.port,data.referrer.protocol,data.referrer.query.page,data.referrer.query.ref,data.referrer.query.src,data.referrer.query.utm_campaign,data.referrer.query.utm_content,data.referrer.query.utm_medium,data.referrer.query.utm_source,data.referrer.query.utm_term,data.referrer.search,data.sme.article.bloger,data.sme.article.blogername,data.sme.article.char_count,data.sme.article.free_char_count,data.sme.article.id,data.sme.article.keyword.count,data.sme.article.keyword.tags,data.sme.article.lock,data.sme.article.name,data.sme.article.path,data.sme.article.published.date,data.sme.article.published.day,data.sme.article.published.fraction,data.sme.article.published.hour,data.sme.article.published.isoweek,data.sme.article.published.isoweekday,data.sme.article.published.millisecond,data.sme.article.published.minute,data.sme.article.published.month,data.sme.article.published.monthno,data.sme.article.published.second,data.sme.article.published.utcoffset,data.sme.article.published.week,data.sme.article.published.weekday,data.sme.article.published.weekdayno,data.sme.article.published.year,data.sme.article.template,data.sme.author.count,data.sme.author.name,data.sme.bar.button.class,data.sme.bar.button.content,data.sme.bar.button.path,data.sme.bar.type,data.sme.bloger_list,data.sme.debug.advert-load.action,data.sme.debug.advert-load.name,data.sme.debug.advert-load.time,data.sme.debug.advert-load.version,data.sme.device.layout,data.sme.logo.class,data.sme.nav.button.class,data.sme.nav.button.content,data.sme.nav.button.path,data.sme.nav.type,data.sme.page.ad-blocker,data.sme.page.aim-nodes,data.sme.page.body.content,data.sme.page.body.path,data.sme.page.button.article-id,data.sme.page.button.author-id,data.sme.page.button.blogername,data.sme.page.button.chart-period,data.sme.page.button.class,data.sme.page.button.content,data.sme.page.button.path,data.sme.page.button.pictograms,data.sme.page.button.region,data.sme.page.button.tags,data.sme.page.button.title,data.sme.page.department,data.sme.page.domain,data.sme.page.hostname,data.sme.page.layout,data.sme.page.locality,data.sme.page.path,data.sme.page.redaction-sub-type,data.sme.page.redaction-type,data.sme.page.search,data.sme.page.slug,data.sme.page.technology,data.sme.page.title,data.sme.page.topic-id,data.sme.page.type,data.sme.page.version.deployment-date,data.sme.page.version.project-name,data.sme.page.version.project-version,data.sme.page.widget.display,data.sme.user.ad-blocker,data.sme.user.ad_blocker.active,data.sme.user.piano.content-unlocked,data.sme.user.piano.e-mail,data.sme.user.piano.has-access,data.sme.user.piano.id,data.sme.user.piano.is-logged,data.sme.user.piano.remaining-days,data.sme.user.piano.subscriber,data.sme.window.location.parameter.article-id,data.sme.window.location.parameter.page,data.sme.window.location.parameter.ref,data.sme.window.location.parameter.src,data.smeinit,data.user.attention.active,data.user.attention.idle,data.user.attention.total,data.user.device.agent,data.user.device.headers.user-agent,data.user.device.screen.availheight,data.user.device.screen.availwidth,data.user.device.screen.height,data.user.device.screen.orientation,data.user.device.screen.width,data.user.id.deepcookie,data.user.id.ip,data.user.id.session,data.video.autoplay,data.video.controls,data.video.currentsrc,data.video.currenttime,data.video.defaultmuted,data.video.defaultplaybackrate,data.video.duration,data.video.ended,data.video.error,data.video.height,data.video.id,data.video.loop,data.video.muted,data.video.paused,data.video.playbackrate,data.video.playingtime,data.video.poster,data.video.preload,data.video.seeking,data.video.src,data.video.videoheight,data.video.videowidth,data.video.volume,data.video.width,trackerId","metrics":"rows","shardSpec":{"type":"none"},"binaryVersion":9,"size":182760927,"identifier":"prod_deep_sme_backup_events_2017-01-16T00:00:00.000Z_2017-01-16T06:00:00.000Z_2017-01-16T00:00:00.000Z"}]
        }
      }
    },
    "tuningConfig" : {
      "type" : "hadoop"
    }
  }
}

Then I run it via:

curl -H "Content-Type: application/json" -X POST -d @above.json http://localhost:19083/druid/indexer/v1/task
{"task":"index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z"}

After a while, when I query the status of the task:

curl http://localhost:19083/druid/indexer/v1/task/index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z/status
{"task":"index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z","status":{"id":"index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z","status":"FAILED","duration":1749}}

The log from the overlord:
2017-01-18T22:19:27,398 WARN [qtp681564936-44] io.druid.segment.indexing.DataSchema - No parser or parseSpec has been specified
2017-01-18T22:19:27,399 INFO [qtp681564936-44] io.druid.indexing.overlord.HeapMemoryTaskStorage - Inserting task index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z with status: TaskStatus{id=index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z, status=RUNNING, duration=-1}
2017-01-18T22:19:27,399 INFO [TaskQueue-Manager] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z]: LockTryAcquireAction{interval=2017-01-16T00:00:00.000Z/2017-01-16T06:00:00.000Z}
2017-01-18T22:19:27,399 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskLockbox - Created new TaskLockPosse: TaskLockPosse{taskLock=TaskLock{groupId=index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z, dataSource=prod_deep_sme_events, interval=2017-01-16T00:00:00.000Z/2017-01-16T06:00:00.000Z, version=2017-01-18T22:19:27.399Z}, taskIds=}
2017-01-18T22:19:27,399 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskLockbox - Added task[index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z] to TaskLock[index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z]
2017-01-18T22:19:27,399 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z
2017-01-18T22:19:27,399 INFO [pool-253-thread-4] io.druid.indexing.overlord.ForkingTaskRunner - Running command: java -cp /etc/druid/defaults::/usr/lib/druid/0.8.0/lib/druid-services-0.8.0-selfcontained.jar:/etc/druid/overlord:/etc/hadoop/conf -Ddruid.indexer.runner.javaOpts="-server -Xmx4g -Xms4g -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC" -Ddruid.metadata.storage.connector.password= -Ddruid.indexer.fork.property.druid.processing.numThreads=4 -Duser.timezone=UTC -Dfile.encoding.pkg=sun.io -Ddruid.storage.storageDirectory=hdfs:///user/druid -Ddruid.selectors.indexing.serviceName=overlord -Ddruid.indexer.queue.startDelay=PT0M -Ddruid.metadata.storage.connector.createTables=true -Ddruid.port=19083 -Ddruid.indexer.fork.property.hadoop.mapred.job.queue.name=druid-indexing -Ddruid.worker.capacity=4 -Ddruid.extensions.searchCurrentClassloader=false -Ddruid.service=overlord -Ddruid.metadata.storage.connector.user=root -Ddruid.metadata.storage.type=mysql -Ddruid.indexer.fork.property.hadoop.mapreduce.job.queuename=druid-indexing -Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://web.advertine.com:3306/druid -Djava.io.tmpdir=/tmp -Ddruid.zk.service.host=kafka01:2181/kafka081 -Ddruid.extensions.coordinates=[“io.druid.extensions:mysql-metadata-storage”] -Dfile.encoding=UTF-8 -Ddruid.storage.type=hdfs -Ddruid.indexer.fork.property.druid.computation.buffer.size=100000000 -Ddruid.processing.numThreads=4 -Dhadoop.mapred.job.queue.name=druid-indexing -Dhadoop.mapreduce.job.queuename=druid-indexing -Ddruid.computation.buffer.size=100000000 -Ddruid.host=druid01.advertine.com -Ddruid.port=8100 io.druid.cli.Main internal peon /tmp/persistent/task/index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z/df924824-f2d8-400a-8e9f-0653a1519753/task.json /tmp/persistent/task/index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z/df924824-f2d8-400a-8e9f-0653a1519753/status.json
2017-01-18T22:19:27,400 INFO [pool-253-thread-4] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z output to: /tmp/persistent/task/index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z/df924824-f2d8-400a-8e9f-0653a1519753/log
2017-01-18T22:19:31,615 WARN [qtp681564936-29] io.druid.segment.indexing.DataSchema - No parser or parseSpec has been specified
2017-01-18T22:19:31,615 INFO [qtp681564936-29] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z]: LockTryAcquireAction{interval=2017-01-16T00:00:00.000Z/2017-01-16T06:00:00.000Z}
2017-01-18T22:19:31,615 INFO [qtp681564936-29] io.druid.indexing.overlord.TaskLockbox - Task[index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z] already present in TaskLock[index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z]
2017-01-18T22:19:35,236 INFO [pool-253-thread-4] io.druid.indexing.overlord.ForkingTaskRunner - Process exited with status[0] for task: index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: log/index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z.log
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.overlord.ForkingTaskRunner - Removing temporary directory: /tmp/persistent/task/index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z/df924824-f2d8-400a-8e9f-0653a1519753
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.overlord.TaskQueue - Received FAILED status for task: index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.overlord.ForkingTaskRunner - Ignoring request to cancel unknown task: index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.overlord.HeapMemoryTaskStorage - Updating task index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z to status: TaskStatus{id=index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z, status=FAILED, duration=1749}
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.overlord.TaskLockbox - Removing task[index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z] from TaskLock[index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z]
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.overlord.TaskLockbox - TaskLock is now empty: TaskLock{groupId=index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z, dataSource=prod_deep_sme_events, interval=2017-01-16T00:00:00.000Z/2017-01-16T06:00:00.000Z, version=2017-01-18T22:19:27.399Z}
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.overlord.TaskQueue - Task done: HadoopIndexTask{id=index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z, type=index_hadoop, dataSource=prod_deep_sme_events}
2017-01-18T22:19:35,237 INFO [pool-253-thread-4] io.druid.indexing.overlord.TaskQueue - Task FAILED: HadoopIndexTask{id=index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z, type=index_hadoop, dataSource=prod_deep_sme_events} (1749 run duration)

Do you see any problem with above.json? I cannot look at the log in /tmp for further information, as it is erased immediately.
Thank you in advance!!!
Best,
Pawel
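A note on the failure above: the overlord log prints "No parser or parseSpec has been specified" twice, and this spec, unlike the first one in the thread, omits the parser stanza entirely. Following the earlier advice that the parser type doesn't matter for dataSource reindexing, a minimal stanza to try adding to the dataSchema would be (a sketch mirroring the first spec in this thread; whether "noop" is accepted here depends on your Druid version):

"parser": {
  "type": "noop",
  "dimensionsSpec": {
    "dimensions": [],
    "dimensionExclusions": [],
    "spatialDimensions": []
  }
}

Also, the FileTaskLogs line above shows the peon log being archived before /tmp is cleaned, so the full task log can usually be retrieved from the overlord afterwards:

curl http://localhost:19083/druid/indexer/v1/task/index_hadoop_prod_deep_sme_events_2017-01-18T22:19:27.399Z/log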