Pull data from two different Kafka topics and push it to one datasource

Hi, I’m looking for a way to use one datasource for two Kafka topics using Tranquility.

I have two Tranquility processes which use the same schema.

tranquility1: topic1 -> datasource

tranquility2: topic2 -> datasource
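
(Each Tranquility Kafka instance maps a topic to a dataSource in its JSON config. For context, a trimmed sketch of what tranquility1’s config could look like; the parser, dimension names, and property values are placeholders, and tranquility2’s config would be identical except for the topicPattern and group id:)

```json
{
  "dataSources": {
    "datasource": {
      "spec": {
        "dataSchema": {
          "dataSource": "datasource",
          "parser": {
            "type": "string",
            "parseSpec": {
              "format": "json",
              "timestampSpec": { "column": "timestamp", "format": "auto" },
              "dimensionsSpec": { "dimensions": ["dim1", "dim2"] }
            }
          },
          "metricsSpec": [ { "type": "count", "name": "count" } ],
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "hour",
            "queryGranularity": "none"
          }
        },
        "ioConfig": { "type": "realtime" },
        "tuningConfig": {
          "type": "realtime",
          "windowPeriod": "PT10M",
          "intermediatePersistPeriod": "PT5M"
        }
      },
      "properties": {
        "task.partitions": "1",
        "task.replicants": "1",
        "topicPattern": "topic1"
      }
    }
  },
  "properties": {
    "zookeeper.connect": "localhost:2181",
    "kafka.zookeeper.connect": "localhost:2181",
    "kafka.group.id": "tranquility-kafka-1"
  }
}
```

The relevant part is that both instances declare the same dataSource name ("datasource") but different topicPatterns.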

Everything goes fine until the data is handed off to deep storage, at which point this error occurs:

```
java.lang.UnsupportedOperationException: Cannot add overlapping segments [2017-12-14T11:00:00.000Z/2017-12-14T12:00:00.000Z and 2017-12-14T11:39:00.000Z/2017-12-14T11:40:00.000Z] with the same version [2017-12-14T11:33:48.331Z]
    at io.druid.timeline.VersionedIntervalTimeline.addAtKey(VersionedIntervalTimeline.java:357) ~[druid-common-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
    at io.druid.timeline.VersionedIntervalTimeline.add(VersionedIntervalTimeline.java:279) ~[druid-common-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
    at io.druid.timeline.VersionedIntervalTimeline.add(VersionedIntervalTimeline.java:109) ~[druid-common-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
    at io.druid.server.coordinator.helper.DruidCoordinatorCleanupOvershadowed.run(DruidCoordinatorCleanupOvershadowed.java:71) ~[druid-server-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
    at io.druid.server.coordinator.DruidCoordinator$CoordinatorRunnable.run(DruidCoordinator.java:703) [druid-server-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
    at io.druid.server.coordinator.DruidCoordinator$5.call(DruidCoordinator.java:585) [druid-server-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
    at io.druid.server.coordinator.DruidCoordinator$5.call(DruidCoordinator.java:578) [druid-server-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
    at com.metamx.common.concurrent.ScheduledExecutors$2.run(ScheduledExecutors.java:99) [java-util-0.27.10.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_141]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_141]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_141]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_141]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_141]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_141]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
```

I found that only one of them succeeded in saving its segment to deep storage (HDFS); the other failed.

Is there any way I can manage the segment versions so that both of them are saved?

Hi,

each Kafka topic should be mapped to a Druid dataSource. You can probably merge the two topic streams before ingesting to Druid, or you can have two different dataSources for each topic and use 'union dataSource

On Fri, Dec 15, 2017 at 10:42 AM, 오지연 <dhwldus9269@gmail.com> wrote:

Sorry, the email was accidentally sent.

You can probably merge two topic streams before ingesting to Druid, or you can have two different dataSources for each topic and use ‘union dataSource (http://druid.io/docs/latest/querying/datasource.html#union-data-source)’ at query time.
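
(For illustration, the union is specified inline in the query’s dataSource field; a minimal sketch of a timeseries query over two hypothetical dataSources:)

```json
{
  "queryType": "timeseries",
  "dataSource": {
    "type": "union",
    "dataSources": ["datasource1", "datasource2"]
  },
  "granularity": "hour",
  "aggregations": [ { "type": "count", "name": "rows" } ],
  "intervals": [ "2017-12-14T00:00/2017-12-15T00:00" ]
}
```

Note that the unioned dataSources need to have the same schema, which is the case here.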

Jihoon

On Fri, Dec 15, 2017 at 11:39 AM, Jihoon Son <ghoonson@gmail.com> wrote:

I see. I guess I have to rely on a union dataSource.
Thanks for the reply!

On Friday, December 15, 2017 at 11:41:04 AM UTC+9, Jihoon Son wrote:

Could you tell me where I can find that a dataSource is mapped to a Kafka topic?

On Friday, December 15, 2017 at 11:40:02 AM UTC+9, Jihoon Son wrote:

Oh, sorry. I was confused with the Kafka indexing service. Each topic should be mapped to a dataSource in the Kafka indexing service.
You should be able to ingest from two topics into a single dataSource.
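
(For reference, in the Kafka indexing service the supervisor spec takes a single topic, which is why each topic maps to one dataSource there; a trimmed sketch with hypothetical names:)

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "datasource1",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "auto" },
        "dimensionsSpec": { "dimensions": ["dim1", "dim2"] }
      }
    },
    "metricsSpec": [ { "type": "count", "name": "count" } ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "topic": "topic1",
    "consumerProperties": { "bootstrap.servers": "localhost:9092" }
  }
}
```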

java.lang.UnsupportedOperationException: Cannot add overlapping segments [2017-12-14T11:00:00.000Z/2017-12-14T12:00:00.000Z and 2017-12-14T11:39:00.000Z/2017-12-14T11:40:00.000Z] with the same version [2017-12-14T11:33:48.331Z]

The error message you posted above says that your cluster is in a very weird state. The error occurred in the coordinator while it was cleaning up unused segments (not when tasks hand off segments to deep storage). It means there are at least two segments in the cluster with the same version but with overlapping, non-identical intervals.

This should not happen if you’re using only one Tranquility Kafka instance, because it assigns different shardSpecs to the tasks, which results in segments of different versions. But this is not guaranteed if you’re using two or more Tranquility instances. Could you please check?

Please let me know if this helps.

Thanks,

Jihoon

On Mon, Dec 18, 2017 at 11:08 AM, 오지연 <dhwldus9269@gmail.com> wrote:

I tested again with two Tranquility instances:

tranquility1: topic1 -> datasource1

tranquility2: topic2 -> datasource1

This time, there were no error messages in the coordinator log.

segmentGranularity is ‘hour’, intermediatePersistPeriod is ‘5M’, and windowPeriod is ‘10M’.
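
(In the spec format from the sketch earlier in the thread, and assuming ISO-8601 periods for the two durations, that corresponds to:)

```json
{
  "dataSchema": {
    "granularitySpec": { "type": "uniform", "segmentGranularity": "hour" }
  },
  "tuningConfig": {
    "type": "realtime",
    "intermediatePersistPeriod": "PT5M",
    "windowPeriod": "PT10M"
  }
}
```
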
Before 12:00, data from both topic1 and topic2 showed up when I queried.

After 12:00, only topic1 had results; topic2 did not.

tranquility2 keeps pulling data from topic2, but I have no idea where the data is going.

I’ve checked deep storage; there was one segment with two partitions.

Both partitions contain topic1 data from tranquility1.

Am I missing any setting?

On Tuesday, December 19, 2017 at 10:37:38 AM UTC+9, Jihoon Son wrote:

Sorry about the above answer; my query was wrong.
It turned out to be working fine.

Thank you for your advice, Jihoon!

On Tuesday, December 19, 2017 at 12:41:28 PM UTC+9, 오지연 wrote:

Glad to hear that it works!

On Tue, Dec 19, 2017 at 2:14 PM, 오지연 <dhwldus9269@gmail.com> wrote: