Speed decreases when ingesting data via Tranquility + Spark Streaming

I'm trying to ingest experiment data into Druid with hundreds of schemaless dimensions, using Tranquility + Spark Streaming.
The incoming data looks like [user_id, experiment_id, etc.], where experiment_id contains hundreds of experiment strings such as exp_1, exp_2, exp_3…

To ingest this kind of data into Druid, I explode experiment_id into hundreds of schemaless dimensions, as sketched below.
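
For illustration, a minimal sketch of that explode step in Spark might look like the following (the event fields and the function name are assumptions for the example, not my actual code):

case class RawEvent(ts: Long, userId: String, experimentIds: Seq[String])

// Turn one event carrying a list of experiment ids into a flat row with one
// column per experiment. Because dimensionsSpec.dimensions is left empty in
// the Tranquility config below, Druid picks these columns up as schemaless
// dimensions.
def explodeExperiments(event: RawEvent): Map[String, Any] = {
  val base = Map[String, Any](
    "ts"          -> event.ts,
    "userId"      -> event.userId,
    "recordCount" -> 1
  )
  // Hundreds of experiments => hundreds of extra columns like exp_1, exp_2, ...
  base ++ event.experimentIds.map(exp => exp -> "1").toMap
}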

The Kafka input rate is about 6,000,000 events per minute.

At first the speed is pretty good: each 30-second batch finishes ingesting into Druid in about 15 seconds.

However, as time goes on, ingestion gets slower and slower.

My Tranquility config:

{
  "dataSources": [
    {
      "spec": {
        "dataSchema": {
          "dataSource": "imitation_test_uv",
          "metricsSpec": [
            {
              "type": "doubleSum",
              "name": "recordCount",
              "fieldName": "recordCount"
            },
            {
              "name": "uv",
              "type": "hyperUnique",
              "fieldName": "userId",
              "round": true
            }
          ],
          "granularitySpec": {
            "segmentGranularity": "hour",
            "queryGranularity": "hour",
            "type": "uniform"
          },
          "parser": {
            "type": "string",
            "parseSpec": {
              "format": "json",
              "timestampSpec": {
                "column": "ts",
                "format": "auto"
              },
              "dimensionsSpec": {
                "dimensions": []
              }
            }
          }
        },
        "tuningConfig": {
          "type": "realtime",
          "windowPeriod": "PT10M",
          "intermediatePersistPeriod": "PT10M",
          "maxRowsInMemory": "50000"
        }
      },
      "properties": {
        "task.partitions": "3",
        "task.replicants": "1"
      }
    }
  ],
  "properties": {
    "zookeeper.connect": "10.0.49.210:2181",
    "druid.selectors.indexing.serviceName": "druid/overlord",
    "druid.discovery.curator.path": "/druid/discovery",
    "tranquility.maxBatchSize": "200000",
    "druidBeam.firehoseChunkSize": "10000",
    "tranquility.lingerMillis": "-1",
    "druidBeam.firehoseGracePeriod": "PT1M"
  }
}

My middleManager runtime.properties:

druid.service=druid/middleManager
druid.host=hd-experiment04
druid.port=8091

# Number of tasks per middleManager
druid.worker.capacity=12
druid.processing.numThreads=12

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xmx6g -XX:MaxDirectMemorySize=16g -XX:+UseG1GC -XX:MaxGCPauseMil
druid.indexer.task.baseTaskDir=var/druid/task

# HTTP server threads
druid.server.http.numThreads=50

# Processing threads and buffers on Peons
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=1073741824
druid.indexer.fork.property.druid.processing.numThreads=12

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"]

I think the reason may be that there are too many dimensions, or that my configuration is not good.

Any suggestions to deal with this issue? Many thanks.

Hi Frank,

I am also trying to do something similar. I am using Spark Streaming with the Tranquility BeamRDD adapter mentioned in Tranquility's Spark documentation.

I was not aware that we need to create a Tranquility config as well.

Can you please let me know what steps need to be performed?

Thank you so much

Got it working. Thanks

Hi novice,
Happy to hear that. The example in druid-io/tranquility uses the builder() function. If you want to use a config file, you can use the fromConfig() function.
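
To answer the earlier question about the steps, a rough sketch of the fromConfig() route combined with the Spark BeamRDD adapter might look like this (the config file path and the default message type of fromConfig() are assumptions based on the Tranquility docs; please check them against your Tranquility version):

import java.io.FileInputStream

import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.config.TranquilityConfig
import com.metamx.tranquility.druid.DruidBeams
import com.metamx.tranquility.spark.BeamFactory

// Each executor JVM builds its Beam lazily through the companion object,
// so the factory itself stays cheap to serialize.
class EventBeamFactory extends BeamFactory[java.util.Map[String, AnyRef]] {
  def makeBeam: Beam[java.util.Map[String, AnyRef]] = EventBeamFactory.BeamInstance
}

object EventBeamFactory {
  lazy val BeamInstance: Beam[java.util.Map[String, AnyRef]] = {
    // "tranquility.json" is a placeholder path for the config file posted above.
    val config = TranquilityConfig.read(new FileInputStream("tranquility.json"))
    val dataSourceConfig = config.getDataSource("imitation_test_uv")
    DruidBeams.fromConfig(dataSourceConfig).buildBeam()
  }
}

// Inside the streaming job, rows are then sent with the BeamRDD adapter:
//   import com.metamx.tranquility.spark.BeamRDD._
//   dstream.foreachRDD(rdd => rdd.propagate(new EventBeamFactory))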

And my problem was solved by using a multi-value dimension.
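
To make that concrete, here is a sketch of what a row looks like when experiment_id stays as a single multi-value dimension instead of being exploded into hundreds of columns (the field names are from my original post; the helper itself is just an illustration):

// One multi-value dimension instead of hundreds of schemaless columns.
// Serialized to JSON, the Seq becomes an array, e.g.
//   "experiment_id": ["exp_1", "exp_2", "exp_3"]
// which Druid ingests as a multi-value dimension.
def toMultiValueRow(ts: Long, userId: String, experimentIds: Seq[String]): Map[String, Any] =
  Map(
    "ts"            -> ts,
    "userId"        -> userId,
    "recordCount"   -> 1,
    "experiment_id" -> experimentIds
  )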

On Friday, August 17, 2018 at 11:04:34 AM UTC+8, Druid_novice wrote:

Hi Frank,

Thank you for sharing your knowledge.