Kafka-ingested data is not immediately available for querying

I am running 15 supervisor tasks to ingest CDC data from Kafka topics into a single-node Druid cluster.

The Druid worker capacity is set to 7, so at any moment 7 tasks are in the running state indexing data, and the task duration is set to PT1H.
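
For reference, these are the two settings involved (values as I have them; the worker capacity lives in middleManager/runtime.properties and the task duration in each supervisor's ioConfig):

druid.worker.capacity=7
"taskDuration": "PT1H"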

My issue is that CDC data is not immediately available for querying in the Druid console when the corresponding index task is in the pending state.

The newly arrived data appears in the datasource only when the task goes into the running state again.

How can I make sure that fresh data is available for querying in Druid for all the tasks?

Thanks

It is worth finding out why your tasks are not running – there may be a hint in the supervisor’s own log, too. I suspect there’s some kind of resource constraint that would explain why some of the tasks are not running.

There is no resource constraint. Since the single-node server has 8 CPUs, I have allocated 7 of them for index tasks by setting druid.worker.capacity=7.

If I am not wrong, the above configuration will make sure that the number of index tasks in the running state is limited to 7, in a round-robin way.

Although this doc is about batch ingestion, the principle is largely the same: there can only ever be druid.worker.capacity tasks running at once; all other tasks will be pending.

Each task will take a number of partitions in the incoming topic (you can see the resulting split in the Payload view of each task), so if you have PENDING tasks, those subtasks will not be running, and you will not be loading data from those partitions.

Thought I’d mention, in case it’s helpful, that it’s also good to have a clean ratio between topic partitions and the number of subtasks… e.g. if you have 8 partitions, 8, 4, 2, or 1 task is fine. With 24 partitions, again pick something that divides evenly: 24 (1 partition per task), 12 (2 partitions per task), 8 (3 partitions per task), 6…
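
For context, taskCount is set in the ioConfig section of the Kafka supervisor spec. A rough sketch (topic name and values are only illustrative) for an 8-partition topic where each task reads 2 partitions:

"ioConfig": {
  "type": "kafka",
  "topic": "cdc-example-topic",
  "taskCount": 4,
  "taskDuration": "PT1H",
  "consumerProperties": { "bootstrap.servers": "localhost:9092" }
}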

OH and ALSO did you mean Supervisor tasks? Or did you mean that you set taskCount in the ingestion spec?

I mean 15 supervisor tasks, and among all the subtasks they create, 7 are in the running state (due to druid.worker.capacity=7) and the rest are in the pending state.

So if CDC data is generated in a Kafka topic and the corresponding subtask (created by the supervisor) is in the pending state, I don’t see the newly arrived data in the query console until the subtask becomes RUNNING again.

If I understand you rightly, that is what I would expect: it’s the subtask that actually ingests the data and makes the datasource available for you to query. When a task is done, it persists its segments to deep storage, the Historicals pick them up, and then they serve that data.

So if the tasks finish and then you still cannot query data, check the logs to see what has happened to your data.

You need to have enough workers for all the tasks that you expect to run – so that’s the supervisor’s subtasks, plus of course compaction, reindexing, replicas (if you use them), and any batch jobs you might run – the docs have a list, I think.

As you have 15 supervisors, are you bringing data in from 15 topics?
If that’s the case, each supervisor spawns taskCount tasks – so you will need at least that much druid.worker.capacity.
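
As a rough worked example for this thread: 15 supervisors × taskCount 1 = 15 ingestion slots needed just to keep every streaming task in the RUNNING state, so druid.worker.capacity would have to be at least 15, plus a little headroom for compaction, reindexing, or any batch jobs.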

Sorry if I am confusing things lol!!

I hope this is helping.

Explaining task count and worker capacity in a blog post is on my list of things to do…

Since I have 15 supervisors, 15 subtasks are spawned (taskCount being 1 in each case). If I increase the Druid worker capacity to 15, that will not work because I have only 8 CPUs. So how can I make sure that the ingested CDC data is at least available for query even if the corresponding subtask is in the pending state?

Taking a step back: are you consuming from a single Kafka topic with 15 partitions? Or why do you have 15 supervisor tasks? Can you provide a broader description of what you are trying to achieve: how many data sources you are loading, ingestion types (batch, real-time), etc.?

MiddleManager tasks for real-time ingestion in the PENDING state will be assigned topic partitions which will not be consumed until the task runs, so you likely want fewer tasks, such that they can all be in the RUNNING state and all topic partitions are being consumed.

I have 15 different topics and 15 different supervisor specifications (with taskCount=1), one for each topic. So there are 15 different indexing subtasks spawned by the 15 supervisors.

7 indexing subtasks are in the running state and the rest are in the pending state due to druid.worker.capacity=7.

The issue is that CDC data ingested from a Kafka topic is not available for query if the subtask spawned by its supervisor is in the pending state.

I see. How busy is your node with 7 running? I wonder, if you increase the capacity to 15 and the message throughput of the streams isn’t too high, whether it might be able to handle it. If the CPUs get too busy you might need to scale out to more MiddleManagers. Increasing capacity will also incur more memory usage, so you might need to tune that – but it might be worth a test.
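
If you do try that, the per-task heap and direct memory are controlled by druid.indexer.runner.javaOpts on the MiddleManager, so that is the knob you would likely reduce when raising capacity. A sketch of the kind of change I mean (sizes are only illustrative, not a recommendation):

druid.worker.capacity=15
druid.indexer.runner.javaOpts=-server -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8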

Hi Dwija,

This is to be expected. You will have to scale out your cluster to be able to handle the 15 tasks needed to serve your 15 supervisor specs.

My initial reaction is that your cluster is under-provisioned. Even if you were to set druid.worker.capacity=15 and it was able to handle the load because the throughput is manageable, you would probably face performance issues in querying, as queries have to share those same CPUs on the data node.

Hi @Vijeth_Sagar, you are right. I am able to run all the index jobs at the same time by setting druid.worker.capacity=15.

However, I tried adding another 9 supervisor specifications for different Kafka topics and increased the worker capacity to 20 (druid.worker.capacity=20), but the server just stopped working and became unreachable, although there are no errors in the logs.

The server has 8 CPUs and 24 GB of RAM.

Can you suggest whether I should increase the number of CPUs or tune the JVM settings?

Following are the configuration settings:

broker/jvm.config

-server
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=768m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

broker/runtime.properties

druid.server.http.numThreads=12

druid.broker.http.numConnections=10
druid.broker.http.maxQueuedBytes=5MiB

druid.processing.buffer.sizeBytes=100MiB
druid.processing.numMergeBuffers=2
druid.processing.numThreads=1
druid.processing.tmpDir=var/druid/processing

druid.broker.cache.useCache=false
druid.broker.cache.populateCache=false

coordinator-overlord/runtime.properties

druid.service=druid/coordinator
druid.plaintextPort=8081

druid.coordinator.startDelay=PT10S
druid.coordinator.period=PT5S

druid.coordinator.asOverlord.enabled=true
druid.coordinator.asOverlord.overlordService=druid/overlord

druid.indexer.queue.startDelay=PT5S

druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata

coordinator-overlord/jvm.config

-server
-Xms1g
-Xmx1g
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Dderby.stream.error.file=var/druid/derby.log

historical/runtime.properties

druid.service=druid/historical
druid.plaintextPort=8083

druid.server.http.numThreads=12

druid.processing.buffer.sizeBytes=200MiB

druid.processing.numMergeBuffers=2
druid.processing.numThreads=7

druid.processing.tmpDir=var/druid/processing

druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":"300g"}]

druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=20MiB

historical/jvm.config

-server
-Xms2g
-Xmx2g
-XX:MaxDirectMemorySize=3600m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

middleManager/runtime.properties

druid.worker.capacity=15

druid.indexer.runner.javaOpts=-server -Xms5g -Xmx5g -XX:MaxDirectMemorySize=3g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task

druid.server.http.numThreads=50

druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=200MiB
druid.indexer.fork.property.druid.processing.numThreads=2

druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp

middleManager/jvm.config

-server
-Xms128m
-Xmx128m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

router/runtime.properties

druid.router.http.numConnections=50
druid.router.http.readTimeout=PT5M
druid.router.http.numMaxThreads=100
druid.server.http.numThreads=100

druid.router.defaultBrokerServiceName=druid/broker
druid.router.coordinatorServiceName=druid/coordinator

druid.router.managementProxy.enabled=true

router/jvm.config

-server
-Xms512m
-Xmx512m
-XX:+UseG1GC
-XX:MaxDirectMemorySize=512m
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

Thanks

Hi Dwija,

While your MM configs look fine [assuming 15 workers is acceptable], I think the node is well past recommended limits. If this is a test cluster for experimentation, you could try to debug this, but this is definitely not recommended for production.

If you do intend to take this to production, I would definitely increase the CPUs so you can keep the number of workers at about (vCPUs / 4).

It is important to note that queries and ingestion share CPUs, and querying activities can take up unused ingestion CPUs but not vice versa. So you can extrapolate what happens when you have a heavy query load.
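
To make that concrete with the configs posted above (a rough upper-bound estimate, assuming each peon actually uses its full allocation): the javaOpts allow roughly 5 GB heap + 3 GB direct memory per task, so 20 concurrent tasks could ask for on the order of 20 × 8 GB = 160 GB, far beyond the 24 GB available on the node – which by itself would explain the machine becoming unresponsive once the extra supervisors started their tasks.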

Hope this helps and please reach out if you have further questions.

Hi @Vijeth_Sagar

Thanks for all the suggestions.

So, as I understand it:

Real-time data from a Kafka topic cannot be queried until the corresponding indexing task is over. The reason for this is that the indexing task spawned by the supervisor is in the PENDING state and is waiting for its turn to RUN.

To overcome this, I need to increase the worker capacity so that each indexing task gets its turn to RUN immediately once the supervisor creates it.

Isn’t it possible to query the CDC data before it gets segmented by the indexing task?

Correct me if I am wrong.

Thanks

You can query real-time data while it is being ingested, but the task has to start ingesting data in the first place. If it is in the pending state, data is not yet coming into Druid, and naturally you cannot query it in Druid.
