Hi,
I am using Druid 0.19.0 and ingesting from a Kafka stream that carries protobuf binary objects. I am able to create a datasource for the simple root-level protobuf fields, but I am not sure how to flatten or ingest the complex nested objects in the protobuf.
My proto looks like this:
message Obj {
required string System = 1;
required string Id = 2;
optional Product product = 3;
}
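Product itself is a nested message along these lines (the second field here is just a placeholder; productType is the one I reference below):
message Product {
  required string productType = 1;  // referenced in the flattenSpec below
  optional string productName = 2;  // hypothetical placeholder field
}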
System and Id are coming up as expected in the datasource. Is there a way I can flatten Product, or ingest the whole of Product as a string?
Thanks
Hi Jain,
I was facing the same problem.
You should be able to see the information by adding this to the fields of the flattenSpec:
{"name": "eventTime","type": "path","expr": "$.Obj.product.<any field from product>" },
Thanks for the reply.
I have the spec below and it is not working for the nested proto as you suggested. It looks like my flattenSpec is not in the correct place. Could you please confirm?
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "test-20",
    "parser": {
      "type": "protobuf",
      "descriptor": "file:///home/arpit/obj.desc",
      "protoMessageType": "Obj",
      "flattenSpec": {
        "useFieldDiscovery": true,
        "fields": [
          { "name": "Product", "type": "path", "expr": "$.Obj.product.productType" }
        ]
      },
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "InsertedTime",
          "format": "auto",
          "missingValue": "2000-01-01T01:02:03.456"
        },
        "dimensionsSpec": {
          "dimensions": ["Product", "Id"]
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 300000
  },
  "ioConfig": {
    "topic": "my_topic",
    "consumerProperties": {
      "bootstrap.servers": "host1:port1"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H",
    "useEarliestOffset": true
  }
}
I believe they have changed how ingestion works in later versions of Druid (I am on 0.20.2) and you are on an older version. I am not certain how ingestion behaves on an older version (i.e., whether there is any difference from my version).
I have attached the minimum spec that you will need. You need to include the flattenSpec inside the parseSpec.
{
"spec": {
"type": "kafka",
"dataSchema": {
"dataSource": "kafka-topic",
"parser": {
"type": "protobuf",
"descriptor": "file:///opt/apache-druid-0.20.2/conf/protobuf/file.desc",
"protoMessageType": "Obj",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "datetime",
"format": "posix"
},
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"name": "sid",
"type": "path",
"expr": "$.product.sid"
},
{
"name": "symbol",
"type": "path",
"expr": "$.product.symbol"
}
]
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": []
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE"
}
}
},
"type": "kafka"
}
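Note this is only the minimum needed to show the flattenSpec placement; a complete Kafka supervisor spec also needs ioConfig and tuningConfig inside "spec", which you can carry over from your own spec, e.g.:
"ioConfig": {
  "topic": "my_topic",
  "consumerProperties": {
    "bootstrap.servers": "host1:port1"
  },
  "useEarliestOffset": true
},
"tuningConfig": {
  "type": "kafka",
  "maxRowsPerSegment": 300000
}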
Really appreciate your help!
My spec is exactly like you suggested. However, I think we also need to add the names (sid, symbol) to the dimensions. I tried it both ways, with and without adding the new names to the dimensions, but I am still getting the fields as null.
Try adding just the expression, as shown below. This should give you the flattened string.
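Something like this, with the dimensions list left empty so field discovery picks up the flattened names as dimensions (a sketch based on the earlier spec):
"flattenSpec": {
  "useFieldDiscovery": true,
  "fields": [
    { "name": "sid", "type": "path", "expr": "$.product.sid" }
  ]
},
"dimensionsSpec": {
  "dimensions": [],
  "dimensionExclusions": []
}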
I can see some data with the earlier spec you suggested.
Thanks a lot for your quick response.