How to call shutdown() on the Kafka firehose to force segment hand-off

Hello,

I cannot get the Realtime node to hand off segments. I ingested data that is not strictly chronological into Druid, with a Realtime node pulling from Kafka and rejectionPolicy “none”. The documentation says this about the policy:

none – All events are accepted. Never hands off data unless shutdown() is called on the configured firehose.
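For reference, the policy sits in the tuningConfig of my realtime spec, roughly like this (the other values here are placeholders, not my real settings):

  "tuningConfig": {
    "type": "realtime",
    "maxRowsInMemory": 500000,
    "intermediatePersistPeriod": "PT10M",
    "windowPeriod": "PT10M",
    "basePersistDirectory": "/tmp/realtime/basePersist",
    "rejectionPolicy": { "type": "none" }
  }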

How can I initiate this shutdown manually? I issued kill -15 against the Realtime node process and see only the following:

2016-03-29T18:21:52,696 INFO [Thread-45] com.metamx.common.lifecycle.Lifecycle - Running shutdown hook
2016-03-29T18:21:52,747 INFO [Thread-45] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void io.druid.server.coordination.AbstractDataSegmentAnnouncer.stop()] on object[io.druid.server.coordination.BatchDataSegmentAnnouncer@472b84fd].
2016-03-29T18:21:52,747 INFO [Thread-45] io.druid.server.coordination.AbstractDataSegmentAnnouncer - Stopping class io.druid.server.coordination.BatchDataSegmentAnnouncer with config[io.druid.server.initialization.ZkPathsConfig@58d3f4be]
2016-03-29T18:21:52,748 INFO [Thread-45] io.druid.curator.announcement.Announcer - unannouncing [/druid/announcements/l:8084]

No hand-off takes place, and the Realtime node actually stays up.

Thank you,

Regards,

/David

David, try following http://druid.io/docs/0.9.0-rc3/tutorials/tutorial-kafka.html

RejectionPolicy ‘none’ should never be used in production with realtime nodes.
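For anything beyond a quick test you want a time-based policy so that segments can hand off on their own; a minimal sketch (the windowPeriod value is just an example):

  "rejectionPolicy": { "type": "serverTime" },
  "windowPeriod": "PT10M"

With serverTime, events falling outside windowPeriod of the node's wall clock are rejected, and each segment is handed off once its window has passed.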

Hello Fangjin,

Thank you for your reply! I realise Tranquility is the better option and that rejectionPolicy “none” is not suited for production; however, this is just a POC.

All the data has already been ingested into the basePersistDirectory and I can query it; now I “just” want to trigger the segment hand-off by initiating a firehose shutdown, as the documentation indicates. Is there a way to force this?

Our deep storage is on GCS, so I loaded one month's worth of data into Kafka via a Mapper job, which is why it is unordered. I have not had a chance to run the Druid Hadoop batch indexer against GCS yet, since it is a relatively new feature.
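In case it matters, the Mapper is nothing fancy; at its core it is just a plain Kafka producer, along these lines (broker and topic names are placeholders, not my actual ones):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class EventProducer {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "broker1:9092"); // placeholder broker
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      // one JSON event per Kafka message; mappers run in parallel,
      // so the topic ends up with no global time ordering
      try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        String jsonEvent = "{\"timestamp\":\"2016-03-01T00:00:00Z\",\"value\":1}"; // sample event
        producer.send(new ProducerRecord<>("druid-events", jsonEvent)); // placeholder topic
      }
    }
  }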

One final remark: I initially chose “rejectionPolicy”: “messageTime” with “windowPeriod”: “PT1M”, since I figured that, given one month of data, no two events would be more than one month apart. However, I must have misinterpreted the setting, because almost all events were thrown away.
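Writing this out, I wonder whether the period string itself is the problem. windowPeriod is an ISO 8601 period, and a quick check with Joda-Time (the library Druid uses for time handling) parses it like so:

  import org.joda.time.Period;

  public class PeriodCheck {
    public static void main(String[] args) {
      Period minute = Period.parse("PT1M"); // the "T" starts the time part: 1 minute
      Period month  = Period.parse("P1M");  // without "T": 1 month
      System.out.println(minute.getMinutes()); // 1
      System.out.println(month.getMonths());   // 1
    }
  }

So “PT1M” is one minute, not one month, which would explain why messageTime dropped nearly everything on unordered data.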

Thanks for your patience,

Regards,

David

You can’t trigger segment handoff in the sense you are imagining. Try to follow that tutorial. With what you are doing now, you won’t ever be able to get handoff with rejectionPolicy none.

Ok, I get it.

I will test the Tranquility approach from the tutorial you linked, with the “messageTime” rejection policy on time-ordered data. That should let me benchmark Kafka ingestion performance and hand something off to the historical node.

Regards,

David

You can also look at http://imply.io/docs/latest/ingestion-streams for info about how to benchmark streaming ingest. There’s a script that will generate events.
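If you want something quick and dirty in the meantime, a generator can be as simple as printing timestamped JSON lines and piping them into kafka-console-producer.sh; a hypothetical sketch (not the script from those docs):

  import java.time.Instant;
  import java.util.Random;

  public class GenEvents {
    public static void main(String[] args) {
      Random rnd = new Random();
      // one JSON event per line on stdout; pipe into kafka-console-producer.sh
      for (int i = 0; i < 100_000; i++) {
        System.out.printf("{\"timestamp\":\"%s\",\"user\":\"user%d\",\"value\":%d}%n",
            Instant.now(), rnd.nextInt(1000), rnd.nextInt(100));
      }
    }
  }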