Tranquility Kafka customParser


The issue: we have a legacy Kafka topic with a custom text message format (not JSON, CSV, TSV or anything else supported).

Question1: Is it possible to build a customParser that knows how to parse the text messages?

Looking over the Tranquility code, it seems the StringInputRowParser is hardcoded for the Tranquility-Kafka ingestion flow.

We already tried building a Druid extension with a custom InputRowParser, but it doesn’t seem to be used at runtime: the default parser (StringInputRowParser) is always chosen.

Question2: Does the custom extension need to be loaded only by the realtime node (Tranquility) or also by the master (overlord)?

Thank you!


Is there no way to consume messages in a custom format?

Hey Bogdan,

You should be able to define a custom parser using a Druid extension when using the latest Tranquility 0.8.2 + a Druid 0.9.1 extension. What you might be running into is that if there is no registered extension for the “format” of the parser you specify, then it will default to parsing as TSV.

Please make sure that you wrote the extension properly (especially the Guice part that registers a new ByteBufferInputRowParser) and that you’re loading it properly into Tranquility (you can tell based on whether it logs some messages about loading the extension at startup).
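
For illustration only, here is a minimal sketch of the parsing core that such an extension would wrap. Everything in it is an assumption: the thread never shows the actual message format, so the pipe-delimited key=value layout, the class name CustomLineParser and the method parseLine are all hypothetical. The Druid-specific wiring (the ByteBufferInputRowParser implementation and the Guice/Jackson registration) is left out on purpose, so this compiles with the JDK alone.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Druid or Tranquility.
final class CustomLineParser {

    // Assumed (made-up) message layout: "ts=2016-07-01T00:00:00Z|host=web1|bytes=512"
    // Splits the line on '|' and each field on its first '='.
    static Map<String, String> parseLine(String line) {
        Map<String, String> event = new LinkedHashMap<>();
        for (String field : line.trim().split("\\|")) {
            int eq = field.indexOf('=');
            if (eq <= 0) {
                // No '=' found, or the key is empty: reject the message.
                throw new IllegalArgumentException("Malformed field: " + field);
            }
            event.put(field.substring(0, eq), field.substring(eq + 1));
        }
        return event;
    }

    public static void main(String[] args) {
        // Prints the parsed fields in their original order.
        System.out.println(parseLine("ts=2016-07-01T00:00:00Z|host=web1|bytes=512"));
    }
}
```

A real parser would decode the incoming bytes to a string, run logic like this, and build an input row from the resulting map; keeping the text handling in a plain helper makes it easy to unit test outside Druid.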

Thank you Gian!

There is some confusion around using this extension:

  • My guess is that I should refer to it using the “type” property of the “parser” object in the configuration. Am I right?

Ex: “type”: “custom_type”.

  • What is the “format” property of the “parser” object used for? Should it stay json, or is this the place where I define a custom format?

“format” : “custom_format” or “json” ?

Also, you said Tranquility should be loading the extension. Does this mean I need to copy the custom module libraries into the dist/tranquility/lib folder? Or is it enough to add the extension in dist/druid/extension/custom-module folder and add it to the file?

We managed to use the extension for batch loading of data from HDFS, but we still have problems using it with the realtime Tranquility nodes.

Regarding the Guice part, this is how we managed it:

new SimpleModule(getClass().getSimpleName())
        .registerSubtypes(new NamedType(ByteBufferInputRowParser.class),
                new NamedType(CustomParser.class, "custom_parser"))

Thank you!

Bogdan G.

Ok, it works now. I had to manually add the extension directory when starting Tranquility.
I thought this would be auto-configured like the other Druid daemons.