Kafka Indexing Service, partial load of data

Hi,
I’m testing the Kafka Indexing Service on version 0.10.1-rc3, but my data is only partially loaded (only shard 1 has data) or not loaded at all.

I currently have 2 partitions on the Kafka topic.

Looking at the MiddleManager logs, I found a lot of these errors:

2017-08-16 09:08:29,855 WARN i.d.s.r.a.AppenderatorDriver [task-runner-0-priority-0] Cannot allocate segment for timestamp[2017-08-16T09:01:47.000-04:00], sequenceName[index_kafka_buck_bidding2_613e5185e295383_1].
2017-08-16 09:08:29,857 INFO i.d.s.r.a.AppenderatorDriver [task-runner-0-priority-0] Persisting data.
2017-08-16 09:08:29,863 INFO i.d.s.r.a.AppenderatorImpl [task-runner-0-priority-0] Submitting persist runnable for dataSource[buck_bidding2]
2017-08-16 09:08:29,987 DEBUG o.a.k.c.c.KafkaConsumer [task-runner-0-priority-0] The Kafka consumer has closed.
2017-08-16 09:08:29,990 INFO i.d.s.r.a.AppenderatorImpl [task-runner-0-priority-0] Shutting down...
2017-08-16 09:08:29,995 INFO i.d.s.r.f.ServiceAnnouncingChatHandlerProvider [task-runner-0-priority-0] Unregistering chat handler[index_kafka_buck_bidding2_613e5185e295383_lfcbllgm]
2017-08-16 09:08:29,998 ERROR i.d.i.o.ThreadPoolTaskRunner [task-runner-0-priority-0] Exception while running task[KafkaIndexTask{id=index_kafka_buck_bidding2_613e5185e295383_lfcbllgm, type=index_kafka, dataSource=buck_bidding2}]
io.druid.java.util.common.ISE: Could not allocate segment for row with timestamp[2017-08-16T09:01:47.000-04:00]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:462) ~[?:?]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.10.1-rc3.jar:0.10.1-rc3]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.10.1-rc3.jar:0.10.1-rc3]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_73]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_73]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_73]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
2017-08-16 09:08:30,030 INFO i.d.i.o.TaskRunnerUtils [task-runner-0-priority-0] Task [index_kafka_buck_bidding2_613e5185e295383_lfcbllgm] status changed to [FAILED].
2017-08-16 09:08:30,048 INFO i.d.i.w.e.ExecutorLifecycle [task-runner-0-priority-0] Task completed with status: {
  "id" : "index_kafka_buck_bidding2_613e5185e295383_lfcbllgm",
  "status" : "FAILED",
  "duration" : 1440
}
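
Besides the MiddleManager logs, I'm also planning to pull the supervisor report from the Overlord's supervisor API to see how it views these failing tasks. A minimal sketch of that call (OVERLORD_HOST is a placeholder for my Overlord node, 8090 is the default Overlord port, and I'm assuming the Kafka supervisor id is simply the dataSource name):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SupervisorStatusCheck
{
  public static void main(String[] args) throws Exception
  {
    // OVERLORD_HOST is a placeholder; 8090 is the default Overlord port.
    // Assumption: the Kafka supervisor id equals the dataSource name (buck_bidding2).
    URL url = new URL("http://OVERLORD_HOST:8090/druid/indexer/v1/supervisor/buck_bidding2/status");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");

    // Print the JSON status report, which shows the supervisor's view of its tasks.
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line);
      }
    } finally {
      conn.disconnect();
    }
  }
}
```

Plain curl against the same URL works too, of course.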

My supervisor spec is below:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "buck_bidding2",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "insert_datetime",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "dimensionsSpec": {
          "dimensions": ["account_executive","account_manager","account_onboarding","account_sales","advertiser_bag","advertiser_id","advertiser_price","advertiser_transaction","app_bundle","banner_size","bid_floor","bid_price","bid_time","browser_id","browser_version_id","campaign_id","campaign_type","carrier_id","categories","city_id","company_id","company_price","creative_id","country_id","currency","conversion_type","deal_id","device_make_id","device_model_id","device_os_id"],
          "dimensionExclusions": [],
          "spatialDimensions": []
        }
      }
    },
    "metricsSpec": [
      {"type": "longSum", "name": "bid_response", "fieldName": "bid"},
      {"type": "longSum", "name": "wins", "fieldName": "win"},
      {"type": "longSum", "name": "clicks", "fieldName": "click"},
      {"type": "longSum", "name": "clicks_global", "fieldName": "click_total"},
      {"type": "longSum", "name": "conversion", "fieldName": "convert"},
      {"type": "longSum", "name": "clickfraud_count", "fieldName": "click_fraud"},
      {"type": "longSum", "name": "conversionfraud_count", "fieldName": "convert_fraud"}
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "topic": "buck_bidding",
    "consumerProperties": {
      "bootstrap.servers": "10.20.50.223:9092,10.20.50.235:9092"
    },
    "taskCount": 4,
    "replicas": 1,
    "taskDuration": "PT1H"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 150000,
    "intermediatePersistPeriod": "PT10m",
    "buildV9Directly": true
  }
}
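
While writing this up I also double-checked the timestampSpec: the format "yyyy-MM-dd HH:mm:ss" carries no timezone, so the instant Druid derives from insert_datetime depends on the timezone the parser runs in (Druid uses Joda-Time for this). A small Joda-Time sketch, nothing Druid-specific, that shows what I mean; the -04:00 result matches the timestamps in the failed task's log above:

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class TimestampZoneCheck
{
  public static void main(String[] args)
  {
    // The supervisor spec's timestamp format has no zone information, so the
    // resulting instant depends on the zone the formatter is configured with.
    DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

    DateTime newYork = fmt.withZone(DateTimeZone.forID("America/New_York"))
                          .parseDateTime("2017-08-16 09:01:47");
    DateTime utc = fmt.withZone(DateTimeZone.UTC)
                      .parseDateTime("2017-08-16 09:01:47");

    System.out.println(newYork); // 2017-08-16T09:01:47.000-04:00 (same as in the task log)
    System.out.println(utc);     // 2017-08-16T09:01:47.000Z
  }
}
```

So with my JVMs on America/New_York, the instants Druid works with (and the timestamps it prints in the task log) all end up expressed with a -04:00 offset.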

This is the MiddleManager runtime.properties:

druid.service=druid:rm:middleManager
druid.host=10.20.50.228
druid.port=8091

# Task Log Module (Overlord and MiddleManager node)
druid.indexer.logs.type=file
druid.indexer.logs.directory=/usr/local/druid-0.10.1-rc3/logs/

# HTTP server threads
druid.server.http.numThreads=25

# Middle Manager Configuration
druid.worker.ip=10.20.50.228
druid.worker.capacity=15

# Peon Configuration
druid.indexer.task.chathandler.type=announce
druid.indexer.task.baseTaskDir=/usr/local/druid-0.10.1-rc3/task
druid.indexer.runner.javaOptsArray = ["-XX:OnOutOfMemoryError=kill -9 %p","-Duser.timezone=America/New_York","-Dfile.encoding=UTF-8"]

# Taken from http://druid.io/docs/latest/Production-Cluster-Configuration.html
druid.indexer.fork.property.druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.indexer.fork.property.druid.computation.buffer.size=536870912
druid.indexer.fork.property.druid.processing.numThreads=2
druid.indexer.fork.property.druid.server.http.numThreads=50
druid.indexer.fork.property.druid.request.logging.type=file
druid.indexer.fork.property.druid.request.logging.dir=/usr/local/druid-0.10.1-rc3/logs
druid.indexer.fork.property.druid.storage.type=local
druid.indexer.fork.property.druid.storage.storageDirectory=/nfs/dataStorage
druid.indexer.fork.property.druid.segmentCache.locations=[{"path": "/druid/middle/indexCache", "maxSize": 0}]
druid.indexer.fork.property.druid.server.http.numThreads=50

Can you please help me figure out what's wrong in my configuration?

Thanks

Maurizio

Hi,

an update: looking at the Overlord logs, I found these warnings:

2017-08-17 07:57:13,059 WARN i.d.m.IndexerSQLMetadataStorageCoordinator [qtp604990529-90] Cannot allocate new segment for dataSource[buck_bidding2], interval[2017-08-17T08:21:00.000Z/2017-08-17T08:22:00.000Z], maxVersion[2017-08-17T07:57:13.022-04:00]: conflicting segment[buck_bidding2_2017-08-17T04:00:00.000-04:00_2017-08-17T05:00:00.000-04:00_2017-08-17T04:21:42.984-04:00].
2017-08-17 07:57:13,061 WARN i.d.m.IndexerSQLMetadataStorageCoordinator [qtp604990529-90] Cannot allocate new segment for dataSource[buck_bidding2], interval[2017-08-17T08:21:42.000Z/2017-08-17T08:21:43.000Z], maxVersion[2017-08-17T07:57:13.022-04:00]: conflicting segment[buck_bidding2_2017-08-17T04:00:00.000-04:00_2017-08-17T05:00:00.000-04:00_2017-08-17T04:21:42.984-04:00].
2017-08-17 08:00:31,438 WARN i.d.m.IndexerSQLMetadataStorageCoordinator [qtp604990529-120] Cannot allocate new segment for dataSource[buck_bidding2], interval[2017-08-17T08:00:00.000Z/2017-08-17T09:00:00.000Z], maxVersion[2017-08-17T08:00:31.414-04:00]: conflicting segment[buck_bidding2_2017-08-17T04:00:00.000-04:00_2017-08-17T05:00:00.000-04:00_2017-08-17T04:21:42.984-04:00].
2017-08-17 08:00:31,442 WARN i.d.m.IndexerSQLMetadataStorageCoordinator [qtp604990529-120] Cannot allocate new segment for dataSource[buck_bidding2], interval[2017-08-17T08:15:00.000Z/2017-08-17T08:30:00.000Z], maxVersion[2017-08-17T08:00:31.414-04:00]: conflicting segment[buck_bidding2_2017-08-17T04:00:00.000-04:00_2017-08-17T05:00:00.000-04:00_2017-08-17T04:21:42.984-04:00].

I'm currently using the America/New_York timezone on all servers, and every Druid service runs with -Duser.timezone=America/New_York in its JVM options.

Could this cause issues with segment generation?
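
If it can, this is the change I would try first (a sketch only; as far as I know the Druid docs recommend running every node, including the forked peons, with -Duser.timezone=UTC and handling timezones at query time):

```
# MiddleManager runtime.properties, peon JVM options switched from America/New_York to UTC
druid.indexer.runner.javaOptsArray = ["-XX:OnOutOfMemoryError=kill -9 %p","-Duser.timezone=UTC","-Dfile.encoding=UTF-8"]
```

Queries could then still report in New York time by using a period granularity with "timeZone": "America/New_York", if I understand the granularity docs correctly.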

Thanks

Maurizio