Kafka Indexing Service with Kafka message format "Avro"

Hi All,

Will the Kafka indexing service work if my Kafka topic is in avro format? I am trying to work on a realtime ingestion spec from a kafka topic which has messages in avro format. If this service is not ready for type “avro” what other alternative method can I try? Our Kafka topics do not have schema repo, but there is a confluent schema registry.

Can someone give a sample spec which is working on consuming messages from a topic with format avro.

Regards,
gurdeep

Hi Gurdeep,

We are using Avro + the confluent schema registry. There is an Avro module, but it doesn’t support Confluent’s schema registry (it uses a different open source registry). I modified the existing Avro module so that it could be configured to use either one, but I did not try and have it accepted into the main project as it introduces a dependency on another maven repository (Confluent’s). There are a few things I would change if I had time, and my change is unfortunately spread over 3 commits, but it works for us:

https://github.com/shopkick/druid/commit/f005f18672ce5d9cb0066794d4df240c7a865036

https://github.com/shopkick/druid/commit/77bac623957ec4c873f2166dc8f1da74b2e1b569

https://github.com/shopkick/druid/commit/6cbde084719b0a84808637fa099bbf554967f0ca

I made these changes to the 0.9.1.1 branch so I do not know if there are conflicts applying it to 0.9.2.

A supervisor spec would then look like:

{

“type”: “kafka”,

“dataSchema” : {

“dataSource” : “your_data_source”,

“parser” : {

“type” : “avro_stream”,

“avroBytesDecoder” : {

“type” : “schema_repo”,

“subjectAndIdConverter” : {

“type” : “confluent”,

“subject” : “${kafka_topic}”

},

“schemaRepository” : {

“type” : “confluent_client”,

“url” : “http://{schema_registry_host}:{schema_registry_port}”,

“identityMapCapacity” : 1000

}

},

“parseSpec”:{

“format”:“timeAndDims”,

“timestampSpec”:{

“column”:“your_timestamp_column”,

“format”:“auto”

},

“dimensionsSpec”:{

“dimensions”:

}

}

},

“granularitySpec”:{

“type”:“uniform”,

“segmentGranularity”:“DAY”,

“queryGranularity”: “none”

},

“metricsSpec”:[

{

“type”:“count”,

“name”:“count”

}

]

},

“ioConfig” : {

“topic” : “${kafka_topic}”,

“consumerProperties”: {

“bootstrap.servers”: “${kafka_broker_list}”

},

“useEarliestOffset” : true,

“taskCount” : ${task_count},

“replicas” : ${task_replicas},

“taskDuration”: “${task_duration}”

},

“tuningConfig” : {

“type” : “kafka”,

“maxRowsInMemory”: ${max_rows_in_memory},

“buildV9Directly”: true

}

}

Best of luck,

–Ben

Hi Ben,

Do you have a gz file for your distribution with your code in it? I would like to give this a try…

Regards,
Gurdeep

The gz is rather large and I do not have a convenient place where I could put it for you to download. However, you should be able to access the git repo and build it yourself. You will require git, maven and the JDK.

git clone https://github.com/shopkick/druid.git

cd druid

git checkout sk_0.9.1.1

cd distribution

mvn package

That will create a new subdirectory called target in which you can find our distribution: sk_druid-0.9.1.1-sk_bin.tar.gz

Our distribution includes only CDH’s hadoop client and the plugins we are using so you might need to switch those up depending on what features you are using.

–Ben

How are you handling nested schema? Do you use the flattenSpec?

We do not use nested schemata for analytics - they are generally hard to work with. The data is flattened out upstream if necessary.

–Ben

Ben,

I was able to submit a supervisor spec against your distribution and ran into the following error when running against the confluent schema registry…any ideas?

2016-11-15T21:45:02,457 INFO [PinPromo-incremental-persist] io.druid.segment.realtime.appenderator.AppenderatorImpl - Committing metadata[FiniteAppenderatorDriverMetadata{activeSegments={}, lastSegmentIds={}, callerMetadata={nextPartitions=KafkaPartitions{topic='pin-promotions-data', partitionOffsetMap={0=119954, 1=119906, 2=120008, 3=120029, 4=119930, 5=119917, 6=119987, 7=120032, 8=119974, 9=119959}}}}] for sinks[].
2016-11-15T21:45:02,461 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver - Persisted pending data in 10ms.
2016-11-15T21:45:02,467 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Shutting down...
2016-11-15T21:45:02,471 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KafkaIndexTask{id=index_kafka_PinPromo_fccbdd0569600e8_bfficlgh, type=index_kafka, dataSource=PinPromo}]
java.lang.IllegalArgumentException: Provided string is null or empty: 'null'
	at org.schemarepo.RepositoryUtil.validateSchemaOrSubject(RepositoryUtil.java:135) ~[?:?]
	at org.schemarepo.client.Avro1124RESTRepositoryClient.lookup(Avro1124RESTRepositoryClient.java:82) ~[?:?]
	at org.schemarepo.api.TypedSchemaRepository.getSchema(TypedSchemaRepository.java:144) ~[?:?]
	at io.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder.parse(SchemaRepoBasedAvroBytesDecoder.java:78) ~[?:?]
	at io.druid.data.input.AvroStreamInputRowParser.parse(AvroStreamInputRowParser.java:53) ~[?:?]
	at io.druid.data.input.AvroStreamInputRowParser.parse(AvroStreamInputRowParser.java:33) ~[?:?]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:412) ~[?:?]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_101]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_101]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_101]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
