Ingesting a protobuf map into Druid through a Kafka ingestion spec

I am trying to ingest a protobuf object that contains a map into Druid using a Kafka supervisor. The details are explained in the steps below.

  • Step 1: Define the protobuf schema
syntax = "proto2";
package test;

option java_package = "com.sk.test";
option java_outer_classname = "ImpressionProto";
option java_multiple_files = true;

message Impression {
  required string dt = 1;
  required string impressionId = 2;
  required double dataPrice = 3;
  map<string, double> dataProviderDataPrice = 4;
}

using the command

protoc --include_imports --descriptor_set_out=impression.desc src/main/protobuf/impression.proto

to generate impression.desc, which I upload to S3.
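
A quick way to sanity-check the descriptor before uploading it is to load it back and list the messages it contains; a minimal sketch (assuming impression.desc sits in the working directory) could look like this:

import com.google.protobuf.DescriptorProtos;

import java.io.FileInputStream;
import java.io.IOException;

public class DescriptorCheck {
    public static void main(String[] args) throws IOException {
        // Load the FileDescriptorSet written by protoc --descriptor_set_out
        try (FileInputStream in = new FileInputStream("impression.desc")) {
            DescriptorProtos.FileDescriptorSet set = DescriptorProtos.FileDescriptorSet.parseFrom(in);
            // Print every message type in every .proto file of the set;
            // "Impression" should show up here.
            set.getFileList().forEach(file ->
                    file.getMessageTypeList().forEach(message ->
                            System.out.println(file.getName() + " -> " + message.getName())));
        }
    }
}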

  • Step 2
    Produced the data into Kafka (StringSerializer as the key serializer, KafkaProtobufSerializer as the value serializer).
package com.sk.test;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaProducerSample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");


        Producer<String, com.sk.test.Impression> producer = new KafkaProducer<>(properties);

        Map<String, Double> temp = new HashMap<>();
        temp.put("dataPrice1", 1.2);

        com.sk.test.Impression impression = com.sk.test.Impression.newBuilder()
                .setDt("2022-03-22T14:00:00.000Z")
                .setImpressionId("1")
                .setDataPrice(1.2)
                .putAllDataProviderDataPrice(temp)
                .build();

        ProducerRecord<String, Impression> record = new ProducerRecord<>("protobufTestSK3", null, impression);
        producer.send(record);
        producer.flush();
        producer.close();

    }
}
  • Step 3
    Tried to ingest the data into Druid using the following spec:
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "protobufTestSK3",
      "timestampSpec": {
        "column": "dt",
        "format": "auto",
        "missingValue": null
      },
      "dimensionsSpec": {
        "dimensions": [
          {
            "type": "string",
            "name": "impressionId",
            "multiValueHandling": "SORTED_ARRAY",
            "createBitmapIndex": true
          }
        ],
        "dimensionExclusions": [
          "dataProviderDataPrice_dataPrice1",
          "dataProviderDataPrice_dataPrice2",
          "dataPrice"
        ],
        "includeAllDimensions": false
      },
      "metricsSpec": [
        {
          "type": "doubleSum",
          "fieldName": "dataPrice",
          "name": "dataPrice",
          "expression": null
        },
        {
          "type": "doubleSum",
          "fieldName": "dataProviderDataPrice_dataPrice1",
          "name": "dataPrice1"
        },
        {
          "type": "doubleSum",
          "fieldName": "dataProviderDataPrice_dataPrice2",
          "name": "dataPrice2"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE",
        "rollup": true,
        "intervals": []
      },
      "transformSpec": {
        "filter": null,
        "transforms": []
      }
    },
    "ioConfig": {
      "topic": "protobufTestSK3",
      "inputFormat": {
        "type": "protobuf",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "root",
              "name": "impressionId",
              "expr": "impressionId"
            },
            {
              "name": "dataProviderDataPrice_dataPrice1",
              "type": "path",
              "expr": "$.dataProviderDataPrice.dataPrice1"
            },
            {
              "name": "dataProviderDataPrice_dataPrice2",
              "type": "path",
              "expr": "$.dataProviderDataPrice.dataPrice2"
            }
          ]
        },
        "protoBytesDecoder": {
          "type": "file",
          "descriptor": "<s3_path_to_descriptor>",
          "protoMessageType": "Impression"
        }
      },
      "replicas": 1,
      "taskCount": 1,
      "taskDuration": "PT3600S",
      "consumerProperties": {
        "bootstrap.servers": "<list_of_brokers>"
      },
      "autoScalerConfig": null,
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestOffset": true,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": null,
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "useEarliestSequenceNumber": true,
      "type": "kafka"
    },
    "tuningConfig": {
      "type": "kafka",
      "appendableIndexSpec": {
        "type": "onheap"
      },
      "maxRowsInMemory": 1000000,
      "maxBytesInMemory": 500000000,
      "skipBytesInMemoryOverheadCheck": false,
      "maxRowsPerSegment": 10000000,
      "maxTotalRows": 20000000,
      "intermediatePersistPeriod": "PT10M",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "concise"
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "concise"
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "resetOffsetAutomatically": false,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": null,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "offsetFetchPeriod": "PT30S",
      "intermediateHandoffPeriod": "P2147483647D",
      "logParseExceptions": false,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 0,
      "skipSequenceNumberAvailabilityCheck": false,
      "repartitionTransitionDuration": "PT120S"
    }
  }
}

With the above config I am not able to ingest data into Druid. I get the following exception:

2022-03-31T05:36:44,229 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.
org.apache.druid.java.util.common.parsers.ParseException: Fail to decode protobuf message!
	at org.apache.druid.data.input.protobuf.FileBasedProtobufBytesDecoder.parse(FileBasedProtobufBytesDecoder.java:86) ~[?:?]
	at org.apache.druid.data.input.protobuf.ProtobufReader.intermediateRowIterator(ProtobufReader.java:83) ~[?:?]
	at org.apache.druid.data.input.IntermediateRowParsingReader.intermediateRowIteratorWithMetadata(IntermediateRowParsingReader.java:231) ~[druid-core-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.data.input.IntermediateRowParsingReader.read(IntermediateRowParsingReader.java:49) ~[druid-core-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.segment.transform.TransformingInputEntityReader.read(TransformingInputEntityReader.java:43) ~[druid-processing-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.indexing.seekablestream.SettableByteEntityReader.read(SettableByteEntityReader.java:70) ~[druid-indexing-service-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.indexing.seekablestream.StreamChunkParser.parseWithInputFormat(StreamChunkParser.java:135) ~[druid-indexing-service-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.indexing.seekablestream.StreamChunkParser.parse(StreamChunkParser.java:104) ~[druid-indexing-service-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:633) [druid-indexing-service-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:265) [druid-indexing-service-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:149) [druid-indexing-service-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:477) [druid-indexing-service-2022.03.0-iap.jar:2022.03.0-iap]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:449) [druid-indexing-service-2022.03.0-iap.jar:2022.03.0-iap]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_241]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_241]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_241]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]

I am using Druid 0.22.1 and Kafka 2.5.1.

Regards,
Shrinivas

Welcome @shrinivaskulkarni! Looking at the ParseException, I wonder if it’s the Protobuf object itself:

  @Override
  public DynamicMessage parse(ByteBuffer bytes)
  {
    bytes.get(); // ignore first \0 byte
    int id = bytes.getInt(); // extract schema registry id
    bytes.get(); // ignore \0 byte before PB message
    int length = bytes.limit() - 2 - 4;
    Descriptors.Descriptor descriptor;
    try {
      ProtobufSchema schema = (ProtobufSchema) registry.getSchemaById(id);
      descriptor = schema.toDescriptor();
    }
    catch (RestClientException e) {
      LOGGER.error(e.getMessage());
      throw new ParseException(null, e, "Fail to get protobuf schema because of can not connect to registry or failed http request!");
    }
    catch (IOException e) {
      LOGGER.error(e.getMessage());
      throw new ParseException(null, e, "Fail to get protobuf schema because of invalid schema!");
    }
    try {
      byte[] rawMessage = new byte[length];
      bytes.get(rawMessage, 0, length);
      DynamicMessage message = DynamicMessage.parseFrom(descriptor, rawMessage);
      return message;
    }
    catch (Exception e) {
      LOGGER.error(e.getMessage());
      throw new ParseException(null, e, "Fail to decode protobuf message!");
    }
  }

I’m wondering if your schema doesn’t match your message?

Hi @Mark_Herrera, thanks for the reply. I am producing the message with the same simple producer code shared in Step 2 above.
Is there any issue with that message producer?
Secondly, I think Druid is using FileBasedProtobufBytesDecoder and not SchemaRegistryBasedProtobufBytesDecoder, since I have specified a descriptor file in protoBytesDecoder.

And I am able to read the protobuf message back using the sample Kafka consumer code below:

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerSample {
    public static void main (String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Impression.class.getName());

        KafkaConsumer<String, Impression> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("protobufTestSK3"));

        while (true) {
            ConsumerRecords<String, Impression> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Impression> record : records) {
                System.out.println("Impression Date: " + record.value().getDt());
                System.out.println("Impression Id: " + record.value().getImpressionId());
                System.out.println("Data Price: " + record.value().getDataPrice());
                System.out.println("DataProvider Data Price: " + record.value().getDataProviderDataPriceMap().toString());
            }
            consumer.commitAsync();
        }


    }
}

Again thanks for the response.

Regards,
Shrinivas

Hi Shrinivas,

You’re welcome, and thanks for sharing your code! I’ll try to reproduce this in the next several days. Let’s be in touch.

Best,

Mark

Hi @shrinivaskulkarni,

I’m wondering whether this might be a version compatibility issue.
The version of protobuf that Druid 0.22.1 is built with is 3.11.0.
What version are you using to produce/consume the messages?

You might want to match versions, at least as a test.

Let us know how it goes,

Sergio

@Sergio_Ferragut I am using protobuf version 3.12.2 in my simple Java protobuf message producer and consumer. Do the versions need to match exactly?

@shrinivaskulkarni I don’t know if they are backward/forward compatible, but given the parse exception, I think it is worth a test.

Also, looking at your spec and your consumer example, I see that in one case you are using a descriptor file and in the other the schema registry. Are they the same? Can you share the descriptor file? Have you tried using the schema registry option for the spec, instead of the file?
I think it may be useful to figure out whether there are any differences between the schema your consumer test code uses and the one your ingestion spec uses.
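
For reference, here is roughly what the schema-registry variant of protoBytesDecoder would look like in the spec (the url below is just the local registry from your producer example, so adjust as needed):

"protoBytesDecoder": {
  "type": "schema_registry",
  "url": "http://localhost:8081"
}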

@Sergio_Ferragut Apologies for the late reply. The issue was with the serializer and deserializer. I was initially using KafkaProtobufSerializer and KafkaProtobufDeserializer, which is correct when the Confluent Schema Registry is used to resolve the schema, but I am resolving the schema from a descriptor file. After I changed to ByteArraySerializer and ByteArrayDeserializer, it worked for me.
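
For anyone hitting the same issue, here is a minimal sketch of roughly what my working producer looks like (same topic and local broker as my earlier example). The key difference is that ByteArraySerializer ships the plain protobuf bytes, without the Confluent wire-format framing (magic byte plus schema ID) that KafkaProtobufSerializer prepends and that the file-based decoder does not strip:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class RawProtobufProducerSample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Raw bytes instead of KafkaProtobufSerializer, so no schema-registry
        // framing is prepended to the protobuf payload.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (Producer<String, byte[]> producer = new KafkaProducer<>(properties)) {
            com.sk.test.Impression impression = com.sk.test.Impression.newBuilder()
                    .setDt("2022-03-22T14:00:00.000Z")
                    .setImpressionId("1")
                    .setDataPrice(1.2)
                    .putDataProviderDataPrice("dataPrice1", 1.2)
                    .build();

            // toByteArray() yields plain protobuf-encoded bytes that the
            // file-based descriptor decoder in Druid can parse directly.
            producer.send(new ProducerRecord<>("protobufTestSK3", null, impression.toByteArray()));
            producer.flush();
        }
    }
}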


I’m glad you figured it out. Do you think we can make that clearer somewhere in the documentation?

@Sergio_Ferragut We could mention this point in that link, about consuming Kafka protobuf messages without the schema registry.