Tranquility processing only 15k messages/sec from Kafka to Druid, where is the bottleneck?

We have successfully set up a Kafka cluster with Druid instances and everything is working fine (producer => Kafka => ZooKeeper => Tranquility => Druid). We also set up reporting for Druid and can see all messages.
Now the problem:

We benchmarked our producer by sending 4,000,000 messages. All messages are correctly put into Kafka, but Tranquility seems to have some maximum limit on the number of messages it can take/process at once, which is about 15,000/sec to Druid.

Does anyone have an idea where to look for the bottleneck? Is it some Tranquility setting?

Thank you

Here is our kafka.json:

```
{
  "dataSources" : {
    "XXX" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "XXX",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [
                  all dimensions
                ],
                "dimensionExclusions" : [
                  "timestamp"
                ]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            all metrics
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "2",
        "task.replicants" : "2",
        "topicPattern" : "XXX.*",
        "topicPattern.priority" : "1"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "XXX.XX.XX.XX:2181",
    "zookeeper.timeout" : "PT20S",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "druid.discovery.curator.path" : "/druid/discovery",
    "kafka.zookeeper.connect" : "XXX.X.X.X:2181",
    "kafka.group.id" : "tranquility-kafka",
    "consumer.numThreads" : "2",
    "commit.periodMillis" : "1000",
    "reportDropsAsExceptions" : "false"
  }
}
```

How many partitions does your topic have?

How many dimensions and metrics?
How big are the dimension values?
What is the average cardinality per dimension? (See the query sketch below for a way to estimate this.)
What are the specs of your Druid indexer machine?
What was the Java config used for the Druid indexer process?
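
If the per-dimension cardinality is not known, a rough estimate can be pulled from Druid itself. Here is a minimal sketch using Druid's cardinality aggregator; the dimension name some_dimension and the interval are placeholders, and XXX is the datasource name from the kafka.json above:

```
{
  "queryType" : "timeseries",
  "dataSource" : "XXX",
  "granularity" : "all",
  "intervals" : ["2017-01-01/2017-01-02"],
  "aggregations" : [
    { "type" : "cardinality", "name" : "dim_cardinality", "fields" : ["some_dimension"] }
  ]
}
```

Run it once per dimension against the broker; the result is an approximate (HyperLogLog-based) distinct count.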

Our setup:

  • 3x Kafka instances (each also running Tranquility & ZooKeeper)

  • 2x overlord instances (coordinator & overlord)

  • 2x middleManager (historical & middleManager)

  • 2x broker

  • 29 dimensions

  • 12 metrics

  • Tranquility configured as above, but now with consumer.numThreads = 7 and maxRowsInMemory = 500000 (see the excerpt below)
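
For clarity, those two overrides live in different parts of the kafka.json shown above: maxRowsInMemory sits in the per-datasource tuningConfig, and consumer.numThreads in the top-level properties block. Excerpt with the new values (the rest of the file is unchanged):

```
"tuningConfig" : {
  "type" : "realtime",
  "maxRowsInMemory" : "500000",
  "intermediatePersistPeriod" : "PT10M",
  "windowPeriod" : "PT10M"
}
```

and, in the top-level properties:

```
"consumer.numThreads" : "7"
```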

Relevant Kafka server.properties:

```
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka/logs
num.partitions=7
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=1000000
log.flush.interval.ms=10000
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=6000
zk.sessiontimeout.ms=5000
confluent.support.metrics.enable=false
auto.create.topics.enable=true
delete.topic.enable=true
default.replication.factor=3
```

kafka zoo.cfg

```
tickTime=2000
initLimit=10
syncLimit=10
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
leaderServes=yes
preAllocSize=102400
server.1=XXX
server.2=XXX
server.3=XXX
```

kafka zoo.cfg

```
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
leaderServes=yes
preAllocSize=65536
server.1=XXX
server.2=XXX
```

Druid _common/common.properties

```
druid.extensions.loadList=["druid-kafka-eight", "druid-s3-extensions", "druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "mysql-metadata-storage"]
druid.startup.logging.logProperties=true
druid.zk.service.host=druid-overlord-1
druid.zk.paths.base=/druid
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:XXXXX
druid.metadata.storage.connector.user=XXX
druid.metadata.storage.connector.password=XXX
druid.storage.type=s3
druid.storage.bucket=XXX
druid.storage.baseKey=XXX
druid.s3.accessKey=XXX
druid.s3.secretKey=XXX
druid.indexer.logs.type=file
druid.indexer.logs.directory=var/druid/indexing-logs
druid.indexer.logs.type=s3
druid.indexer.logs.s3Bucket=XXX
druid.indexer.logs.s3Prefix=XXX
druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=info
druid.host=XXX
```

Druid overlord jvm

```
-server
-Xms3g
-Xmx3g
-XX:NewSize=256m
-XX:MaxNewSize=256m
-XX:+UseConcMarkSweepGC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-XX:MaxDirectMemorySize=4096m
```

Druid overlord runtime

```
druid.service=druid/overlord
druid.port=8090
druid.host=XXX
druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata
```

Druid coordinator jvm

```
-server
-Xms3g
-Xmx3g
-XX:NewSize=512m
-XX:MaxNewSize=512m
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Dderby.stream.error.file=var/druid/derby.log
```

Druid coordinator runtime

```
druid.service=druid/coordinator
druid.port=8081
druid.host=XXX
druid.coordinator.startDelay=PT30S
druid.coordinator.period=PT30S
druid.coordinator.merge.on=true
```

Druid historical jvm

```
-server
-Xms8g
-Xmx8g
-XX:MaxDirectMemorySize=8192m
-XX:NewSize=4g
-XX:MaxNewSize=4g
-XX:+UseConcMarkSweepGC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-XX:MaxDirectMemorySize=4096m
```

Druid historical runtime

```
druid.host=XXX
druid.port=8083
druid.service=druid/historical
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.processing.buffer.sizeBytes=67108864
druid.processing.numThreads=7
druid.server.http.numThreads=40
druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":130000000000}]
druid.server.maxSize=130000000000
```

Druid middleManager runtime

```
druid.host=XXX
druid.port=8091
druid.service=druid/middlemanager
druid.indexer.runner.javaOpts=-server -Xmx3g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager$
druid.indexer.task.baseTaskDir=var/druid/task
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=67108864
druid.indexer.fork.property.druid.processing.numThreads=7
druid.indexer.fork.property.druid.server.http.numThreads=40
druid.worker.capacity=7
druid.worker.ip=XXX
druid.worker.version=0
```

Druid middleManager jvm

```
-server
-Xms64m
-Xmx64m
-XX:+UseConcMarkSweepGC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-XX:MaxDirectMemorySize=8192m
```

Druid broker jvm

```
-server
-Xms8g
-Xmx8g
-XX:MaxDirectMemorySize=8192m
-XX:+UseConcMarkSweepGC
-XX:NewSize=4g
-XX:MaxNewSize=4g
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
```

Druid broker runtime

```
druid.service=druid/broker
druid.port=8082
druid.host=XXX
druid.broker.http.numConnections=20
druid.server.http.numThreads=40
druid.processing.buffer.sizeBytes=67108864
druid.processing.numThreads=7

# Query cache
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.cache.type=local
druid.cache.sizeInBytes=2147483648
```

Forgot to mention: we can definitely exclude the producer as the source of the problem :slight_smile:

Help, anyone? :slight_smile:

Update: the middleManager druid.indexer.runner.javaOpts is now:

```
druid.indexer.runner.javaOpts=-server -Xmx8g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager
```

Do you have any information about the cardinality of those 29 dimensions? If the cardinalities are really high, this seems like a normal speed, I would say. It all depends on how fast your middleManager machines are.

Maybe try these middleManager config changes:

```
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=268435456
druid.indexer.fork.property.druid.processing.numMergeBuffers=8
```
If you have a metric in your source that just counts the number of events, you can run a query like this to see how effective the rollup is: https://gist.github.com/erikdubbelboer/07c88de309c356e6933f773e61f1cf31
The lower the cardinalities, the better the rollup and the faster everything will be, including indexing and querying.
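
A minimal sketch of such a rollup-ratio check, assuming the metricsSpec above contains a simple event counter named count (the interval is a placeholder, and the query in the linked gist may differ):

```
{
  "queryType" : "timeseries",
  "dataSource" : "XXX",
  "granularity" : "all",
  "intervals" : ["2017-01-01/2017-01-02"],
  "aggregations" : [
    { "type" : "count", "name" : "druid_rows" },
    { "type" : "longSum", "name" : "raw_events", "fieldName" : "count" }
  ],
  "postAggregations" : [
    { "type" : "arithmetic", "name" : "rollup_ratio", "fn" : "/",
      "fields" : [
        { "type" : "fieldAccess", "fieldName" : "raw_events" },
        { "type" : "fieldAccess", "fieldName" : "druid_rows" }
      ]
    }
  ]
}
```

A rollup_ratio close to 1 means almost no rollup (every event becomes its own Druid row); higher values mean more events are collapsed into each stored row.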

Problem solved:
As it turned out, the bottleneck was apparently our Kafka/Tranquility servers. We upgraded our EC2 Kafka/Tranquility instances and right away got 66k/sec.

Thanks though