How to improve Kafka ingestion rate

Hello

We are evaluating Druid as our analytical store and have a couple of questions on which we would like expert opinions.

For the purpose of the PoC we are trying to ingest 1 B rows into the store and then run queries against it to assess whether the query response times are satisfactory.

The Druid cluster that we have set up has the following configuration:

Node 1 - Master

Runs the following

  1. Zookeeper

  2. Metadata Derby DB

  3. Overlord

  4. Coordinator

Node 2 - Data 01

Runs the following

  1. Historical

  2. Middlemanager

Node 3 - Data 02

Runs the following

  1. Historical

  2. Middlemanager

Node 4 - Query

Runs the following

  1. Broker

  2. Router

  3. Pivot

All nodes are AWS EC2 instances of type m4.2xlarge.

The above is the setup recommended by Imply (https://docs.imply.io/on-prem/deploy/cluster), using the 3.1.6 distribution.

Additionally, we have set up a two-node Kafka cluster and created a topic with 10 partitions.

Our synthetic data is generated as many csv.gz files, with about 240 dimensions and 6 metrics. A Kafka producer job sends the data to Kafka, and a Kafka ingestion spec reads from the specified topic.

We are seeing an ingestion rate of only about 4 K records per second. We added close to 200 M records to Kafka, but only about 4 M records get inserted.

Based on the above, some questions on which we would like expert opinions:

  1. Is 4 K records/sec the peak ingestion rate that we can expect from the cluster we have set up?

  2. What tweaks can we make to the current configuration to increase the ingestion rate? 4 K is far too low; we would like ingestion rates in excess of 50 K records/sec. The cluster configuration is the default that came with the 3.1.6 distribution.

  3. Do we need to expand the cluster, and if so, which nodes should be expanded to improve the ingestion rate?

  4. Do we need a larger Kafka cluster?

  5. Why does the ingestion stop after ingesting 4 M records? Which logs would contain the reason for this abrupt stop?

Attached is the Kafka ingestion spec, with the columns, dimensionsSpec, and metricsSpec truncated.

Thanks,

Atul

kafka-supervisor.json (1.83 KB)

Hi,

There are many configurations you may need to tweak. A few that may improve the ingestion rate are listed inline below.


  1. Is 4 K records/sec the peak ingestion rate we can expect from the cluster we have set up?

By default only 2 workers are allocated per MiddleManager, i.e. with your two data nodes only 4 partitions are read at a time. Try increasing the number of workers to 5 in the MiddleManager runtime.properties. I hope your Kafka cluster is in the same subnet; make sure there is no network delay between the MM nodes and the Kafka nodes.

  2. What tweaks can we make to the current configuration to increase the ingestion rate? 4 K is far too low; we would like ingestion rates in excess of 50 K records/sec. The cluster configuration is the default that came with the 3.1.6 distribution.

May I know the uncompressed size of the messages?

As a rule of thumb, assume Druid is capable of ingesting roughly 25 GB per worker per hour.

Keep druid.processing.numThreads=2 in the MM runtime.properties. This means each worker will use 2 cores for ingestion.
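For reference, a minimal sketch of the two settings mentioned above as they could appear in each MiddleManager's runtime.properties (the 5-slot value assumes you want the two MMs together to cover the 10 Kafka partitions; adjust to your hardware):

# Number of task slots (peons) this MiddleManager can run concurrently
druid.worker.capacity=5

# Processing threads made available to each ingestion task, per the suggestion above
druid.processing.numThreads=2

With two MiddleManagers at capacity 5 there are 10 task slots in total, one per Kafka partition.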

  3. Do we need to expand the cluster, and if so, which nodes should be expanded to improve the ingestion rate?

You can consider scaling out the data nodes further. If you want to increase the ingestion rate beyond that, increase the number of Kafka partitions such that each worker gets one partition.
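Note that the number of Kafka-reading tasks the supervisor launches is set by taskCount in the supervisor spec's ioConfig; the worker capacity above only determines how many of those tasks the MiddleManagers can run at once. A rough sketch of that part of the spec, with placeholder topic and broker values (the field names are the standard Kafka supervisor ioConfig fields):

"ioConfig": {
  "topic": "your-topic",
  "consumerProperties": { "bootstrap.servers": "kafka01:9092,kafka02:9092" },
  "taskCount": 10,
  "replicas": 1,
  "taskDuration": "PT1H"
}

With taskCount equal to the number of partitions, each reading task consumes exactly one partition.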

  4. Do we need a larger Kafka cluster?
  5. Why does the ingestion stop after ingesting 4 M records? Which logs would contain the reason for this abrupt stop?

Check the MiddleManager log and the peon logs on the MiddleManager nodes. Peons are the worker (ingestion) processes that the MiddleManager starts for a specific period of time (default 1 hour). At the end of that hour the peon pushes its segments to deep storage, after which the MM starts another peon process for the next duration. Check whether GC is causing any issues, and whether the connectivity between the MM and deep storage is good.
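If Druid SQL is enabled (it is in the default distribution), the recent task history and failure messages can also be checked from the query side via the sys schema; a quick sketch, assuming the standard sys.tasks columns:

-- most recent ingestion tasks, their final status and error message (if any)
select task_id, datasource, status, error_msg
from sys.tasks
order by created_time desc
limit 10;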

Please also go through the cluster tuning guidelines for MiddleManagers and peons: https://druid.apache.org/docs/latest/operations/basic-cluster-tuning.html

Hi Tijo,
Thanks for the response. My updates are below.


Regarding the uncompressed message size and the 25 GB per worker per hour rule of thumb: the raw/uncompressed size is roughly 800 MB for 300 K records. The ingestion rate we are getting is 4 K records/sec, which comes to 14.4 M records per hour, or an uncompressed volume of about 38.4 GB per hour. The MM configuration has the following:

druid.worker.capacity=3

druid.processing.numThreads=2

Since there are two MMs, up to 6 worker tasks could be in progress simultaneously, and if Druid is capable of ingesting 25 GB per worker per hour, that translates to 150 GB of ingestion per hour. The ingestion rate we are getting is far below that.
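Making the comparison explicit with the numbers above:

  800 MB / 300 K records ≈ 2.7 KB per record (uncompressed)
  4 K records/sec × 3600 s = 14.4 M records/hour ≈ 38.4 GB/hour observed
  2 MMs × 3 workers × 25 GB per worker per hour = 150 GB/hour expected capacity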


Regarding increasing the number of Kafka partitions so that each worker gets one partition: we have the Kafka partitions set to 10. So are you suggesting that, for optimal performance, we should set druid.worker.capacity=5 on each of the MMs so that there are exactly as many workers as Kafka partitions?


On checking the MiddleManager and peon logs: I found that the ingestion is not actually stopping at 4 M records; rather, the count is wrapping around. A periodic count(*) on the datasource while ingestion is in progress shows the count increasing at ~4 K records/sec until it reaches about 20 M, at which point it wraps around. I suspect the data is getting dropped/lost.

Here are the relevant indexing logs and the count(*) output around the time the wraparound happens:

2019-12-10T14:03:38,359 INFO [Thread-53] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting graceful shutdown of task[index_kafka_flows39_4b117c3b9b1c1ae_cfcibngo].

2019-12-10T14:03:38,359 INFO [Thread-53] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Stopping gracefully (status: [READING])

2019-12-10T14:03:38,359 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Persisting all pending data

2019-12-10T14:03:38,360 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver - Persisting data.

2019-12-10T14:03:38,360 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Hydrant[FireHydrant{, queryable=flows39_2019-06-07T23:00:00.000Z_2019-06-08T00:00:00.000Z_2019-12-10T12:48:30.575Z_1, count=40}] hasn’t persisted yet, persisting. Segment[flows39_2019-06-07T23:00:00.000Z_2019-06-08T00:00:00.000Z_2019-12-10T12:48:30.575Z_1]

2019-12-10T14:03:38,086 INFO [Thread-53] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting graceful shutdown of task[index_kafka_flows39_a21d375287d3740_bebmfiod].

2019-12-10T14:03:38,086 INFO [Thread-53] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Stopping gracefully (status: [PUBLISHING])

2019-12-10T14:03:38,090 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Error while publishing segments for sequenceNumber[SequenceMetadata{sequenceId=0, sequenceName=‘index_kafka_flows39_a21d375287d3740_0’, assignments=, startOffsets={0=0, 1=0, 2=0, 3=0, 4=0, 5=0, 6=0, 7=0, 8=0, 9=0}, exclusiveStartPartitions=, endOffsets={0=1492905, 1=1272440, 2=1552565, 3=1384465, 4=1522970, 5=1480860, 6=1487475, 7=1305730, 8=1463260, 9=1332120}, sentinel=false, checkpointed=true}]

java.util.concurrent.CancellationException: Task was cancelled.
    at com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:392) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.Futures$4.run(Futures.java:1170) [guava-16.0.1.jar:?]

Tue Dec 10 14:03:09 UTC 2019

select count(*) from flows39;
┌──────────┐
│ EXPR$0   │
├──────────┤
│ 20753038 │
└──────────┘
Retrieved 1 row in 1.35s.

Tue Dec 10 14:04:11 UTC 2019

select count(*) from flows39;
┌────────┐
│ EXPR$0 │
├────────┤
│ 146935 │
└────────┘
Retrieved 1 row in 0.13s.

2019-12-10T15:34:07,861 INFO [Thread-53] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting graceful shutdown of task[index_kafka_flows39_abca014a00cce81_ofconddn].

2019-12-10T15:34:07,861 INFO [Thread-53] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Stopping gracefully (status: [READING])

2019-12-10T15:34:07,868 INFO [flows39-incremental-persist] org.apache.druid.segment.StringDimensionMergerV9 - Completed dim[reporterEntity_collectorId] inverted with cardinality[8,129] in 51 millis.

2019-12-10T15:34:07,749 INFO [Thread-53] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting graceful shutdown of task[index_kafka_flows39_6e9a0af5cb37487_koghokai].

2019-12-10T15:34:07,749 INFO [Thread-53] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Stopping gracefully (status: [PUBLISHING])

2019-12-10T15:34:07,753 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Error while publishing segments for sequenceNumber[SequenceMetadata{sequenceId=0, sequenceName=‘index_kafka_flows39_6e9a0af5cb37487_0’, assignments=, startOffsets={0=8640010, 1=8640000, 2=8640010, 3=8640000, 4=8640010, 5=8640000, 6=8640010, 7=8640000, 8=8640010, 9=8640000}, exclusiveStartPartitions=, endOffsets={0=10095940, 1=9841905, 2=10054370, 3=9894285, 4=10154130, 5=10012390, 6=10091550, 7=9916425, 8=10248950, 9=9851470}, sentinel=false, checkpointed=true}]

java.util.concurrent.CancellationException: Task was cancelled.
    at com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:392) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) ~[guava-16.0.1.jar:?]
    at com.google.common.util.concurrent.Futures$4.run(Futures.java:1170) [guava-16.0.1.jar:?]

Tue Dec 10 15:33:51 UTC 2019

select count(*) from flows39;
┌──────────┐
│ EXPR$0   │
├──────────┤
│ 20211981 │
└──────────┘
Retrieved 1 row in 0.21s.

Tue Dec 10 15:34:21 UTC 2019

select count(*) from flows39;
┌────────┐
│ EXPR$0 │
├────────┤
│ 40184  │
└────────┘
Retrieved 1 row in 0.08s.

I suspected that this wraparound was due to the maxRowsPerSegment and maxTotalRows tuningConfig params, but these were set to high values:

  "maxRowsPerSegment":200000000,

  "maxTotalRows":2000000000,

I also experimented with setting these configs to low values:

  "maxRowsPerSegment":200,

  "maxTotalRows":2000,

and tried to ingest about 300 K records. All the records got ingested. I was expecting the row count not to go past maxTotalRows, but that wasn't the case.

Any insights on why data is getting lost/wraparound?

See my comments below.


Correct, that is what I am suggesting: set druid.worker.capacity=5 on each of the two MMs so that the total number of workers matches the 10 Kafka partitions.


Regarding the wraparound: check what the maxRowsPerSegment value in the ingestion spec is. If it is 5 million, a segment with 5 million (rolled-up) rows will be created. If you have enabled rollup, the ingested row count is calculated with a SUM over the count metric; a plain count(*) will give a different result.
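A quick way to see the difference (a sketch, assuming a count aggregator named "count" was defined in the metricsSpec, which is the usual convention when rollup is enabled):

-- rows stored after rollup
select count(*) from flows39;

-- raw rows ingested (sums the per-row counts accumulated during rollup)
select sum("count") from flows39;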


In the logs you pasted I can see an error when the task tries to publish its segments. If such an error happens, the task is restarted and ingestion resumes from Kafka at the last successfully committed offsets, so there is no data loss.


On the maxRowsPerSegment / maxTotalRows experiment: consider the case where maxRowsPerSegment = 5 M, maxTotalRows = 20 M, and the segment granularity is HOUR. For each hourly interval (e.g. 1:00 to 2:00, 2:00 to 3:00, and so on) there will be one segment being built, and once the total number of rows across all the segments created by a task exceeds 20 M, the task persists those segments and pushes them to deep storage.


I don't think your data is lost. The ingestion task failed, and the supervisor restarts the task again.

Hi Tijo,

Below is the tuningConfig of the Kafka ingestion spec. Although maxRowsInMemory is set to 100 M, I still see that count(*) grows to about 20 M and then wraps around. maxRowsPerSegment and maxTotalRows are also set considerably high.

If it is a case of the process getting killed because of OOM, the logs don't say that explicitly.

"tuningConfig":{
  "type":"kafka",
  "maxRowsInMemory":100000000,
  "maxBytesInMemory":0,
  "maxRowsPerSegment":350000000,
  "maxTotalRows":5000000000,
  "intermediatePersistPeriod":"PT10M",
  "maxPendingPersists":0,
  "indexSpec":{
     "bitmap":{
        "type":"concise"
     },
     "dimensionCompression":"lz4",
     "metricCompression":"lz4",
     "longEncoding":"longs"
  },
  "indexSpecForIntermediatePersists":{
     "bitmap":{
        "type":"concise"
     },
     "dimensionCompression":"lz4",
     "metricCompression":"lz4",
     "longEncoding":"longs"
  },
  "buildV9Directly":true,
  "reportParseExceptions":false,
  "handoffConditionTimeout":0,
  "resetOffsetAutomatically":false,
  "segmentWriteOutMediumFactory":null,
  "intermediateHandoffPeriod":"P2147483647D",
  "logParseExceptions":false,
  "maxParseExceptions":2147483647,
  "maxSavedParseExceptions":0,
  "skipSequenceNumberAvailabilityCheck":false
},

The segmentGranularity is set to HOUR:

"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "queryGranularity": "NONE",
  "rollup": true
}

The data that I am trying to ingest consists of multiple CSV files. The events are spread over one day (not the current date, but a date in the past). The total number of records is close to 170 M.

A Kafka producer job writes the CSVs to a topic with 10 partitions in Kafka.

You mentioned that the optimal druid.worker.capacity is one that matches the number of partitions in the topic. We have two MMs in the cluster, and even with druid.worker.capacity set to 3, 4, or 5 on each MM, I am still getting only about 4 K records/sec.

About your point that the ingestion task failed and was restarted: the ingestion does continue to progress, again reaches 20 M rows, and then wraps around. But at some point the table becomes unavailable, and a select count(*) query returns an error:

java.lang.RuntimeException: Error while applying rule DruidTableScanRule, args [rel#285753:LogicalTableScan.NONE.(table=[druid, flows42])]

So I am still trying to figure out:

a) how to ingest 170 M records into Druid using Kafka, and

b) how to improve the ingestion rate.

Thanks for the help,

Atul

Hi Atul,
Can you also attach the configs: the common runtime properties plus those for the Historical, MiddleManager, Coordinator, Overlord, and Broker?

Thanks

Attached is the tar.gz of conf/druid

conf_druid.tar.gz (7.42 KB)