Is Druid able to load Avro messages from Kafka as real-time ingestion?

Hello:

I was wondering whether Druid is able to ingest Avro-format messages from Kafka. I was reading some articles and it seems Avro is not the best-supported format when loading from Kafka. Since I still can’t get my Kafka and Druid talking to each other, I tried loading an Avro file from local disk to see how Avro works in Druid. When I got to the Parse Data step, there was no Avro option in the drop-down list. I have “druid-avro-extensions” in my environment file (I installed Druid with Docker).
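For reference, the relevant line of my environment file looks roughly like this (the other entries are whatever ships with the quickstart docker-compose setup, so yours may differ):

druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-kafka-indexing-service", "druid-avro-extensions"]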

So this made me wonder: even if I get Druid connected to my Kafka, does that mean Druid still won’t be able to read Avro messages from the pipeline?

Thanks in advance!

Julie

Hi Julie,
I have always had difficulty trying to ingest Avro format into Druid. I always use ksql streams to convert it to JSON format and load Druid from that.
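The statements I use look roughly like this (stream and topic names are just placeholders, and the exact syntax depends on your KSQL/ksqlDB version; newer ksqlDB versions may want EMIT CHANGES on the second statement):

-- register the existing Avro topic, then re-emit it as JSON on a new topic
CREATE STREAM clicks_avro WITH (KAFKA_TOPIC='clickstream', VALUE_FORMAT='AVRO');
CREATE STREAM clicks_json WITH (KAFKA_TOPIC='clickstream_json', VALUE_FORMAT='JSON') AS SELECT * FROM clicks_avro;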



Hi, Visakh:

Just curious, when loading data from Kafka, what are the steps we should follow? Should we submit the supervisor first? I didn’t do a separate supervisor step; I was just loading data from the web console and following it step by step. I thought that when you load data from the console, it builds the supervisor and spec automatically for you, doesn’t it?

Thanks!

Julie

What version of Druid are you using?

Also, Avro may not be supported by the console. I can check with the devs, but you may want to try creating the ingestion spec and submitting it by hand, both to make sure your extension is loaded correctly and to get around the error in the meantime.
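If you go that route, a supervisor spec can be POSTed directly to the Overlord (or through the Router); something along these lines should work, with the host, port, and file name adjusted to your setup:

curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8888/druid/indexer/v1/supervisor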

I’m running Druid with Docker. It says version 2.2 in the docker-compose file. I have “druid-avro-extensions” and “druid-kafka-indexing-service” in my environment file. So I should try to submit the ingestion spec manually, right? I will give it a try. Thanks!

Julie

Yes.
You can also try manually submitting the ingestion spec.

Hi, Visakh:

I tried to submit the ingestion spec manually. I used some sample data, since the production data contains over 60 columns.

It seems the Avro format is not supported? My Druid is installed with Docker, which says version 2.2. Did I miss anything here?

Thank you!

Hi Julie,
For Avro files, the type should be avro_ocf in the ingestion spec.

See example spec below

https://druid.apache.org/docs/latest/ingestion/data-formats.html#avro-ocf
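The relevant part of the spec is the inputFormat; a trimmed-down version from that page looks something like this (double-check the fields against the docs for your Druid version):

"inputFormat": {
  "type": "avro_ocf",
  "binaryAsString": false
}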

Hi, Visakh:

I actually tried the avro_ocf type and it’s still not working. I’m starting to wonder whether it’s because the version in Docker doesn’t support the Avro format?

Can you send some sample data over to try to reproduce it?


Hi, Rachel:

Here is my avro sample data:

{
  "timestamp": 1596569868191,
  "partyId": "0:kdgccxi2:rr8agJ0n2bYFcrdCKeEBpxwQYLl1J2Ap",
  "eventId": "0:3l8XZxYFFxaRjmhGz9Fi7gw0ykRJcQuv14",
  "eventType": "active"
}

and the schema:

{
  "namespace": "clickstream",
  "type": "record",
  "name": "ClickstreamEventSchema",
  "doc": "The clickstream event schema.",
  "fields": [
    { "name": "timestamp", "type": "long", "doc": "The timestamp of the time the request was received by the server, in milliseconds since the UNIX epoch." },
    { "name": "partyId", "type": ["null", "string"], "default": null, "doc": "A long-lived unique identifier stored by a client that is associated with each event they send. All events from the same client should have the same party identifier. For browser sources this value is stored in a cookie." },
    { "name": "eventId", "type": ["null", "string"], "default": null, "doc": "A unique identifier that is associated with each event received from a source. (This identifier is assigned by the client, not by the server.)" },
    { "name": "eventType", "type": ["null", "string"], "default": null, "doc": "The type of event being processed. The tracking tag used by sites integrating with browser sources automatically issue a pageView event by default when a page-view commences." }
  ]
}

I can see the data in Kafka but can’t ingest it into Druid using the Docker version. (I have tried ingesting from the UI and submitting the ingest spec manually.)
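(To check the topic I’ve been using the Confluent console consumer, roughly like this; the bootstrap server and schema registry addresses are specific to my setup and may not be exactly right here:)

kafka-avro-console-consumer --bootstrap-server localhost:29092 --topic clickstream-simple-avro --from-beginning --property schema.registry.url=http://localhost:8081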

Any help would be appreciated!

Julie

Can you send the ingestion spec you are using please?

Hi, Rachel:

By the way, if you ever get the sample data working, please share the ingestion spec file. I’m very new to submitting spec files manually; that’s probably the reason my ingestion failed.

Thanks!

Julie

Hi, Rachel:

Below is my ingestion spec file. When I submit it, it seems to run and read data from my Kafka. But when I query the datasource, all the values come back null. Something is wrong with my spec file, but I don’t know where or how.

Any help would be appreciated!

Thanks!

{
  "dataSchema": {
    "dataSource": "clickstream-simple-avro",
    "timestampSpec": null,
    "dimensionsSpec": null,
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": false,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    },
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_inline",
        "schema": {
          "namespace": "io.divolte.record",
          "type": "record",
          "name": "DefaultEventRecord",
          "fields": [
            {
              "name": "timestamp",
              "type": "long",
              "doc": "The timestamp of the time the request was received by the server, in milliseconds since the UNIX epoch."
            },
            {
              "name": "partyId",
              "type": [
                "null",
                "string"
              ],
              "default": null,
              "doc": "A long-lived unique identifier stored by a client that is associated with each event they send. All events from the same client should have the same party identifier. For browser sources this value is stored in a cookie."
            },
            {
              "name": "eventId",
              "type": [
                "null",
                "string"
              ],
              "default": null,
              "doc": "A unique identifier that is associated with each event received from a source. (This identifier is assigned by the client, not by the server.)"
            },
            {
              "name": "eventType",
              "type": [
                "null",
                "string"
              ],
              "default": null,
              "doc": "The type of event being processed. The tracking tag used by sites integrating with browser sources automatically issue a pageView event by default when a page-view commences."
            }
          ]
        }
      },
      "parseSpec": {
        "format": "avro",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "partyId",
            "eventType",
            "eventId"
          ],
          "dimensionExclusions": []
        }
      }
    }
  },
  "ioConfig": {
    "topic": "clickstream-simple-avro",
    "inputFormat": null,
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT300S",
    "consumerProperties": {
      "bootstrap.servers": "172.16.12.157:29092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": true,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "lateMessageRejectionStartDateTime": null,
    "stream": "clickstream-simple-avro",
    "useEarliestSequenceNumber": true,
    "type": "kafka"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/opt/apache-druid-0.17.0/var/tmp/druid-realtime-persist3809866181960123894",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "indexSpecForIntermediatePersists": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": true,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 100,
    "skipSequenceNumberAvailabilityCheck": false,
    "repartitionTransitionDuration": "PT120S"
  },
  "type": "kafka"
}

I got it working locally. In case someone is interested, here is my spec file:

Thanks for everyone’s help!

{
  "dataSchema": {
    "dataSource": "clickstream",
    "timestampSpec": null,
    "dimensionsSpec": null,
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": false,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    },
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "url": "http://myip:8081",
        "type": "schema_registry"
      },
      "parseSpec": {
        "format": "timeAndDims",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "partyId",
            "eventId",
            "eventType",
            "fieldId",
            "eventValue",
            "sessionId",
            … (other columns)
          ]
        }
      }
    }
  },
  "ioConfig": {
    "topic": "clickstream",
    "inputFormat": null,
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT300S",
    "consumerProperties": {
      "bootstrap.servers": "myip:29092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": true,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "lateMessageRejectionStartDateTime": null,
    "stream": "clickstream",
    "useEarliestSequenceNumber": true,
    "type": "kafka"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/opt/apache-druid-0.17.0/var/tmp/druid-realtime-persist3809866181960123894",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "indexSpecForIntermediatePersists": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": true,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 100,
    "skipSequenceNumberAvailabilityCheck": false,
    "repartitionTransitionDuration": "PT120S"
  },
  "type": "kafka"
}


Thanks to Julie for sharing the solution.