Druid doesn't work through firehose.type=receiver

Hi there!

Actually, I couldn’t get any other realtime tasks to work either. But I need exactly firehose.type=receiver (NOT Tranquility).

I hope it’s just a misconfiguration on my side.

Steps to reproduce

wget http://static.druid.io/artifacts/releases/druid-0.7.0-bin.tar.gz
tar -zxf druid-0.7.0-bin.tar.gz
mkdir druid-0.7.0/test
cd druid-0.7.0/test

Copy the files config.patch and realtime_task.json from the attachments into this directory, then:

cp -r …/config ./
patch -p2 -d config < config.patch
java -server -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath …/lib/:config/_common:config/overlord io.druid.cli.Main server overlord > overlord.log 2>&1 &
java -server -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath …/lib/:config/_common:config/middle io.druid.cli.Main server middleManager > middle.log 2>&1 &

curl -X 'POST' -H 'Content-Type:application/json' -d @realtime_task.json localhost:8090/druid/indexer/v1/task

Error 500

HTTP ERROR: 500

Problem accessing /druid/indexer/v1/task. Reason:

    com.google.inject.ProvisionException: Guice provision errors:
  1. Must override the binding for NodeTypeConfig if you want a DruidServerMetadata.
    at io.druid.guice.StorageNodeModule.getMetadata(StorageNodeModule.java:62)
    at io.druid.guice.StorageNodeModule.getMetadata(StorageNodeModule.java:62)
    while locating io.druid.server.coordination.DruidServerMetadata
    for parameter 0 at io.druid.server.coordination.BatchDataSegmentAnnouncer.<init>(BatchDataSegmentAnnouncer.java:68)
    at io.druid.guice.AnnouncerModule.configure(AnnouncerModule.java:42)
    while locating io.druid.server.coordination.BatchDataSegmentAnnouncer
    at io.druid.guice.JsonConfigProvider.bind(JsonConfigProvider.java:112)
    at io.druid.guice.JsonConfigProvider.bind(JsonConfigProvider.java:112)
    while locating com.google.common.base.Supplier<io.druid.server.coordination.DataSegmentAnnouncerProvider>
    at io.druid.guice.JsonConfigProvider.bind(JsonConfigProvider.java:113)
    while locating io.druid.server.coordination.DataSegmentAnnouncerProvider
    while locating io.druid.server.coordination.DataSegmentAnnouncer

1 error


Powered by Jetty://

I am not sure whether my realtime_task.json is correct, but I got the same error with the tasks from the examples:

  • examples/indexing/wikipedia_kafka_realtime_task.json
  • examples/indexing/wikipedia_realtime_task.json

What am I doing wrong?
How can I get this to work?

config.patch (1.66 KB)

realtime_task.json (1.84 KB)

You should be able to get the task to run if you remove the “plumber” from your ioConfig, change your rejectionPolicy from “test” to “none”, and change your parser from type “string” to type “map”.
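
Roughly, the relevant parts of the spec would then look something like the sketch below. The dataSource, dimensions, metrics, and service name here are placeholders, so carry over whatever is in your realtime_task.json, and double-check the exact parseSpec fields for the "map" parser against the 0.7 docs:

{
  "type": "index_realtime",
  "spec": {
    "dataSchema": {
      "dataSource": "my_datasource",
      "parser": {
        "type": "map",
        "parseSpec": {
          "timestampSpec": { "column": "timestamp", "format": "auto" },
          "dimensionsSpec": { "dimensions": ["dim1", "dim2"] }
        }
      },
      "metricsSpec": [ { "type": "count", "name": "count" } ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE"
      }
    },
    "ioConfig": {
      "type": "realtime",
      "firehose": {
        "type": "receiver",
        "serviceName": "eventReceiverServiceName",
        "bufferSize": 10000
      }
    },
    "tuningConfig": {
      "type": "realtime",
      "windowPeriod": "PT10M",
      "rejectionPolicy": { "type": "none" }
    }
  }
}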

FYI if you want to get the port of the task, you can use service discovery to find it. The service key you provide in the task json (in your case, “eventReceiverServiceName”) will be announced in service discovery. You can use curator-x-discovery to get it, or if you want to do it manually, list out the ZooKeeper znodes under /druid/discovery/eventReceiverServiceName and read their data. There should only be one, and it should have json data in it describing the task’s service.
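
If you go the manual ZooKeeper route, the znode data is written by curator-x-discovery, so it is JSON that looks roughly like the following (exact fields can vary a bit by Curator version; the address and port are the parts you need):

{
  "name": "eventReceiverServiceName",
  "id": "some-generated-id",
  "address": "10.0.0.1",
  "port": 8100,
  "payload": null,
  "serviceType": "DYNAMIC"
}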

Thanks again, Gian. It’s becoming clear now.

One more question: how can I recover realtime data after stopping the realtime indexing workers? The data wasn’t pushed to deep storage (apparently the period was too short), but I have some segment files under /tmp/druid/task//work/persist////*

Can I make Druid use them again? Is it possible either to restart the same tasks or to make new ones use these files?

Can I be sure that there are no lost events that didn’t get persisted (especially if I stop a task with SIGTERM)?

I mean, does it use some kind of journal to avoid that sort of loss?

Hi Eugene,

There isn’t a simple way to make new tasks use those files. In general, you should avoid stopping tasks until they have actually handed off their data. Tranquility handles this through timed shutdown and task rollover. You could use a similar strategy on your own, without tranquility (look into the TimedShutoffFirehose). Another possibly useful strategy from tranquility is to have replicated tasks (N tasks that you send the same data to). Combining these strategies gives you a best effort system that many folks find acceptable in production.
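
For example, a timed-shutoff wrapper around the receiver firehose would look roughly like the following sketch; the shutoffTime value here is just an illustration, and it’s worth double-checking the exact field names for your version:

"firehose": {
  "type": "timed",
  "shutoffTime": "2015-03-01T12:00:00.000Z",
  "delegate": {
    "type": "receiver",
    "serviceName": "eventReceiverServiceName",
    "bufferSize": 10000
  }
}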

But, if you want to totally ensure data integrity (e.g. in the case where data is outside of windowPeriod or in the case where all replicated tasks die) your best bet is to run some flavor of lambda architecture, where you send a copy of your data to storage like HDFS or S3 and then index it in batches. Data indexed in batch will automatically replace realtime data for the same time period, which makes it easier to build that sort of architecture. Fwiw, this is the approach we take in production (best-effort realtime through tranquility + batch jobs to pick up any late data).
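
The batch side of that is just a normal Hadoop indexing task over the same dataSource and interval, roughly like the sketch below. The paths and intervals are placeholders, and the parser and metricsSpec (omitted here) should match the realtime task; the intervals in the granularitySpec determine which realtime segments get replaced once the batch segments are loaded:

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "my_datasource",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "intervals": ["2015-02-28T00:00:00Z/2015-03-01T00:00:00Z"]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "hdfs://namenode/events/2015-02-28/*"
      }
    },
    "tuningConfig": { "type": "hadoop" }
  }
}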

Gian, please tell me more about implementing a lambda architecture. How can I be sure that realtime index tasks don’t replace the segments created by the Hadoop index task when they hand off their own?
How can I make the historical node load the newly generated segment before shutting down the realtime task that holds the same data?

The first part (realtime segments not replacing batch segments) is done today through the windowPeriod config for realtime. Generally you’d set the realtime windowPeriod to something like 30 minutes, then delay your batch jobs by a couple of hours. If you’re doing all your indexing through the indexing service, it has locking built in that will ensure the jobs don’t overlap.

The second part (realtime segments staying around until historicals load them) is done automatically as part of the handoff process. Realtime workers monitor the historical nodes to figure out when it’s okay to stop serving their segments.

Hi Gian. Thanks for answers!

Honestly, I haven’t understood at which moment the realtime index task puts a segment into deep storage, and how that relates to windowPeriod (according to the documentation, it just filters out events that fall outside this window).

My question is about how to synchronize the realtime indexing task’s handoff with the batch indexing task, so that the batch segment’s version ends up newer than the realtime segment’s.

I was thinking along the lines of watching the segment info in the metadata storage (ZooKeeper probably contains the necessary info too) to catch the right moment to launch the batch task.

Hi Eugene,

The only configs that are really specific to handoff are the segmentGranularity, windowPeriod, and rejectionPolicy; windowPeriod is the amount of lag time to allow for late events.

E.g. if you have set segmentGranularity to hour, windowPeriod to 10 minutes, and rejectionPolicy to serverTime, then handoff will occur after segmentGranularity + windowPeriod, i.e. after 1 hour and 10 minutes.

http://druid.io/docs/latest/Plumber.html also has some docs about different rejectionPolicies.
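
To put those configs in context, in a 0.7 task spec they sit roughly here (other fields omitted, values taken from the example above):

"dataSchema": {
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "HOUR",
    "queryGranularity": "NONE"
  }
},
"tuningConfig": {
  "type": "realtime",
  "windowPeriod": "PT10M",
  "rejectionPolicy": { "type": "serverTime" }
}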

Thanks, Nishant. I am surprised I managed to miss that chapter! :)
It’s a little bit confusing, though: some ingestion specs use a plumber, while others use a tuningConfig.

Hi Eugene, if you see an inconsistency anywhere in the docs, please let us know right away. We are constantly trying to clean them up.

Hi Eugene, the plumber docs are out of date; they applied to 0.6.x.

In 0.7.x, the configuration goes in the tuningConfig instead, and you should be able to follow http://druid.io/docs/latest/Realtime-ingestion.html
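
In other words, the settings the old plumber docs describe now live in the task’s tuningConfig, the same kind of block Nishant showed above; a rough sketch, with example values only:

"tuningConfig": {
  "type": "realtime",
  "maxRowsInMemory": 500000,
  "intermediatePersistPeriod": "PT10M",
  "windowPeriod": "PT10M",
  "basePersistDirectory": "/tmp/realtime/basePersist",
  "rejectionPolicy": { "type": "serverTime" }
}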