Ingestion of CSV data with Tranquility Kafka

Hello,

I would like to use Tranquility Kafka to ingest messages in CSV format into a Druid cluster.

Currently I am using a realtime node to perform ingestion and everything works fine.

When I start Tranquility, it seems to try to parse the messages as JSON instead of CSV and consequently fails. I am probably missing something obvious. Could you please have a look and tell me what I am doing wrong?

Here is the configuration that I am passing to Tranquility:

{
  "dataSources" : [
    {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "content20stats_stage",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "format" : "csv",
              "columns" : [
                "timestamp", "a_attrs", "a_boxes_ctr_id", "a_boxes_id", "a_scrolls",
                "n_boximpression", "n_breakpoint", "n_click", "n_doc_type", "n_fbcomment",
                "n_fblike", "n_fbshare", "n_gplus", "n_impression", "n_info", "n_mappa",
                "n_searchno", "n_staytime", "n_twcount", "s_area", "s_box", "s_cat1",
                "s_cat2", "s_cat3", "s_dest_id", "s_doc_id", "s_domain", "s_link_type",
                "s_pag_id", "s_page", "s_ref_host", "s_ref_path", "s_search", "s_source", "s_ua"
              ],
              "dimensionsSpec" : {
                "dimensions" : [
                  "a_attrs", "a_boxes_ctr_id", "a_boxes_id", "a_scrolls",
                  "n_boximpression", "n_breakpoint", "n_click", "n_doc_type", "n_fbcomment",
                  "n_fblike", "n_fbshare", "n_gplus", "n_impression", "n_info", "n_mappa",
                  "n_searchno", "n_staytime", "n_twcount", "s_area", "s_box", "s_cat1",
                  "s_cat2", "s_cat3", "s_dest_id", "s_doc_id", "s_domain", "s_link_type",
                  "s_pag_id", "s_page", "s_ref_host", "s_ref_path", "s_search", "s_source", "s_ua"
                ]
              },
              "listDelimiter" : ";",
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "millis"
              }
            }
          },
          "granularitySpec" : {
            "queryGranularity" : "MINUTE",
            "segmentGranularity" : "DAY"
          },
          "metricsSpec" : [
            { "name" : "count", "type" : "count" },
            { "fieldName" : "n_impression", "name" : "impressions", "type" : "longSum" },
            { "fieldName" : "n_click", "name" : "clicks", "type" : "longSum" },
            { "fieldName" : "n_boximpression", "name" : "boximpressions", "type" : "longSum" },
            { "fieldName" : "n_staytime", "name" : "totstaytime", "type" : "longSum" },
            { "fieldName" : "n_fblike", "name" : "fblike", "type" : "longSum" },
            { "fieldName" : "n_fbshare", "name" : "fbshare", "type" : "longSum" },
            { "fieldName" : "n_fbcomment", "name" : "fbcomment", "type" : "longSum" },
            { "fieldName" : "n_twcount", "name" : "twcount", "type" : "longSum" },
            { "fieldName" : "n_searchno", "name" : "searchres", "type" : "longSum" }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M",
          "rejectionPolicy" : {
            "type" : "serverTime"
          }
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "event"
      }
    }
  ],
  "properties" : {
    "zookeeper.connect" : "127.0.0.1:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "192.168.33.7:2181",
    "kafka.group.id" : "druid-example-stage",
    "kafka.auto.offset.reset" : "largest",
    "reportDropsAsExceptions" : "true"
  }
}

and here is the relevant part of the Tranquility log:

2016-05-24 15:24:48,663 [KafkaConsumer-1] INFO c.metamx.emitter.core.LoggingEmitter - Start: started [true]
2016-05-24 15:24:48,912 [ConsumerFetcherThread-druid-example-stage_parisi-1464103484157-1423c711-0-1] DEBUG kafka.consumer.PartitionTopicInfo - updated fetch offset of (event:0: fetched offset = 4855358982: consumed offset = 4855346182) to 4855358982
2016-05-24 15:24:49,194 [KafkaConsumer-1] DEBUG c.m.t.k.w.TranquilityEventWriter - 1 message(s) failed to parse as JSON and were rejected
com.fasterxml.jackson.core.JsonParseException: Unexpected character (',' (code 44)): Expected space separating root-level values
at [Source: [B@11cddec; line: 1, column: 15]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:437) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:453) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1484) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1225) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:792) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:690) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2247) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:88) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]
at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) [io.druid.tranquility-kafka-0.7.4.jar:0.7.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_72-internal]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_72-internal]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]
2016-05-24 15:24:49,195 [KafkaConsumer-1] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.RuntimeException: com.fasterxml.jackson.core.JsonParseException: Unexpected character (',' (code 44)): Expected space separating root-level values
at [Source: [B@11cddec; line: 1, column: 15]
at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-16.0.1.jar:na]
at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:105) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]
at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_72-internal]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_72-internal]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character (',' (code 44)): Expected space separating root-level values
at [Source: [B@11cddec; line: 1, column: 15]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:437) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:453) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1484) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1225) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:792) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:690) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2247) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:88) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]
… 6 common frames omitted
2016-05-24 15:24:49,196 [KafkaConsumer-1] INFO c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets
2016-05-24 15:24:49,197 [KafkaConsumer-1] DEBUG c.m.t.tranquilizer.Tranquilizer - Flushing 0 batches.
2016-05-24 15:24:49,212 [Tranquilizer-BackgroundSend-[com.metamx.tranquility.beam.TransformingBeam@e1ae52]] DEBUG c.m.t.tranquilizer.Tranquilizer - Interrupted, exiting thread.

The command line that I use to launch Tranquility Kafka is:

./tranquility kafka -configFile …/…/druid-0.9.0/conf/tranquility/kafka.json

Thank you,

Tommaso

Hey Tommaso,

Can you try this with Tranquility 0.8.0? Support for CSV was added in that release.

Thank you Gian,
Do you happen to know if there are any plans to release a 0.8.0 distribution? I see that on the Druid download page the latest version is 0.7.4.

I tried to build one myself from source, but I'm afraid I am not familiar with Scala and sbt.

Anyway, your kind answer settles the issue. I will continue to use realtime nodes for now.

Thanks again,
Tommaso

I tried with the distribution shipped with Imply. It works like a charm.

Bye,
Tommaso

Hey Tommaso,

You can also get the most recent stand-alone download here: https://github.com/druid-io/tranquility#downloadable-distribution

Hi Tommaso

I saw that you were also trying to send CSV to Druid through Tranquility. I am working on the same thing: I have a CSV file ready, and the configuration to pass to Tranquility is in order.
However, I cannot work out how to use a curl request to send the CSV to the Tranquility node.

I saw something like this:

python createData.py | curl -XPOST -H 'Content-Type: text/plain' --data-binary @- http://localhost:8200/v1/post/audience

Would you happen to know where I specify the name of the CSV file whose data I need to send to Druid in a syntax like the above?

Thanks in advance

Shubhi Agarwal

Hello Shubhi,

If the file that you want to send is located at /tmp/data.csv, then the command line to use is:

curl -XPOST -H 'Content-Type: text/plain' --data-binary '@/tmp/data.csv' http://localhost:8200/v1/post/audience

Regards,
Tommaso

Hi Tommaso

Thanks for your reply. I tried this but I am getting an error:

Malformed string on line[1]

Have a few questions:

Does the timestamp need to be the first column in the CSV file? And should the header be removed from the CSV file?

Pasting a few lines of the CSV file:

Symbol,cusip,Position,costBasis,SoDPosition,tradeVolume,sid,pnl,time
BRF,92189F825,427,-1621.00,327,328,11640,63.07943,2016-11-15T01:30:00Z
CURE,25459Y876,-28805,392110.97,-16206,25801,142617,-15373.22706,2016-11-15T01:30:00Z
DDM,74347R305,18016,347244.98,22635,7113,7451,115515.22000,2016-11-15T01:30:00Z
DIA,78467X109,-47450,7469477.51,-7902,116484,2194,29196.32306,2016-11-15T01:30:00Z
DIG,74347R719,86525,-74437.16,84640,36235,9229,269207.69000,2016-11-15T01:30:00Z

And the contents of server.json:
{
  "dataSources" : {
    "LATOUR" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "LATOUR",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "time"
              },
              "columns" : ["Symbol","cusip","Position","costBasis","SoDPosition","tradeVolume","sid","pnl","time"],
              "dimensionsSpec" : {
                "dimensions" : ["Symbol","cusip","sid"],
                "dimensionExclusions" : []
              },
              "format" : "csv"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : {"type": "duration", "duration": 10000}
          },
          "metricsSpec" : [
            { "name" : "Position", "type" : "doubleSum", "fieldName" : "Position" },
            { "name" : "costBasis", "type" : "doubleSum", "fieldName" : "costBasis" },
            { "name" : "SoDPosition", "type" : "doubleSum", "fieldName" : "SoDPosition" },
            { "name" : "tradeVolume", "type" : "doubleSum", "fieldName" : "tradeVolume" },
            { "name" : "pnl", "type" : "doubleSum", "fieldName" : "pnl" }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1"
      }
    }
  },
  "properties" : {
    "task.warmingPeriod" : "PT5M",
    "druidBeam.firehoseGracePeriod" : "PT1M",
    "tranquility.maxBatchSize" : 1000,
    "tranquility.lingerMillis" : 5,
    "zookeeper.connect" : "infradruidmaster1.newark",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "http.port" : "8200",
    "http.threads" : "40"
  }
}

Am I missing something here?

Hi,
Yes, as specified in http://druid.io/docs/latest/ingestion/data-formats.html, you need to remove the header line before ingestion.
Please try again after removing the header line.
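
For reference, the CSV parser takes the column names only from the columns array in the parseSpec and treats every line of the file as a data row, so a header line is parsed as if it were data. A minimal sketch of the relevant part of your spec, with the header line removed from the file itself:

"parseSpec" : {
  "format" : "csv",
  "timestampSpec" : { "column" : "time" },
  "columns" : ["Symbol","cusip","Position","costBasis","SoDPosition","tradeVolume","sid","pnl","time"],
  "dimensionsSpec" : { "dimensions" : ["Symbol","cusip","sid"] }
}

The order of the columns array has to match the order of the fields in each CSV line; as far as I know, the timestamp column does not have to be first as long as the names and positions line up.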

Thanks, it worked after removing the CSV header and keeping time as the first field in the file.

Hi guys

I have another generic question about Druid. Can I have columns in my CSV file which are neither a dimension nor a measure? For example, suppose the field is "Price". I don't see the field being a dimension, since it does not identify a product, and it is not a measure either, since I don't need to do aggregations (sum, etc.) on product prices. Please suggest how to handle such fields, which are just a property whose purpose is to provide more information about the product.

Thanks

Shubhi Agarwal

Yes, you can have additional columns and ignore them. Only the columns mentioned as dimensions in dimensionsSpec and metrics in metricsSpec will be ingested; any additional columns will be dropped. In your case, add Price to the list of columns in the CSV parseSpec and remove it from the list of dimensions and metrics.
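
As a rough sketch of what I mean (assuming Price is the last field of each line in your file; move it in the columns list to wherever it actually appears), the parseSpec would look something like the following. Since Price is not listed in dimensions or metricsSpec, it is simply dropped at ingestion time:

"parseSpec" : {
  "format" : "csv",
  "timestampSpec" : { "column" : "time" },
  "columns" : ["time","Symbol","cusip","Position","costBasis","SoDPosition","tradeVolume","sid","pnl","Price"],
  "dimensionsSpec" : { "dimensions" : ["Symbol","cusip","sid"] }
}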

Hi Nishant

Thanks for your quick response. The thing is that I don't want columns like "Price" to be dropped. I want those columns to be ingested as well, so that I can see them in Pivot. It's just that these columns don't qualify as either a dimension or a metric, so I am wondering where to place them.

"parser" : {
  "type" : "string",
  "parseSpec" : {"Price",
    "timestampSpec" : {
      "column" : "time",
    },
    "columns": ["time","Symbol","cusip","Position","costBasis","SoDPosition","tradeVolume","sid","pnl"],
    "dimensionsSpec" : {
      "dimensions" : ["Symbol","cusip","sid"],
      "dimensionExclusions" :
    },
    "format" : "csv"
  }
},

Are you suggesting to add Price like I did here?

Hey Shubhi,

The ingestion spec as you have it written wouldn't be valid.

If you want the data to be available in queries and it's not a metric that you want aggregated, it should be classified as a dimension (and you just wouldn't use it to do any filtering/splitting).
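
Concretely, a rough sketch against your spec (again assuming Price is an actual field in your CSV; place it in the columns list at the position where it appears in the file):

"parseSpec" : {
  "format" : "csv",
  "timestampSpec" : { "column" : "time" },
  "columns" : ["time","Symbol","cusip","Position","costBasis","SoDPosition","tradeVolume","sid","pnl","Price"],
  "dimensionsSpec" : {
    "dimensions" : ["Symbol","cusip","sid","Price"],
    "dimensionExclusions" : []
  }
}

Price would then be stored as a regular (string) dimension and show up in queries and in Pivot, even if you never filter or split on it.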

Hello Everyone,

I am ingesting a CSV file into Druid with a count metric and a sum metric, and I am visualizing the data using Superset, but I cannot see the sum as part of the table that got ingested.

Sample data (having 7 columns, doing a sum on the 4th column):

1969,2013-01-09T03:30:01Z,0 ,1,1,0,39
2107,2013-01-09T03:30:01Z,0 ,1,0,0,4

Metrics used while ingesting:

"metricsSpec": [{
    "name": "count",
    "type": "count"
  },
  {
    "name" : "total",
    "type" : "longSum",
    "fieldName" : "filed4"
  }]
Please suggest whether Druid has some issue ingesting the CSV file or whether the issue is with Superset.