Loss of data on realtime node reboot

I have a Druid 0.8.2 cluster with one realtime node ingesting data through a Kafka firehose.
Data arrive at regular intervals of 10 minutes. The following is a massaged output of the command:

kafka-consumer-offset-checker.sh --zookeeper zkservers_list --group ssol --topic SSOL.skc

where the first field is the command execution timestamp, the second is the consumer Offset, the third is the logSize, and the last is the Lag.
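
For context, a timestamped trace like the one below can be produced with a small polling loop such as this minimal sketch (it assumes kafka-consumer-offset-checker.sh is on the PATH; the awk column numbers are my assumption about the checker's output layout):

#!/bin/bash
# Poll the consumer offset once a minute and print one line per check:
# timestamp|offset|logSize|lag
while true; do
  TS=$(date +%Y-%m-%d-%H:%M)
  kafka-consumer-offset-checker.sh \
      --zookeeper zkservers_list --group ssol --topic SSOL.skc \
    | awk -v ts="$TS" 'NR > 1 {print ts "|" $4 "|" $5 "|" $6}'
  sleep 60
done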

2016-02-04-11:43|3909986|3909986|0

2016-02-04-11:44|3909986|3909986|0

2016-02-04-11:45|3909986|3909986|0

2016-02-04-11:46|3909986|3909986|0

2016-02-04-11:47|3909986|3909986|0

2016-02-04-11:48|3909986|3909986|0

2016-02-04-11:49|3909986|3909986|0

2016-02-04-11:50|3909986|3909986|0

2016-02-04-11:51|3909986|3909986|0

2016-02-04-11:52|3909986|3909986|0

2016-02-04-11:53|3913584|3913584|0 <-- data arrived and were consumed

2016-02-04-11:54|3913584|3913584|0

2016-02-04-11:55|3913584|3913584|0

2016-02-04-11:56|3913584|3913584|0

2016-02-04-11:57|3913584|3913584|0

2016-02-04-11:58|3913584|3913584|0

2016-02-04-11:59|3913584|3913584|0

2016-02-04-12:00|3913584|3913584|0

2016-02-04-12:01|3913584|3913584|0

2016-02-04-12:02|3913584|3913584|0

2016-02-04-12:03|3917281|3917281|0 <-- data arrived and were consumed

2016-02-04-12:04|3917281|3917281|0

2016-02-04-12:05|3917281|3917281|0

2016-02-04-12:06|3917281|3917281|0

2016-02-04-12:07|3917281|3917281|0

2016-02-04-12:08|3917281|3917281|0

2016-02-04-12:09|3917281|3917281|0

2016-02-04-12:10|3917281|3917281|0

2016-02-04-12:11|3917281|3917281|0

2016-02-04-12:12|3917281|3917281|0

2016-02-04-12:13|3920791|3920791|0 <-- data arrived and were consumed

2016-02-04-12:14|3920791|3920791|0

2016-02-04-12:15|3920791|3920791|0

2016-02-04-12:16|3920791|3920791|0

2016-02-04-12:17|3920791|3920791|0

2016-02-04-12:18|3920791|3920791|0

2016-02-04-12:19|3920791|3920791|0

2016-02-04-12:20|3920791|3920791|0

2016-02-04-12:21|3920791|3920791|0

2016-02-04-12:22|3920791|3920791|0

2016-02-04-12:23|3923853|3923853|0 <-- data arrived and were consumed

2016-02-04-12:24|3923853|3923853|0

2016-02-04-12:25|3923853|3923853|0

2016-02-04-12:26|3923853|3923853|0

2016-02-04-12:27|3923853|3923853|0

2016-02-04-12:28|3923853|3923853|0

2016-02-04-12:29|3923853|3923853|0

2016-02-04-12:30|3923853|3923853|0

2016-02-04-12:31|3923853|3923853|0

2016-02-04-12:32|3923853|3923853|0

2016-02-04-12:33|3927422|3927422|0 <-- data arrived and were consumed

2016-02-04-12:34|3927422|3927422|0

2016-02-04-12:35|3927422|3927422|0

2016-02-04-12:36|3927422|3927422|0

2016-02-04-12:37|3927422|3927422|0

2016-02-04-12:38|3927422|3927422|0

2016-02-04-12:39|3927422|3927422|0

The realtime node firehose is configured as follows:

"firehose": {
    "type": "kafka-0.8",
    "consumerProps": {
        "zookeeper.connect": "192.168.129.87:2181,192.168.129.89:2181,192.168.129.93:2181,192.168.129.94:2181,192.168.129.91:2181",
        "zookeeper.connection.timeout.ms": "30000",
        "zookeeper.session.timeout.ms": "30000",
        "zookeeper.sync.time.ms": "5000",
        "group.id": "ssol",
        "fetch.message.max.bytes": "1048586",
        "auto.offset.reset": "smallest",
        "auto.commit.enable": "true",
        "auto.commit.interval.ms": "60000"
    },
    "feed": "SSOL.skc"
}

Moreover, we have:

"tuningConfig": {
    "type": "realtime",
    "maxRowsInMemory": 500000,
    "intermediatePersistPeriod": "PT1m",
    "windowPeriod": "PT180m",
    "basePersistDirectory": "/data/tmp/druid/realtime",
    "rejectionPolicy": {
        "type": "serverTime"
    }
}

Since the intermediatePersistPeriod is 1 minute, I would expect that if I shut down the realtime node in the middle of the data-feeding interval (say, five minutes after data arrived) and then reboot it, I would not experience any loss of data (note also that the consumer offset is committed to ZooKeeper every 60 seconds).

This is not what happens.

In the attached Pivot snapshots you can see the difference between querying a datasource ingested via the realtime node that was restarted (TEST_BED) and another datasource served by an undisturbed realtime node (OPS), which receives a copy of the same data through another Kafka topic (and another consumer group).

This is the massaged kafka-consumer-offset-checker.sh output for the time interval of this use case:

2016-02-04-14:00|3956197|3956197|0

2016-02-04-14:01|3956197|3956197|0

2016-02-04-14:02|3956197|3956197|0

2016-02-04-14:03|3959690|3959690|0 <-- data arrived and were consumed

2016-02-04-14:04|3959690|3959690|0

2016-02-04-14:05|3959690|3959690|0

2016-02-04-14:06|3959690|3959690|0

2016-02-04-14:07|3959690|3959690|0 <-- realtime node shut down

2016-02-04-14:08|3959690|3959690|0

2016-02-04-14:09|3959690|3959690|0

2016-02-04-14:10|3959690|3959690|0

2016-02-04-14:11|3959690|3959690|0

2016-02-04-14:12|3959690|3959690|0

2016-02-04-14:13|3959690|3963533|3843 <-- data arrived and were NOT consumed

2016-02-04-14:14|3959690|3963533|3843

2016-02-04-14:15|3959690|3963533|3843

2016-02-04-14:16|3959690|3963533|3843

2016-02-04-14:17|3959690|3963533|3843

2016-02-04-14:18|3959690|3963533|3843

2016-02-04-14:19|3959690|3963533|3843

2016-02-04-14:20|3963533|3963533|0 <-- realtime node booted; data were then consumed

2016-02-04-14:21|3963533|3963533|0

2016-02-04-14:22|3963533|3963533|0

2016-02-04-14:23|3966954|3966954|0 <-- new data arrived and were consumed

2016-02-04-14:24|3966954|3966954|0

2016-02-04-14:25|3966954|3966954|0

2016-02-04-14:26|3966954|3966954|0

2016-02-04-14:27|3966954|3966954|0

2016-02-04-14:28|3966954|3966954|0

Is this behavior you would expect?

Is there a graceful way to shut down a realtime node?

How can I minimize the risk of data loss on realtime node hardware failures?

Thanks,

Marco

Realtime nodes should not lose data if you have a message bus such as Kafka. It’ll be a gradual recovery instead.

There’s an entire talk on the recovery process here: https://www.youtube.com/watch?v=Dlqj34l2upk

I have carefully watched the presentation from Eric you suggested, and I would say that my example above is exactly the case Eric refers to as "losing the process".
Moreover, message committing and replay of messages are just what I expected the realtime node to do in conjunction with Kafka (which relies on a commit log and consumer offsets).
But this does not seem to work in my use case above.
I am going to investigate further to find more information that could be useful.
Thanks again,
Marco

Hi Fangjin,

I have some more information. It seems that, as soon as the realtime node is rebooted after a shutdown, it consumes messages from Kafka, but it does not persist them to the intermediate persistent storage. The persist operation happens only at the next data consumption, and the previously fetched data are not present (nor has roll-up been applied to them).

This is how I reached the above conclusion; the history of operations is as follows (the trace shows the Kafka consumer offset as returned by the Kafka utility; as said previously, new data arrive every ten minutes):

Id|Timestamp       |Offset |LogSize|Lag
--|----------------|-------|-------|---

01|2016-02-10-10:07|6437853|6437853|0

02|2016-02-10-10:08|6437853|6437853|0

03|2016-02-10-10:09|6437853|6437853|0

04|2016-02-10-10:10|6437853|6437853|0

05|2016-02-10-10:11|6437853|6437853|0

06|2016-02-10-10:12|6437853|6437853|0

07|2016-02-10-10:13|6441045|6441045|0 <- New data arrived, and were consumed

08|2016-02-10-10:14|6441045|6441045|0

09|2016-02-10-10:15|6441045|6441045|0

10|2016-02-10-10:16|6441045|6441045|0

11|2016-02-10-10:17|6441045|6441045|0 <- Realtime node process killed (-15)

12|2016-02-10-10:18|6441045|6441045|0

13|2016-02-10-10:19|6441045|6441045|0

14|2016-02-10-10:20|6441045|6441045|0

15|2016-02-10-10:21|6441045|6441045|0

16|2016-02-10-10:22|6441045|6441045|0

17|2016-02-10-10:23|6441045|6444673|3628 <- New data arrived, and were not consumed

18|2016-02-10-10:24|6441045|6444673|3628

19|2016-02-10-10:25|6441045|6444673|3628 <- Realtime node process started

20|2016-02-10-10:26|6441045|6444673|3628

21|2016-02-10-10:27|6444673|6444673|0 <- Data were consumed by realtime process

22|2016-02-10-10:28|6444673|6444673|0

23|2016-02-10-10:29|6444673|6444673|0

24|2016-02-10-10:30|6444673|6444673|0

25|2016-02-10-10:31|6444673|6444673|0

26|2016-02-10-10:32|6444673|6444673|0

27|2016-02-10-10:33|6448164|6448164|0 <- New data arrived, and were consumed

28|2016-02-10-10:34|6448164|6448164|0

29|2016-02-10-10:35|6448164|6448164|0

30|2016-02-10-10:36|6448164|6448164|0

I took a snapshot (via the 'ls -larR' command) of the contents of the intermediate persistent storage at the following times (a sketch of how such snapshots can be captured and compared is shown below):

  • before the realtime process was shut down (at the time of the event with ID 10 above);
  • after the realtime process was started again and consumed the data (at the event with ID 24);
  • after new data arrived and were consumed (at the event with ID 29).

There is no trace of changes in the intermediate persistent storage until the last snapshot. As time goes on and new data are fetched, the changes in that storage become the same as those applied by a companion (undisturbed) realtime node working on another datasource but fetching the same data (on another topic) from Kafka.
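
For reference, this is a minimal sketch of how the snapshots can be captured and compared (the persist directory matches the basePersistDirectory above; the snapshot directory and file names are hypothetical):

#!/bin/bash
# Capture a recursive listing of the intermediate persist directory
# and diff it against the previous snapshot.
PERSIST_DIR=/data/tmp/druid/realtime
SNAP_DIR=/tmp/persist-snapshots          # hypothetical location
mkdir -p "$SNAP_DIR"

NEW="$SNAP_DIR/snap-$(date +%Y%m%d-%H%M%S).txt"
ls -laR "$PERSIST_DIR" > "$NEW"

# Compare with the most recent previous snapshot, if any.
PREV=$(ls -1t "$SNAP_DIR"/snap-*.txt 2>/dev/null | sed -n 2p)
if [ -n "$PREV" ]; then
  diff -u "$PREV" "$NEW" || true
fi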

Attached you can find the intermediate persistent storage snapshots I talked about above, together with the one from the undisturbed realtime node.

This behavior concerns me a lot. Could you help me understand whether this could be a configuration problem or something similar?

Thanks,

Marco

ev10.preSHUT (16 KB)

ev10.preSHUT (16 KB)

ev29.afterBOOT_NEWDATA (16.3 KB)

rtnode_UNDISTURBED (17.4 KB)

On each persist, an RT node commits the offset of the last message it persisted back to Kafka. If you lose the process and start it up again, the RT node loses the data it has in memory and needs to reread that data from Kafka starting from the last offset it committed.

For configuration, I’d check that "auto.offset.reset": "largest" is set, not "smallest".
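
For reference, that corresponds to changing this entry in the consumerProps of the firehose spec above (excerpt only):

"consumerProps": {
    ...
    "auto.offset.reset": "largest",
    ...
}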

FWIW, it might actually be easier to use a stream-push model as described in the 0.9.0 docs, as you’ll be able to replicate streaming ingestion and get HA much more easily.

I ran the test using auto.offset.reset: largest, but the behavior does not change; data are lost as well.
Moreover, it seems that it skips the first time it should persist data to the intermediate storage, namely:

2016-02-12-10:39|7331936|7331936|0
2016-02-12-10:40|7331936|7331936|0
2016-02-12-10:41|7331936|7331936|0
2016-02-12-10:42|7331936|7331936|0 <- realtime process shut down
2016-02-12-10:43|7331936|7335533|3597
2016-02-12-10:44|7331936|7335533|3597
2016-02-12-10:45|7331936|7335533|3597
2016-02-12-10:46|7331936|7335533|3597
2016-02-12-10:47|7331936|7335533|3597
2016-02-12-10:48|7331936|7335533|3597
2016-02-12-10:49|7331936|7335533|3597
2016-02-12-10:50|7331936|7335533|3597 <- realtime process booted back
2016-02-12-10:51|7335533|7335533|0 <- It does not persist here, after consuming data (as when auto.offset.reset was smallest) …
2016-02-12-10:52|7335533|7335533|0
2016-02-12-10:53|7339337|7339337|0 <- … but here, after new data have arrived …
2016-02-12-10:54|7339337|7339337|0
2016-02-12-10:55|7339337|7339337|0

In any case, I will save the input data for a later index restore via ETL indexing in case of realtime node failures, and in the near future I will work on setting up a push-mode pipeline.
Thanks for your help,
Marco

Marco, recovery takes some time; it isn’t immediate.

Please submit a unit test reproducing the failure. Given the volume of community support questions, it is extremely difficult for us to dig into issues without a unit test that showcases the behavior.

Hi Fangjin,
apologies for the delay in replying.
It should be easy to reproduce the problem with the material I am attaching.
I hope it will not be too cumbersome to use.
This is the content of the unittest.tgz file:

./unittest/runtime.properties
./unittest/common.runtime.properties
./unittest/realtime-run.sh
./unittest/kafka.soj.spec
./unittest/toload.template
./unittest/ut_inject.sh
./unittest/crontab.settings
./unittest/config.yaml.9099

The ut_inject.sh script is a simple shell script that, using the toload.template file, produces three files (a minimal sketch of such a script is shown below):

  • the first one, in which TIMESTAMP is substituted with the current time;
  • the second one, in which TIMESTAMP is substituted with the current time minus five minutes;
  • the third one, in which TIMESTAMP is substituted with the current time minus ten minutes.

All three files are then fed to a Kafka topic by calling the kafkacat utility (https://github.com/edenhill/kafkacat). The process is repeated every five minutes, as can be seen from the crontab.settings file.
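
For illustration only, a minimal sketch of what such an injector might look like (the variable names match those mentioned below, but the directory and endpoint values are examples, GNU date is assumed, and the real script's template handling and kafkacat options may differ):

#!/bin/bash
# Substitute TIMESTAMP in the template three times (now, now-5m, now-10m)
# and push the resulting lines to Kafka with kafkacat.
SPLDIR=/path/to/unittest            # where toload.template lives (example)
TMPDIR=/tmp/ut_inject               # scratch directory (example)
KAFKA_BROKER_HOST=192.168.129.87    # example broker
KAFKA_BROKER_PORT=9092
KAFKA_TOPIC=SSOL.skc

mkdir -p "$TMPDIR"

for OFFSET_MIN in 0 5 10; do
  TS=$(date -u -d "-${OFFSET_MIN} minutes" +%Y-%m-%dT%H:%M:%SZ)
  OUT="$TMPDIR/toload.$OFFSET_MIN"
  # Replace every occurrence of the TIMESTAMP placeholder with the computed time.
  sed "s/TIMESTAMP/$TS/g" "$SPLDIR/toload.template" > "$OUT"
done

# Feed all three files to the Kafka topic, one message per line.
cat "$TMPDIR"/toload.0 "$TMPDIR"/toload.5 "$TMPDIR"/toload.10 \
  | kafkacat -P -b "$KAFKA_BROKER_HOST:$KAFKA_BROKER_PORT" -t "$KAFKA_TOPIC"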

To use the above material you should/could:

  1. edit the crontab.settings file to fix the directories as you see fit;
  2. edit the ut_inject.sh script to:
     • fix the directories (SPLDIR and TMPDIR variables) as you see fit;
     • fix the Kafka broker endpoint by modifying the variables KAFKA_BROKER_HOST and KAFKA_BROKER_PORT;
     • fix the Kafka topic name, if you want, by modifying the variable KAFKA_TOPIC;
  3. edit the realtime node Kafka firehose specification file (kafka.soj.spec) by modifying:
     • the "dataSource" value, if you want;
     • the consumer properties "zookeeper.connect" endpoints;
     • the consumer properties "group.id" consumer group identifier, if you want;
     • the firehose "feed" (if you changed the name of the Kafka topic);
     • the tuning config "basePersistDirectory" directory.

The realtime-run.sh script shows how we start the Druid realtime node, whilst the runtime.properties file contains its properties. Also attached are the properties common to all Druid nodes (common.runtime.properties).

The config.yaml.9099 file is the configuration file used by our installation of Pivot.

I modified the crontab file of my testing machine with the crontab.settings contents at about 13:00 CET.
After the 13:50 execution I verified that the consumer offset lag had become zero (see the kafkaConsumerOffset utility snapshot attached),
then at 13:53:48 I killed the realtime node with:

kill -15 pid

At 13:54:24 I started the realtime node again (see the realtime-run.sh file for parameters).
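
For completeness, the kill/restart sequence looked roughly like this (the pid lookup pattern is an illustrative assumption; the actual start command is in realtime-run.sh):

#!/bin/bash
# Find the realtime node process, send SIGTERM, then start it again.
PID=$(pgrep -f 'io.druid.cli.Main server realtime')   # assumed process pattern
kill -15 "$PID"

# Wait for the process to exit before restarting.
while kill -0 "$PID" 2>/dev/null; do sleep 1; done

./realtime-run.sh &    # restart with the same parameters as before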

Some time later, running Pivot, I could again see the loss of data (see the Pivot snapshot attached).
Hoping this will be useful, best,
Marco

unittest.tgz (24.3 KB)

Is it possible to submit this as a PR to https://github.com/druid-io/druid?

Hi Fangjin,
I have to admit that I do not know what "PR" means or how to submit one.
I would be happy to learn from you.
Best,
Marco

PR == pull request on github

I see. :slight_smile:
Should I open the pull request against the "0.8.2" branch (since I am using that version of Druid), or against the master branch?
Thanks,
Marco

Hey Marco,

Please do master, as all new development occurs there.

You might find this doc helpful: https://github.com/druid-io/druid/blob/master/CONTRIBUTING.md

Hi all,
issue filed as:

Best,
Marco