2016-11-15T21:45:02,478 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_PinPromo_fccbdd0569600e8_bfficlgh] status changed to [FAILED].
2016-11-15T21:45:02,480 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_kafka_PinPromo_fccbdd0569600e8_bfficlgh",
  "status" : "FAILED",
  "duration" : 1502
}
2016-11-15T21:45:02,483 INFO [main] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void io.druid.server.coordination.AbstractDataSegmentAnnouncer.stop()] on object[io.druid.server.coordination.BatchDataSegmentAnnouncer@3a4aadf8].

You can see from the stack that it is not using my code, you are still going through the Avro1124RESTRepositoryClient:

at org.schemarepo.client.Avro1124RESTRepositoryClient.lookup(Avro1124RESTRepositoryClient.java:82) ~[?:?]

In your parser spec did you specify the subjectAndIdConverter’s type as “confluent”? If so, I would go back and make sure that you are building from the correct branch (sk_0.9.1.1) and that you have correctly deployed the extension.

–Ben

Ben,

Here is a snapshot of my suprevisor spec which I am posting using the following command

curl -X POST -H ‘Content-Type: application/json’ -d @pinPromo.kafka.json http://localhost:8090/druid/indexer/v1/supervisor

{
“type”: “kafka”,
“dataSchema”: {
“dataSource”: “PinPromo”,
“parser”: {
“type”: “avro_stream”,
“avroBytesDecoder”: {
“type”: “schema_repo”,
“subjectAndIdConverter”: {
“type”: “confluent”,
“subject”: “pin-promotions-data”
},
“schemaRepository”: {
“type”: “confluent_client”,
“url”: “http://xx.xx.xxx.xxx:8081”,
“identityMapCapacity”: 1000
}
},
“parseSpec”: {
“format”: “timeAndDims”,
“timestampSpec”: {
“column”: “eventDate”,
“format”: “auto”
},
“dimensionsSpec”: {
“dimensions”:
}
}
},
“granularitySpec”: {
“type”: “uniform”,
“segmentGranularity”: “DAY”,
“queryGranularity”: “none”
},
“metricsSpec”: [
{
“type”: “count”,
“name”: “count”
}
]
},

I did a build from sk_0.9.1.1 and used the extensions generated as result of the build. I had to include the two extensions in my loadlist

druid-avro-extensions
druid-kafka-indexing-service

Which extension would have your code? Let me know what I am doing wrong here…

Regards
Gurdeep

I am not sure what is wrong. The configuration looks fine. Can you unpack the druid-avro-extensions-0.9.1.1.jar file and confirm that there is a confluent directory in there with CachedSchemaRepositoryClientWrapper.class and ConfluentSubjectAndIdConverter.class files?

–Ben

The jar in extensions did not had the confluent class. I followed the instructions you mentioned exactly and did a “mvn package”. I got a “sk_druid-0.9.1.1-sk_bin.tar.gz” in the target folder under distribution, used that as my druid distribution. Not sure why the confluent code is not there in this distribution.

Hi Ben,

I was able to get my superivisor spec running using your distribution, but now I am getting the following errors. Do you know where I can update the settings?

2016-11-21T17:57:30,952 INFO [main] io.druid.guice.JsonConfigurator - Loaded class[class io.druid.client.cache.CacheConfig] from props[druid.realtime.cache.] as [io.druid.client.cache.CacheConfig@611a990b]
2016-11-21T17:57:30,953 ERROR [main] io.druid.cli.CliPeon - Error when starting up.  Failing.
com.google.inject.ProvisionException: Guice provision errors:

__1) Not enough direct memory.  Please adjust -XX:MaxDirectMemorySize, druid.processing.buffer.sizeBytes, or druid.processing.numThreads: maxDirectMemory[3,817,865,216], memoryNeeded[8,589,934,592] = druid.processing.buffer.sizeBytes[1,073,741,824] * ( druid.processing.numThreads[7] + 1 )
  at io.druid.guice.DruidProcessingModule.getIntermediateResultsPool(DruidProcessingModule.java:108)__
  at io.druid.guice.DruidProcessingModule.getIntermediateResultsPool(DruidProcessingModule.java:108)
  while locating io.druid.collections.StupidPool<java.nio.ByteBuffer> annotated with @io.druid.guice.annotations.Global()
    for parameter 1 at io.druid.query.groupby.GroupByQueryEngine.<init>(GroupByQueryEngine.java:79)
  at io.druid.guice.QueryRunnerFactoryModule.configure(QueryRunnerFactoryModule.java:85)
  while locating io.druid.query.groupby.GroupByQueryEngine
    for parameter 0 at io.druid.query.groupby.GroupByQueryRunnerFactory.<init>(GroupByQueryRunnerFactory.java:64)
  at io.druid.guice.QueryRunnerFactoryModule.configure(QueryRunnerFactoryModule.java:82)
  while locating io.druid.query.groupby.GroupByQueryRunnerFactory

The error is remarkably informative - you need more memory for your particular workload and configuration and the error message has given you the different variables that you can change. It looks like the problem presents when performing a query, which is highly dependent on your data set and query patterns. I’m assuming that this is presenting in the peon’s, in which case you need to change their configuration in the middle manager’s runtime.properties file (druid.indexer.runner.javaOpts or druid.indexer.runner.javaOptsArray).

Ben,

I was able to get the indexing work for avro formats. Thank you for your direction. However my next 2 hurdles are…

  1. Flattening a complex nested schema
  2. Do you know if your code will work with Secured topics? I have a topic which is accessible only over SSL.

Regards,
Gurdeep

There is no support for flattening a nested Avro schema in my code - you would have to add that, or perform the flattening outside of Druid.

I have not tried it with secured topics, but there is nothing in my code specific to Kafka. The Avro translation piece is separate from communicating with Kafka.

–Ben