**URGENT** Realtime Kafka ingestion double counting

Hi,

I have a big issue with real-time ingestion of data coming from Kafka (kafka_2.10-0.8.2.1).

Compared to a simple Kafka consumer, the counts in Druid are higher: using a select query, some rows have metrics with double or triple the expected value.

We are running Druid 0.9.1.1 and I have a couple of Realtime nodes doing the same job. They manage 8 datasources, and each spec file contains this section.

As suggested some time ago, each Realtime node has a different group.id.

The hourly throughput is around 22-25 million rows.

Can you please help me investigate what the issue is and how to solve it?

```json
"granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "hour",
    "queryGranularity": "minute"
  }
},
"ioConfig": {
  "type": "realtime",
  "firehose": {
    "type": "kafka-0.8",
    "consumerProps": {
      "zookeeper.connect": "10.80.4.69:2181,10.80.4.6:2181,10.80.4.56:2181",
      "zookeeper.connection.timeout.ms": "15000",
      "zookeeper.session.timeout.ms": "15000",
      "zookeeper.sync.time.ms": "5000",
      "group.id": "druidaws6",
      "fetch.message.max.bytes": "1048586",
      "auto.offset.reset": "largest",
      "auto.commit.enable": "false"
    },
    "feed": "buck_bidding"
  },
  "plumber": {
    "type": "realtime"
  }
},
"tuningConfig": {
  "type": "realtime",
  "maxRowsInMemory": 600000,
  "intermediatePersistPeriod": "PT1h",
  "windowPeriod": "PT1h",
  "basePersistDirectory": "/usr/local/dataStorage",
  "rejectionPolicy": {
    "type": "serverTime"
  },
  "shardSpec": {
    "type": "linear",
    "partitionNum": 0
  }
}
},
```
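For reference, a per-hour count on the Druid side can be obtained with a timeseries query along these lines. This is only a sketch: the interval is illustrative, and the longSum aggregator assumes a `count` metric was defined at ingestion time so that rolled-up rows can be expanded back into raw event counts (the number I compare against the plain Kafka consumer).

```json
{
  "queryType": "timeseries",
  "dataSource": "buck_bidding",
  "granularity": "hour",
  "intervals": ["2017-03-09T00:00:00.000Z/2017-03-09T12:00:00.000Z"],
  "aggregations": [
    { "type": "count", "name": "druid_rows" },
    { "type": "longSum", "name": "ingested_events", "fieldName": "count" }
  ]
}
```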

Thanks,

Maurizio

Do the different realtime nodes have different group.ids but write into the same datasource? If so, that would cause what you're seeing: they'd essentially be loading the same data multiple times.

Hi Gian,
thanks for your feedback.

Yes, two Realtime nodes are doing the same job; the purpose is to have a fault-tolerant system. The datasources are the same and druid.storage.storageDirectory is the same for both nodes.

So, in your opinion, is it better to have just one Realtime node ingesting a topic and writing into a datasource (possibly with different Realtime nodes managing different topics/datasources)?

A couple of additional questions:

  • What is the expected behavior with two Realtime nodes doing the same job? Each one collects data hourly, prepares a segment, announces it to the Coordinator, and copies it into the storageDirectory. Will the second copy overwrite the first? Will this be mitigated by the Coordinator in some way? (I can check which segments exist for a problem hour with the segmentMetadata sketch after this list.)

  • My suspicion is that the issue is more related to communication with Kafka and how consumer offsets are committed, because I do not see exactly double the count but somewhat more (a few thousand extra rows), so on some occasions the same row in Kafka is read more than once by the Realtime node. So the question is: how does the Realtime node mark rows as already read? Could this be related to issues with ZooKeeper? Is there any log in the Realtime node that would let me verify it?
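To check the first point, a segmentMetadata query over a problem hour should at least list which segments are being served for that interval. This is only a sketch; the interval and datasource below are just examples from my setup:

```json
{
  "queryType": "segmentMetadata",
  "dataSource": "buck_bidding",
  "intervals": ["2017-03-09T04:00:00.000Z/2017-03-09T05:00:00.000Z"],
  "merge": false
}
```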

Thanks,

Maurizio

As additional info, looking through all the Realtime logs I can see these warnings:
```
2017-03-09 04:03:22,908 WARN o.a.c.r.ExponentialBackoffRetry [main] maxRetries too large (30). Pinning to 29
2017-03-09 04:12:02,889 WARN o.a.z.ClientCnxn [chief-druid_monitor[0]-SendThread(10.80.4.69:2181)] Client session timed out, have not heard from server in 11889ms for sessionid 0x45a84803157023d
2017-03-09 04:12:02,889 WARN o.a.z.ClientCnxn [chief-buck_opt_campaign[0]-SendThread(10.80.4.6:2181)] Client session timed out, have not heard from server in 11894ms for sessionid 0x65a848031810224
2017-03-09 04:12:02,889 WARN o.a.z.ClientCnxn [chief-buck_audiences[0]-SendThread(10.80.4.6:2181)] Client session timed out, have not heard from server in 11902ms for sessionid 0x65a848031810225
2017-03-09 04:12:02,889 WARN o.a.z.ClientCnxn [chief-buck_apps[0]-SendThread(10.80.4.6:2181)] Client session timed out, have not heard from server in 11902ms for sessionid 0x65a848031810222
2017-03-09 04:12:02,889 WARN o.a.z.ClientCnxn [chief-buck_rtb[0]-SendThread(10.80.4.6:2181)] Client session timed out, have not heard from server in 12710ms for sessionid 0x65a848031810223
2017-03-09 04:12:02,889 WARN o.a.z.ClientCnxn [chief-buck_bidding[0]-SendThread(10.80.4.56:2181)] Client session timed out, have not heard from server in 13250ms for sessionid 0x55a84804f55025a
2017-03-09 04:12:02,889 WARN o.a.z.ClientCnxn [chief-buck_tracking[0]-SendThread(10.80.4.56:2181)] Client session timed out, have not heard from server in 14143ms for sessionid 0x55a84804f550259
2017-03-09 04:12:02,889 WARN o.a.z.ClientCnxn [chief-buck_retargeting[0]-SendThread(10.80.4.69:2181)] Client session timed out, have not heard from server in 11894ms for sessionid 0x45a84803157023c
```

Could these mean something?
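In case it matters, those timeouts are close to the 15-second limit in my consumerProps, so one thing I'm considering (just a guess on my side, the 30000 values are arbitrary) is raising the ZooKeeper timeouts there, for example:

```json
"consumerProps": {
  "zookeeper.connect": "10.80.4.69:2181,10.80.4.6:2181,10.80.4.56:2181",
  "zookeeper.connection.timeout.ms": "30000",
  "zookeeper.session.timeout.ms": "30000",
  "zookeeper.sync.time.ms": "5000",
  "group.id": "druidaws6",
  "fetch.message.max.bytes": "1048586",
  "auto.offset.reset": "largest",
  "auto.commit.enable": "false"
}
```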

Thanks

Maurizio

Hi Gian,
here's an update about the tests done today.

I've run only one Realtime node with the same configuration (can the linear shardSpec create issues when running with only one instance?).

After that, one hour of data was ingested perfectly, without any discrepancy; unfortunately, the next hour shows the issue again: 20819745 rows (Druid) against 20465165 (Kafka).

Nothing was changed during this time… I really have no idea why it’s happening.

Any feedback is really appreciated

Thanks

Maurizio

Hey Maurizio,

Sorry, I haven’t had time to go over your issue in detail, but I will say that Druid Realtime Nodes with any firehose (including Kafka) do not generally guarantee transactional behavior, exactly once ingestion, or any correctness beyond best effort. There are various things you can do to make realtime nodes “less wrong” but they do have various design flaws that will lead them to be wrong from time to time, especially surrounding service restart, interruption, or scale up/down.

This is part of the reason we developed the Druid Kafka indexing service, which does guarantee exactly once ingestion: http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html

If you need stronger guarantees I would encourage checking that out.
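If it helps, a supervisor spec for the Kafka indexing service looks roughly like the sketch below. This is only an outline: the parser, dimensions, and metrics should mirror what you already have in your realtime spec, the broker addresses and timestamp column are placeholders, and the exact fields should be double-checked against the linked docs.

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "buck_bidding",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "auto" },
        "dimensionsSpec": { "dimensions": [] }
      }
    },
    "metricsSpec": [
      { "type": "count", "name": "count" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "hour",
      "queryGranularity": "minute"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 600000
  },
  "ioConfig": {
    "topic": "buck_bidding",
    "consumerProperties": {
      "bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092"
    },
    "taskCount": 1,
    "replicas": 2,
    "taskDuration": "PT1H"
  }
}
```

Roughly speaking, you POST that to the Overlord at /druid/indexer/v1/supervisor and it manages the Kafka indexing tasks on the MiddleManagers from there; "replicas" gives you the redundancy you were after without double counting, since replica tasks read the same offsets.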

Hi Gian,
thanks for your feedback.

For now, I was able to fix the issue by splitting the datasources between multiple Realtime nodes; I've also increased the Java heap space and changed the GC strategy.

About the Kafka indexing service: do you consider it a production-ready solution?

I'm very concerned about how to move a production environment with millions of rows from the Realtime/Kafka solution to the Kafka indexing service; there are so many differences:

  • Kafka needs to be upgraded to 0.9 or 0.10

  • Realtime nodes must be turned off and additional MiddleManagers must be created

How should transient data be managed during the migration?

Do you have any documentation about a rollout plan for this kind of migration? I think it would be very useful for many of us.

Thanks,

Maurizio