Ingestion of CSV format data with Tranquility Kafka

Hello,

I have been testing ingestion of CSV format data from Kafka to Druid using Tranquility.

I already checked the topic below, but it was not helpful for me.

https://groups.google.com/forum/#!searchin/druid-user/csv|sort:relevance/druid-user/ub6gswZ_kl4/0VXmQxvZDgAJ

So here are my configuration, sample data, and result. Could you review them and recommend or comment on my setup?

1. Tranquility configuration

{
  "dataSources" : {
    "josh_csv_test_2" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "josh_csv_test_2",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "format" : "csv",
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "millis"
              },
              "columns" : ["timestamp","a","b","c","d","e"],
              "dimensionsSpec" : {
                "dimensions" : ["a","b","c","d","e"]
              },
              "listDelimiter" : ",",
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "TEN_MINUTE",
            "queryGranularity" : "minute"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "400000",
          "intermediatePersistPeriod" : "PT1M",
          "windowPeriod" : "PT1M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "mytopic",
        "task.warmingPeriod" : "PT1M",
        "druidBeam.firehoseBufferSize" : "3200000",
        "druidBeam.firehoseChunkSize" : "120000"
      }
    },
  },
  "properties" : {
    "zookeeper.connect" : "localhost",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "10000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "192.168.60.18:2181",
    "kafka.group.id" : "mytopic",
    "tranquility.maxBatchSize": "640000",
    "tranquility.lingerMillis": "2000",
    "kafka.fetch.wait.max.ms" : "500",
    "kafka.fetch.message.max.bytes" : "67108864",
    "kafka.fetch.min.bytes" : "33554432",
    "kafka.socket.receive.buffer.bytes" : "2097152"
  }
}


2. CSV format data

2013-08-31T03:32:45Z,ABC,DEF,GHI,JKL,MNO
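
As a sanity check outside of Druid, here is a minimal Python sketch that splits the sample line using the column list from the parseSpec and tests whether the timestamp field looks like epoch milliseconds, which is what "format" : "millis" in the timestampSpec asks for. This is not Tranquility or Druid code, the helper names (parse_row, is_epoch_millis, iso_to_millis) are made up for illustration, and whether this mismatch is the actual cause of the failures is only an assumption.

```python
import csv
import io
from datetime import datetime, timezone

# Column order copied from the "columns" entry of the parseSpec in section 1.
COLUMNS = ["timestamp", "a", "b", "c", "d", "e"]


def parse_row(line):
    """Split one CSV line and map its values onto COLUMNS."""
    values = next(csv.reader(io.StringIO(line)))
    return dict(zip(COLUMNS, values))


def is_epoch_millis(value):
    """Return True if value is a plain integer, as an epoch-milliseconds field would be."""
    try:
        int(value)
        return True
    except ValueError:
        return False


def iso_to_millis(value):
    """Convert an ISO 8601 UTC timestamp ending in 'Z' to epoch milliseconds."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


row = parse_row("2013-08-31T03:32:45Z,ABC,DEF,GHI,JKL,MNO")
print("looks like millis:", is_epoch_millis(row["timestamp"]))
print("as epoch millis:", iso_to_millis(row["timestamp"]))
```

The first print shows that the sample timestamp is an ISO 8601 string rather than a number, so the data and the "millis" setting may not agree.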


3. Result

2017-09-19 00:10:21,592 INFO [com.metamx.tranquility.kafka.KafkaConsumer] Flushed {mytopic={receivedCount=1, sentCount=0, failedCount=1}} pending messages in 0ms and committed offsets in 2ms.
2017-09-19 00:10:31,593 INFO [com.metamx.tranquility.kafka.KafkaConsumer] Flushed {mytopic={receivedCount=0, sentCount=0, failedCount=0}} pending messages in 0ms and committed offsets in 0ms.
2017-09-19 00:10:41,596 INFO [com.metamx.tranquility.kafka.KafkaConsumer] Flushed {mytopic={receivedCount=4, sentCount=0, failedCount=4}} pending messages in 0ms and committed offsets in 3ms.
2017-09-19 00:10:51,599 INFO [com.metamx.tranquility.kafka.KafkaConsumer] Flushed {mytopic={receivedCount=4, sentCount=0, failedCount=4}} pending messages in 0ms and committed offsets in 3ms.
2017-09-19 00:11:01,603 INFO [com.metamx.tranquility.kafka.KafkaConsumer] Flushed {mytopic={receivedCount=4, sentCount=0, failedCount=4}} pending messages in 0ms and committed offsets in 2ms.
2017-09-19 00:11:11,606 INFO [com.metamx.tranquility.kafka.KafkaConsumer] Flushed {mytopic={receivedCount=4, sentCount=0, failedCount=4}} pending messages in 0ms and committed offsets in 2ms.
2017-09-19 00:11:21,609 INFO [com.metamx.tranquility.kafka.KafkaConsumer] Flushed {mytopic={receivedCount=4, sentCount=0, failedCount=4}} pending messages in 0ms and committed offsets in 3ms.
2017-09-19 00:11:31,612 INFO [com.metamx.tranquility.kafka.KafkaConsumer] Flushed {mytopic={receivedCount=4, sentCount=0, failedCount=4}} pending messages in 1ms and committed offsets in 2ms.
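
To make the counters in these log lines easier to read, here is a small Python sketch that sums receivedCount, sentCount, and failedCount per topic. The regular expression is written only against the line format shown above, and the summarize helper is a hypothetical name, not part of Tranquility.

```python
import re

# Matches the per-topic counters inside a KafkaConsumer "Flushed" log line.
COUNTERS = re.compile(
    r"([\w.-]+)=\{receivedCount=(\d+), sentCount=(\d+), failedCount=(\d+)\}"
)


def summarize(log_lines):
    """Sum received, sent, and failed message counts per topic."""
    totals = {}
    for line in log_lines:
        for topic, received, sent, failed in COUNTERS.findall(line):
            counts = totals.setdefault(
                topic, {"received": 0, "sent": 0, "failed": 0}
            )
            counts["received"] += int(received)
            counts["sent"] += int(sent)
            counts["failed"] += int(failed)
    return totals


# Two of the log lines from above, used as sample input.
sample = [
    "2017-09-19 00:10:21,592 INFO [com.metamx.tranquility.kafka.KafkaConsumer] "
    "Flushed {mytopic={receivedCount=1, sentCount=0, failedCount=1}} "
    "pending messages in 0ms and committed offsets in 2ms.",
    "2017-09-19 00:10:41,596 INFO [com.metamx.tranquility.kafka.KafkaConsumer] "
    "Flushed {mytopic={receivedCount=4, sentCount=0, failedCount=4}} "
    "pending messages in 0ms and committed offsets in 3ms.",
]
print(summarize(sample))
```

For the two sample lines this reports 5 received, 0 sent, and 5 failed for mytopic.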


As you can see from 'failedCount=1' (and sentCount=0 on every line), Tranquility is not working properly: every message it receives fails.

What am I missing?

Thank you.

Regards,

Josh.