Tranquility Kafka - multiple instances - duplicate messages

Hi Druids! :wink:

I’m using Druid 0.9.0 with Tranquility Kafka 0.8.1. The data is being ingested from a topic that has only one partition.

I have two machines. Each machine runs an instance of each of these Druid nodes/apps:

  • coordinator
  • overlord
  • historical
  • middleManager
  • broker
  • tranquility kafka

I know this is not the ideal setup in terms of HA, but I think this problem is not related to that setup. Also, every instance runs inside its own Docker container.

As you can see above, I’m running two instances of Tranquility Kafka with exactly the same config: kafka.json

When all Druid and Tranquility instances are running, everything works great!

I have enough worker capacity on the Indexing service, so everything is ingested in real time. In production, about 10K msg/s are produced to the Kafka topic.

THE PROBLEM

Here are the steps I do to reproduce the problem:

Scenario 1

  • I stop one Tranquility instance - with docker stop.

  • For some time (10-20 seconds) no data is ingested into Druid.

  • After that, ingestion resumes, but duplicate messages are ingested. Instead of 10000 in the "totalCount" metric - matching the 10000 messages I produced to the configured Kafka topic - I get 10569.

  • After the period when duplicates occur (10-20 seconds), and with one Tranquility still down, the cluster continues to operate normally - no duplicates occur.

  • Since I’m constantly producing batches of 10K messages, each with a different value of the "campaignId" dimension and with the "countDelta" metric set to 1, I can check at any time whether duplicates occur (a rough producer sketch follows this list).

  • I start the stopped Tranquility instance while messages are being produced to Kafka - duplicates occur! I get 10578 in "totalCount".

  • After the period when duplicates occur (10-20 seconds), and with both Tranquility instances up and running, the cluster continues to operate normally - no duplicates occur.
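
For reference, here is roughly what the test producer does - a minimal sketch, where the broker address, topic name, and exact JSON field layout are placeholders standing in for my real setup:

```java
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DuplicateCheckProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 10,000 messages, each with a unique campaignId and countDelta = 1,
            // so "totalCount" in Druid should end up at exactly 10000.
            for (int i = 0; i < 10000; i++) {
                String campaignId = UUID.randomUUID().toString();
                String json = String.format(
                        "{\"timestamp\":\"%s\",\"campaignId\":\"%s\",\"countDelta\":1}",
                        java.time.Instant.now(), campaignId);
                producer.send(new ProducerRecord<>("my-topic", campaignId, json));
            }
            producer.flush(); // make sure everything is actually sent before exiting
        }
    }
}
```

Any "totalCount" above 10000 for a batch means duplicates were ingested.
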
Scenario 2

  • Similar to Scenario 1, but instead of stopping one Tranquility instance, I stop both Tranquility instances.

  • Ingestion stops - no data comes to Druid.

  • I start only one instance (the first one I stopped) while messages are being produced to Kafka - duplicates occur (11542 in "totalCount").

  • After the period when duplicates occur (10-20 seconds), and with one Tranquility still down, the cluster continues to operate normally - no duplicates occur.

  • I start the stopped Tranquility instance while messages are being produced to Kafka - duplicates occur! I get 10683 in "totalCount".

Scenario 3

  • I thought it could be something wrong with the way Docker stops the Tranquility Kafka service (docker stop), causing Tranquility Kafka not to commit offsets or something similar - i.e. I suspected the shutdown was not graceful.

  • I stopped the two Tranquility Kafka instances running in Docker containers and started two instances manually from the console.

  • The logs of the first started instance showed that ingestion was running (Flushing messages: receivedCount=7685, sentCount=7685, droppedCount=0, unparseableCount=0) and that it was committing offsets.

  • The second one was just logging: Flushed 0 pending messages in 0ms and committed offsets in 3ms.

  • I stopped the second one with CTRL-C; the logs are here: tranquility 2 shutdown.log

  • And still duplicates occurred! I got 10039 in "totalCount" - the smallest number of duplicates so far! :slight_smile:

Questions

What is the correct way to stop a running Tranquility Kafka instance in this cluster setup?

This is a big problem, because it means I can’t restart Tranquility Kafka to change its config (kafka.json) - for example, to add new dimensions and metrics - without a period of duplicated messages, since in production I have a constant stream from Kafka.

I’m just looking for a graceful way to do a rolling update of the Tranquility kafka.json config file. I don’t even want to imagine what would happen if the whole machine crashed! :wink:

In this scenario I didn’t break the rules that prevent Tranquility from duplicating messages: Stream push ingestion guarantees

The Tranquility Kafka documentation says:

Multiple instances of Tranquility Kafka may be deployed to provide scalability (through each instance owning a subset of the Kafka partitions) and redundancy (through all instances sharing the same Kafka group ID). If the configuration changes, instances can be brought down and updated one at a time to update the cluster without any downtime.

In the end, I hope I’m doing something wrong - that there’s a step I forgot, or some config that isn’t right for this cluster setup.

Thank you very much for your patience :slight_smile:

Davor Poldrugo

https://about.me/davor.poldrugo

Here are the Druid configs:

Hey Davor,

I was able to reproduce what you were seeing, and unfortunately I don’t think it’s your configuration. My guess right now is that when the 2nd Tranquility instance shuts down, the Kafka consumer rebalances to transfer all of the partitions to the 1st instance, which seeks all of its partitions (including the ones it previously owned) back to the last commit point, causing a re-read of the events read since the last commit.

I’ll look more into why this is happening and what can be done about it, but in the meantime, if you’re looking for exactly-once ingestion instead of at-least-once, you might be interested in the new Kafka indexing service that’ll be released in 0.9.1. There’s some documentation about it here: http://druid.io/docs/0.9.1-rc4/development/extensions-core/kafka-ingestion.html, and if you’re interested, you can try it out in 0.9.1-rc4. With the Kafka indexing service you won’t see issues like this, but do note that it’s still marked as experimental until it gets more use in production systems and any remaining bugs get shaken out.

I’ll post an update here when I learn more. Thanks for the detailed bug report!

Hey Davor,

I looked into the Kafka consumer a bit more, and it looks like it does in fact roll back to the last committed offset during a rebalance, which would cause duplication. If you do a Google search on ‘kafka duplicate rebalance’ you’ll see that duplicate events during rebalancing are a well-known issue, in particular when auto.commit.enable is set to false. In Tranquility, we disable automatic offset committing so that we only commit once the message has been delivered to Druid, to uphold the at-least-once delivery guarantee.
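
To make that window concrete, here is a rough sketch of the at-least-once pattern - written against the newer Java consumer API purely for illustration, not Tranquility’s actual code (Tranquility uses the older high-level consumer, and sendToDruid below is just a stand-in for its delivery path):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "tranquility-kafka");       // shared group id across instances
        props.put("enable.auto.commit", "false");         // commit manually, only after delivery
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    sendToDruid(record.value()); // hypothetical delivery step
                }
                // Offsets are committed only after delivery. If a rebalance happens
                // between delivering these records and this commit, the new owner of
                // the partition starts again from the last committed offset and
                // re-reads (duplicates) everything delivered since then.
                consumer.commitSync();
            }
        }
    }

    private static void sendToDruid(String message) {
        // stand-in for the real send-to-Druid path
    }
}
```

This keeps at-least-once semantics (nothing is lost), at the cost of re-delivery whenever ownership of a partition changes before the next commit.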

Previous guidance was to run a batch ingestion pipeline in parallel with the streaming pipeline to clean up these kinds of duplicate events, but as I mentioned in the previous post, the new Kafka indexing service may be interesting to you if real-time correctness is important.

Thank you David. I’ll give it a try.
This is the doc: http://druid.io/docs/0.9.1-rc4/development/extensions-core/kafka-ingestion.html

I didn’t find how you handle duplicates with the Kafka indexing service, because as far as I know, duplicates can occur even with Kafka 0.10.0. The recommended way is to save the partition/offset together with the data in an atomic way, as described in the Kafka docs (from the consumer’s point of view).

Hey Davor,

The Kafka indexing service follows the recommendation to save the partition/offset with the data atomically. What happens is that when the indexing task finishes reading and goes to publish the segment to the metadata store, it’ll commit the segment descriptors and a record of the last offsets read in the same transaction so that the finalized segments and the offset cursors stay in sync. We also use some other checks (making sure that messages received from Kafka have offsets following sequentially from one another and making sure that subsequent indexing tasks pick up from where the previous one left off) to provide exactly-once guarantees.
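
As a rough illustration of that pattern - not the actual Druid metadata-store code, and with made-up table names, columns, and connection details - committing the data and the offsets in one transaction looks something like this:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class AtomicOffsetCommit {
    // Illustrative sketch only: the JDBC URL, credentials, and schema below are
    // hypothetical, not the actual Druid metadata-store layout.
    public static void publish(String segmentDescriptorJson,
                               String topic, int partition, long lastOffset) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost/druid", "druid", "password")) {
            conn.setAutoCommit(false);
            try (PreparedStatement insertSegment = conn.prepareStatement(
                        "INSERT INTO segments (payload) VALUES (?)");
                 PreparedStatement updateOffset = conn.prepareStatement(
                        "UPDATE offsets SET last_offset = ? "
                      + "WHERE topic = ? AND kafka_partition = ?")) {
                insertSegment.setString(1, segmentDescriptorJson);
                insertSegment.executeUpdate();
                updateOffset.setLong(1, lastOffset);
                updateOffset.setString(2, topic);
                updateOffset.setInt(3, partition);
                updateOffset.executeUpdate();
                // The segment descriptor and the offset cursor commit (or roll back)
                // together, so finalized segments and offsets can never diverge.
                conn.commit();
            } catch (Exception e) {
                conn.rollback();
                throw e;
            }
        }
    }
}
```

If the task fails before the commit, neither the segment nor the new offsets are recorded, and the next task simply starts again from the previously committed offsets.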