Consuming Kafka Avro data

Hi,

I'm trying to consume Avro data from a Kafka stream on my machine, but without success.

I've created the topic and I'm producing the following data into it:

```
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic calvin \
  --property value.schema='{"type":"record","name":"calvin_","fields":[{"name":"time","type":"string"},{"name":"foo","type":"string"}]}' \
  --property schema.registry.url=http://localhost:9999
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar1"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar2"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar3"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar4"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar5"}
{"time":"2019-11-21T15:53:29+00:00","foo":"bar6"}
```
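
As a sanity check, the same records can be read back with the Avro console consumer (same broker and registry URL as above):

```
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic calvin \
  --from-beginning --property schema.registry.url=http://localhost:9999
```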

I've added druid-avro-extensions to my Druid installation and submitted the following supervisor spec:

```
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "calvin-kafka",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_inline",
        "schema": {
          "name": "calvin_",
          "type": "record",
          "fields": [
            { "name": "time", "type": "string" },
            { "name": "foo", "type": "string" }
          ]
        }
      },
      "parseSpec": {
        "format": "avro",
        "dimensionsSpec": {
          "dimensions": ["foo"]
        },
        "timestampSpec": {
          "format": "auto",
          "column": "time"
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": { "type": "none" },
      "rollup": true,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": 20000000,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/Users/miguel.silvestre/apps/apache-druid-0.16.0-incubating/var/tmp/1574333623857-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": { "type": "concise" },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "indexSpecForIntermediatePersists": {
      "bitmap": { "type": "concise" },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "calvin",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": true,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "calvin",
    "useEarliestSequenceNumber": true,
    "type": "kafka"
  },
  "context": null,
  "suspended": false
}
```
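
For reference, a spec like this can be posted to the Overlord's supervisor API; the port 8090 and the file name calvin-supervisor.json below are assumptions based on the default single-machine setup:

```
# Submit (or update) the supervisor spec
curl -X POST -H 'Content-Type: application/json' \
     -d @calvin-supervisor.json \
     http://localhost:8090/druid/indexer/v1/supervisor

# Check the supervisor state afterwards
curl http://localhost:8090/druid/indexer/v1/supervisor/calvin-kafka/status
```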

However, the tasks stay in the RUNNING state forever, and I get the following error in the logs:

org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor - [16] unparseable events discarded. Turn on debug logging to see exception stack trace.

Relevant log excerpt:

```
2019-11-21T17:38:48,844 DEBUG [task-runner-0-priority-0] org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Built incremental fetch (sessionId=182690815, epoch=114) for node 0. Added (), altered (), removed () out of (calvin-0)
2019-11-21T17:38:48,844 DEBUG [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(calvin-0)) to broker 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:48,844 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=182690815,session_epoch=114,topics=[],forgotten_topics_data=[]} with correlation id 122 to node 0
2019-11-21T17:38:48,844 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Skipping fetch for partition calvin-0 because there is an in-flight request to 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:48,875 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Skipping fetch for partition calvin-0 because there is an in-flight request to 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:48,975 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Skipping fetch for partition calvin-0 because there is an in-flight request to 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:49,077 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Skipping fetch for partition calvin-0 because there is an in-flight request to 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:49,182 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Skipping fetch for partition calvin-0 because there is an in-flight request to 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:49,285 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Skipping fetch for partition calvin-0 because there is an in-flight request to 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:49,315 TRACE [MonitorScheduler-0] org.apache.druid.java.util.common.concurrent.ScheduledExecutors - Running org.apache.druid.java.util.metrics.MonitorScheduler$1@2a620208 (period PT60S)
2019-11-21T17:38:49,316 ERROR [MonitorScheduler-0] org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor - [16] unparseable events discarded. Turn on debug logging to see exception stack trace.
2019-11-21T17:38:49,345 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Completed receive from node 0 for FETCH with correlation id 122, received {throttle_time_ms=0,error_code=0,session_id=182690815,responses=[]}
2019-11-21T17:38:49,345 DEBUG [task-runner-0-priority-0] org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Node 0 sent an incremental fetch response for session 182690815 with response=(), implied=(calvin-0)
2019-11-21T17:38:49,345 DEBUG [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Added READ_COMMITTED fetch request for partition calvin-0 at offset 16 to node 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:49,345 DEBUG [task-runner-0-priority-0] org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Built incremental fetch (sessionId=182690815, epoch=115) for node 0. Added (), altered (), removed () out of (calvin-0)
2019-11-21T17:38:49,345 DEBUG [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(calvin-0)) to broker 10.8.2.15:9092 (id: 0 rack: null)
2019-11-21T17:38:49,345 TRACE [task-runner-0-priority-0] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=kafka-supervisor-gminnanm] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=182690815,session_epoch=115,topics=[],forgotten_topics_data=[]} with correlation id 123 to node 0
```
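
My suspicion is that the decoder is the problem: kafka-avro-console-producer writes the Confluent wire format (a magic byte plus a 4-byte schema ID in front of each Avro payload), while the schema_inline decoder expects raw Avro bytes, which would explain all 16 records being discarded as unparseable. If that's the case, swapping in the schema_registry decoder (pointing at the same registry the producer used) should work instead; a sketch:

```
"avroBytesDecoder": {
  "type": "schema_registry",
  "url": "http://localhost:9999"
}
```

Setting "logParseExceptions": true and a non-zero "maxSavedParseExceptions" in the tuningConfig should also surface the actual parse exception in the task logs.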

Hi Miguel,
Is it possible to try the following