Index old data

Hi, List

I tried to set the window period to “P4Y”. I’m testing my application with Tranquility, using data whose timestamp column is from 2014 (a little old, but sometimes I need to re-index data), and I’m getting this error:

2016-06-02T14:22:30,429 INFO [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Running task: index_realtime_telecom_2014-08-19T11:55:00.000Z_0_0
2016-06-02T14:22:30,441 INFO [task-runner-0] io.druid.segment.realtime.plumber.RealtimePlumber - Creating plumber using rejectionPolicy[io.druid.segment.realtime.plumber.NoopRejectionPolicyFactory$1@621295db]
2016-06-02T14:22:30,454 ERROR [task-runner-0] io.druid.indexing.common.task.RealtimeIndexTask - Exception aborted realtime processing[telecom]: {class=io.druid.indexing.common.task.RealtimeIndexTask, exceptionType=class java.lang.UnsupportedOperationException, exceptionMessage=Cannot convert to Duration as this period contains years and years vary in length}
java.lang.UnsupportedOperationException: Cannot convert to Duration as this period contains years and years vary in length
	at org.joda.time.Period.checkYearsAndMonths(Period.java:1570) ~[joda-time-2.8.2.jar:2.8.2]
	at org.joda.time.Period.toStandardDuration(Period.java:1549) ~[joda-time-2.8.2.jar:2.8.2]
	at io.druid.segment.realtime.plumber.RealtimePlumber.startPersistThread(RealtimePlumber.java:845) ~[druid-server-0.8.3.jar:0.8.3]
	at io.druid.segment.realtime.plumber.RealtimePlumber.startJob(RealtimePlumber.java:209) ~[druid-server-0.8.3.jar:0.8.3]
	at io.druid.indexing.common.task.RealtimeIndexTask.run(RealtimeIndexTask.java:302) [druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:285) [druid-indexing-service-0.8.3.jar:0.8.3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:265) [druid-indexing-service-0.8.3.jar:0.8.3]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_91]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_91]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_91]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]

Does anyone know how to solve this? Do I need to do some extra configuration?

Hey Marcelo,

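That should work with PT35040H (4 * 365 * 24 hours) rather than P4Y, since a windowPeriod needs to be a fixed duration, and a period containing years cannot be converted to one because years vary in length.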
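As a rough sanity check on that number, here is a minimal sketch of the arithmetic and of the fixed-versus-variable distinction. It uses the JDK's java.time.Duration as a stand-in, not Joda-Time (which is what Druid uses in the stack trace above), and the class and method names are made up for illustration. It also assumes every year is exactly 365 days, so leap days are ignored.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Hypothetical helper, not part of Druid or Tranquility.
class WindowPeriodCheck {
    // Hours in the given number of years, ASSUMING 365-day years (no leap days).
    static long hoursForYears(int years) {
        return years * 365L * 24L;
    }

    // Builds an hours-only ISO-8601 period string such as "PT35040H".
    static String fixedWindowPeriod(int years) {
        return "PT" + hoursForYears(years) + "H";
    }

    // True if the string parses as a fixed-length duration.
    // java.time.Duration rejects year and month components, which is
    // analogous to (but not the same code path as) the Joda-Time error above.
    static boolean isFixedDuration(String iso) {
        try {
            Duration.parse(iso);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedWindowPeriod(4));         // PT35040H
        System.out.println(isFixedDuration("PT35040H"));  // true
        System.out.println(isFixedDuration("P4Y"));       // false
    }
}
```

Note that 2016 is a leap year, so a window that must reach a precise instant four calendar years back may need an extra 24 hours on top of this.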

BUT, Tranquility is not really designed to handle ingestion of historical data like this. You will have issues with handoff, as tasks will stay around for four years before exiting!

A better approach would be to use batch indexing (local batch indexing for small amounts of data, remote Hadoop cluster for large amounts of data), or, if you are brave, the new Kafka supervisor stuff in 0.9.1 (http://druid.io/docs/0.9.1-rc1/development/extensions-core/kafka-ingestion.html). 0.9.1 has not been released yet, but there is a release candidate, 0.9.1-rc1, that you can try out.

Hi, Gian.

Hey Marcelo,

You should avoid calling “close” on the sender (it’s not usable after that). From Scala, the easiest way to integrate is to use the Tranquility Spark module (https://github.com/druid-io/tranquility/blob/master/docs/spark.md). It works with RDDs as well as DStreams.

From Java, you can at least do something similar to what it’s doing. Check out the source here:

https://github.com/druid-io/tranquility/blob/master/spark/src/main/scala/com/metamx/tranquility/spark/BeamRDD.scala
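
To illustrate the "don't close the sender" point, here is a minimal Java sketch of one way to structure it: create the sender once per JVM, reuse it for every batch, and flush at the end of each batch instead of closing. The Sender interface, FakeSender, and SenderHolder below are hypothetical stand-ins written for this example. They are not the Tranquility API and not a copy of what BeamRDD does, so treat this as a pattern under those assumptions only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class SharedSenderExample {
    /** Hypothetical stand-in for a sender; NOT the real Tranquility API. */
    interface Sender {
        void send(String event);
        void flush();
    }

    /** In-memory fake, present only so the sketch can run on its own. */
    static final class FakeSender implements Sender {
        final List<String> sent = new ArrayList<>();
        int flushes = 0;
        public void send(String event) { sent.add(event); }
        public void flush() { flushes++; }
    }

    /** Creates the sender on first use and returns the same instance afterwards. */
    static final class SenderHolder {
        private final Supplier<Sender> factory;
        private Sender instance;

        SenderHolder(Supplier<Sender> factory) { this.factory = factory; }

        synchronized Sender get() {
            if (instance == null) {
                instance = factory.get();
            }
            return instance;
        }
    }

    /** Sends one batch (for example one RDD partition), then flushes without closing. */
    static void sendBatch(SenderHolder holder, List<String> batch) {
        Sender sender = holder.get();
        for (String event : batch) {
            sender.send(event);
        }
        sender.flush(); // the sender stays usable for the next batch
    }

    public static void main(String[] args) {
        FakeSender fake = new FakeSender();
        SenderHolder holder = new SenderHolder(() -> fake);
        sendBatch(holder, Arrays.asList("a", "b"));
        sendBatch(holder, Arrays.asList("c"));
        System.out.println(fake.sent + " flushes=" + fake.flushes);
    }
}
```

In a real job the factory passed to SenderHolder would build the actual sender, and the holder would live in a static field so that each executor JVM ends up with exactly one instance.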