ProtoBuf Parser

Hi,

I see a ProtoBuf parser is mentioned in the docs as an option (instead of the JSON/CSV/TSV parsers), but no further details are given. I also looked at the code on GitHub, at the ProtoBufInputRowParser class, but I am clearly missing something. If I send messages via Kafka (and the KafkaFirehose) in JSON, it works fine and the events get ingested, but if I switch to data serialized using protobuf and set up the Realtime node to use the ProtoBuf parser, then it looks like the Realtime node does not get any events (or ignores them if it does; there is no output in the Realtime node's logger).

Any help will be appreciated.

My spec for the realtime node:

[
  {
    "dataSchema" : {
      "dataSource" : "lqtest2",
      "parser" : {
        "type" : "protoBuf",
        "descriptor" : "MetricsRecord.proto",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "millis"
          },
          "dimensionsSpec" : {
            "dimensions" : ["userId", "sourceId", "deviceId"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "max", "name" : "maxValue", "fieldName" : "testValue" },
        { "type" : "min", "name" : "minValue", "fieldName" : "testValue" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "SECOND"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose" : {
        "type" : "kafka-0.8",
        "consumerProps" : {
          "zookeeper.connect" : "localhost:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id" : "druid-example",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset" : "largest",
          "auto.commit.enable" : "false"
        },
        "feed" : "test"
      },
      "plumber" : {
        "type" : "realtime"
      }
    },
    "tuningConfig" : {
      "type" : "realtime",
      "maxRowsInMemory" : 500000,
      "intermediatePersistPeriod" : "PT5m",
      "windowPeriod" : "PT5m",
      "basePersistDirectory" : "/tmp/realtime/basePersist",
      "rejectionPolicy" : {
        "type" : "messageTime"
      }
    }
  }
]

Hi Herman, are there any exceptions in the logs?

If you include the RealtimeMetricsMonitor, are there any metrics about events being ingested, thrownAway, etc?
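If it is not enabled yet, it can be added to the realtime node's runtime.properties with something like the following (exact file location depends on your setup):

druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]

The monitor then periodically emits events/processed, events/unparseable, events/thrownAway and rows/output through the configured emitter.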

I've managed to get the RealtimeMetricsMonitor going (I'm still a noob with Druid, but really impressed so far).

From the log (see below) I can see that it struggles to parse the data. I'll switch on debug logging and see if I get more information to track it down.

2015-05-15T08:43:05,019 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - New Server[DruidServerMetadata{name='10.0.2.36:8084', host='10.0.2.36:8084', maxSize=0, tier='_default_tier', type='realtime', priority='0'}]
2015-05-15T08:44:02,505 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-05-15T08:44:02.492Z","service":"realtime","host":"10.0.2.36:8084","metric":"events/thrownAway","value":0,"user2":"lqtest2"}]
2015-05-15T08:44:02,505 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [89,999] Unparseable events! Turn on debug logging to see exception stack trace.
2015-05-15T08:44:02,506 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-05-15T08:44:02.506Z","service":"realtime","host":"10.0.2.36:8084","metric":"events/unparseable","value":89999,"user2":"lqtest2"}]
2015-05-15T08:44:02,506 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-05-15T08:44:02.506Z","service":"realtime","host":"10.0.2.36:8084","metric":"events/processed","value":0,"user2":"lqtest2"}]
2015-05-15T08:44:02,506 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-05-15T08:44:02.506Z","service":"realtime","host":"10.0.2.36:8084","metric":"rows/output","value":0,"user2":"lqtest2"}]

Thanks

Herman

OK, with debug-level logging (see below) I can see that the Realtime node is still using Jackson, i.e. expecting JSON-format data.

How can I configure it to use the ProtoBuf parser? My attempt looks like this, but it does not seem to work:

"dataSchema" : {
  "dataSource" : "lqtest2",
  "parser" : {
    "type" : "protoBuf",
    "descriptor" : "MetricsRecord.proto",
    "parseSpec" : {
      "format" : "json",
      "timestampSpec" : {
        "column" : "timestamp",
        "format" : "millis"
      },
      "dimensionsSpec" : {
        "dimensions" : ["userId", "sourceId", "deviceId"],
        "dimensionExclusions" : [],
        "spatialDimensions" : []
      }
    }
  },

Please note: if I set parseSpec.format to "protoBuf", then Druid expects a TSV file and complains that the rest of the configuration is not correct for TSV.

2015-05-15T08:54:17,400 DEBUG [chief-lqtest2] io.druid.segment.realtime.RealtimeManager - thrown away line due to exception, considering unparseable

Pbitmap$0IcommoheartRateIserializedSizeJ UnabletimestampdeviceIdtLjava/lang/String;sourceIdq~LuserIdq~xpCMTU(t$42dd9e99-5ca6-4484-a6b4-8081a861f162t$1a4a19ce-71f1-4cf3-b11b-dfbdb4ab372bt$cbdac087-31a5-4ff3-814f-1b2875252a86]

at com.metamx.common.parsers.JSONParser.parse(JSONParser.java:147) ~[java-util-0.26.15.jar:?]

at io.druid.data.input.impl.StringInputRowParser.parseString(StringInputRowParser.java:86) ~[druid-api-0.3.5.jar:0.3.5]

at io.druid.data.input.impl.StringInputRowParser.buildStringKeyMap(StringInputRowParser.java:73) ~[druid-api-0.3.5.jar:0.3.5]

at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:40) ~[druid-api-0.3.5.jar:0.3.5]

at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:19) ~[druid-api-0.3.5.jar:0.3.5]

at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.nextRow(KafkaEightFirehoseFactory.java:118) ~[?:?]

at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:239) [druid-server-0.7.1.1.jar:0.7.1.1]

Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('' (code 65533 / 0xfffd)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')

Pbitmap$0I: sheartRateIserializedSizeJcsRecortimestampdeviceIdtLjava/lang/String;sourceIdq~LuserIdq~xpCMTU(t$42dd9e99-5ca6-4484-a6b4-8081a861f162t$1a4a19ce-71f1-4cf3-b11b-dfbdb4ab372bt$cbdac087-31a5-4ff3-814f-1b2875252a86; line: 1, column: 2]

at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:437) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1462) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:683) ~[jackson-core-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105) ~[jackson-databind-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051) ~[jackson-databind-2.4.4.jar:2.4.4]

at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:1833) ~[jackson-databind-2.4.4.jar:2.4.4]

at com.metamx.common.parsers.JSONParser.parse(JSONParser.java:115) ~[java-util-0.26.15.jar:?]

… 6 more

2015-05-15T08:54:17,401 DEBUG [chief-lqtest2] kafka.consumer.PartitionTopicInfo - reset consume offset of test:0: fetched offset = 71431: consumed offset = 64836 to 64836

Can you try "protobuf" instead of "protoBuf"? From the code, it looks like the doc is wrong.

– Himanshu

Hi, I changed the relevant part of my spec file to:

"dataSchema" : {
  "dataSource" : "lqtest2",
  "parser" : {
    "type" : "protobuf",
    "descriptor" : "MetricsRecord.proto",
    "parseSpec" : {
      "format" : "json",
      "timestampSpec" : {
        "column" : "timestamp",
        "format" : "millis"
      },
      "dimensionsSpec" : {
        "dimensions" : ["userId", "sourceId", "deviceId"],
        "dimensionExclusions" : [],
        "spatialDimensions" : []
      }
    }
  },

(i.e. using "protobuf" instead of "protoBuf", but I still had to keep parseSpec.format set to "json", otherwise it expects TSV parameters in parseSpec)

Now there is progress: for each event (a message object built from a .proto file and then sent as an Array[Byte] over Kafka) I get the exception below, so at least ProtoBufInputRowParser is being used. Any advice on how to tackle this InvalidProtocolBufferException? Might it have to do with the descriptor file I'm setting in the spec file? What format does it expect the "descriptor" in: the .proto definition file, or the file(s) that protoc has compiled?
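For reference, what I do on the producer side amounts to roughly the following (a simplified Java sketch, not my exact code; MetricsRecord stands for the class protoc generated from MetricsRecord.proto, and "test" is the Kafka topic from my spec):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MetricsRecordSender
{
  public static void send(MetricsRecord record) // MetricsRecord = class generated by protoc
  {
    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");              // Kafka 0.8 broker list
    props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // raw byte[] payloads

    Producer<String, byte[]> producer = new Producer<String, byte[]>(new ProducerConfig(props));
    try {
      // the payload is just the protobuf wire-format bytes of the message
      producer.send(new KeyedMessage<String, byte[]>("test", record.toByteArray()));
    } finally {
      producer.close();
    }
  }
}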

The exception I get for each event I want to ingest:

2015-05-16T09:30:30,351 DEBUG [chief-lqtest2] io.druid.segment.realtime.RealtimeManager - thrown away line due to exception, considering unparseable

java.lang.RuntimeException: com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.

at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]

at io.druid.data.input.ProtoBufInputRowParser.getDescriptor(ProtoBufInputRowParser.java:127) ~[druid-processing-0.7.1.1.jar:0.3.5]

at io.druid.data.input.ProtoBufInputRowParser.buildStringKeyMap(ProtoBufInputRowParser.java:85) ~[druid-processing-0.7.1.1.jar:0.3.5]

at io.druid.data.input.ProtoBufInputRowParser.parse(ProtoBufInputRowParser.java:78) ~[druid-processing-0.7.1.1.jar:0.3.5]

at io.druid.data.input.ProtoBufInputRowParser.parse(ProtoBufInputRowParser.java:41) ~[druid-processing-0.7.1.1.jar:0.3.5]

at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.nextRow(KafkaEightFirehoseFactory.java:118) ~[?:?]

at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:239) [druid-server-0.7.1.1.jar:0.7.1.1]

Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.

at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:99) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:498) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:461) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:579) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:280) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:240) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:488) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:461) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:579) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:280) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:240) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:488) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:193) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet.(DescriptorProtos.java:89) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet.(DescriptorProtos.java:47) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet$1.parsePartialFrom(DescriptorProtos.java:136) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet$1.parsePartialFrom(DescriptorProtos.java:131) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:200) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:217) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:223) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) ~[protobuf-java-2.5.0.jar:?]

at com.google.protobuf.DescriptorProtos$FileDescriptorSet.parseFrom(DescriptorProtos.java:253) ~[protobuf-java-2.5.0.jar:?]

at io.druid.data.input.ProtoBufInputRowParser.getDescriptor(ProtoBufInputRowParser.java:119) ~[druid-processing-0.7.1.1.jar:0.3.5]

… 5 more

OK, we'll fix the documentation regarding "protobuf". I found some more stuff missing in the doc. I don't think protobuf is well documented, and we'll fix that once things work for you and I know that it works... because I've never used it myself :)

Please try adding the "descriptor" file in the "parser". Here is what you should try:

"parser" : {
  "type" : "protobuf",
  "descriptor" : "descriptorFileName",  // this file is expected to be present on the classpath
  "parseSpec" : {
    "format" : "json",
    "timestampSpec" : {
      "column" : "timestamp",
      "format" : "auto"
    },

More of a "dev" note, but:

The "descriptor" file is later read using the following code, which seems to be what is failing for you right now:
InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);

Later, your record is read using the following code:
https://github.com/druid-io/druid/blob/master/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java#L85
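Also, since the file is parsed as a FileDescriptorSet, the "descriptor" most likely has to be the binary descriptor set that protoc writes out (e.g. protoc --descriptor_set_out=metrics.desc MetricsRecord.proto), not the .proto source file itself. A quick way to sanity-check the file before giving it to Druid (a sketch; "metrics.desc" is a placeholder for whatever file name you use, and it has to be on the classpath just like in Druid):

import java.io.InputStream;

import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;

public class DescriptorCheck
{
  public static void main(String[] args) throws Exception
  {
    // load the descriptor the same way Druid does: from the classpath
    InputStream in = DescriptorCheck.class.getClassLoader().getResourceAsStream("metrics.desc");
    if (in == null) {
      throw new IllegalStateException("descriptor file not found on the classpath");
    }

    // this is the call that currently throws InvalidProtocolBufferException for you
    FileDescriptorSet set = FileDescriptorSet.parseFrom(in);

    // if it parses, print the message types it contains
    for (FileDescriptorProto file : set.getFileList()) {
      for (DescriptorProto message : file.getMessageTypeList()) {
        System.out.println(file.getName() + " : " + message.getName());
      }
    }
  }
}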

– Himanshu