Druid Kafka ingestion task runs successfully but no datasource is created

Hi,
I am running Kafka ingestion where the data format is Protobuf.
The Druid version is 0.20.0.

The supervisor-spec.json is as follows:

{
    "type": "kafka",
    "spec": {
        "ioConfig": {
            "type": "kafka",
            "consumerProperties": {
                "bootstrap.servers": "172.32.4.250:9092"
            },
            "topic": "allevent"
        },
        "tuningConfig": {
            "type": "kafka",
            "maxRowsPerSegment": 5000000
        },
        "dataSchema": {
            "dataSource": "allevent_stream",
            "parser": {
                "type": "protobuf",
                "descriptor": "file:///home/ubuntu/autopilot_event.desc",
                "protoMessageType": "AutopilotEvent",
                "parseSpec": {
                    "format": "json",
                    "timestampSpec": {
                        "column": "logTime",
                        "format": "auto"
                    },
                    "dimensionsSpec": {
                        "dimensions": [
                            "eventDate",
                            "firstLogin",
                            "relative_day",
                            "bundleId",
                            "platform",
                            "deviceModel",
                            "device_brand_model",
                            "deviceLevel",
                            "deviceId",
                            "customerUserId",
                            "imei",
                            "autopilotId",
                            "eventName",
                            "eventType",
                            "region",
                            "afStatus",
                            "mediaSource",
                            "topicId",
                            {
                                "name": "cases",
                                "type": "string",
                                "multiValueHandling": "array"
                            },
                            {
                                "name": "config_id",
                                "type": "string",
                                "multiValueHandling": "array"
                            },
                            {
                                "name": "case_first_dates",
                                "type": "string",
                                "multiValueHandling": "array"
                            },
                            "isSampleUser",
                            "stagedRolloutRandom",
                            "adgroup",
                            "uaAgeMax",
                            "uaGender",
                            "uaAgeMin",
                            "adgroupId",
                            "device_reinstalled",
                            "dimMappingVersion",
                            "activeDays",
                            "font",
                            "like_play",
                            "like_ad_rv",
                            "ability_play",
                            "ability_ecpm_rv",
                            "ability_ecpm_inters",
                            "ability_pay",
                            "store"
                        ]
                    }
                }
            },
            "granularitySpec": {
                "type": "uniform",
                "queryGranularity": "HOUR",
                "segmentGranularity": "HOUR"
            },
            
            "metricsSpec": [
                {
                    "type": "count",
                    "name": "event_count"
                },
                {
                    "type": "doubleSum",
                    "name": "event_value",
                    "fieldName": "eventValue",
                    "expression": null
                },
                {
                    "type": "thetaSketch",
                    "name": "device_id_sketch",
                    "fieldName": "deviceId",
                    "size": 16384,
                    "shouldFinalize": true,
                    "isInputThetaSketch": false,
                    "errorBoundsStdDev": null
                },
                {
                    "type": "longMax",
                    "name": "event_time",
                    "fieldName": "eventTime",
                    "expression": null
                }
            ],
            "transformSpec": {
                "filter": {
                    "type": "selector",
                    "dimension": "value_abnormal",
                    "value": "false",
                    "extractionFn": null
                },
                "transforms": [
                    {
                        "type": "expression",
                        "name": "relative_day",
                        "expression": "(unix_timestamp(eventDate) - unix_timestamp(firstLogin)) / 86400"
                    },
                    {
                        "type": "expression",
                        "name": "device_brand_model",
                        "expression": "concat(deviceBrand, '__', deviceModel)"
                    },
                    {
                        "type": "expression",
                        "name": "value_abnormal",
                        "expression": "if((eventName == 'main_app_open' && eventValue < 0) || (eventName == 'main_app_close' && (eventValue <0 || eventValue > 43200)) || (eventName == 'extended_active' && eventValue < 0) || (eventName == 'ad_show' && eventValue < 0) || (eventName == 'ad_click' && eventValue < 0) || (eventName == 'iap' && (eventValue < 0 || eventValue > 1000)) || (eventName == 'test' && eventValue <0) || (eventName == 'app_open' && eventValue < 0) || (eventName == 'app_close' && (eventValue < 0 || eventValue > 43200)) || (eventName == 'active' && eventValue < 0) || (eventName == 'ad_display' && eventValue < 0), 'true', 'false')"
                    }
                ]
            }
        }
    }
}

And the Protobuf descriptor file is:

syntax = "proto3";
import "google/protobuf/wrappers.proto";
message AutopilotEvent {

  int64 autopilotId = 1;
  string eventDate = 2;
  string bundleId = 3;
  repeated string cases = 4;
  int64 eventTime = 5;
  int64 logTime = 6;
  int64 localTimestamp = 7;
  string logDate = 8;
  string deviceId = 9;
  string sdkVersion = 10;
  string osVersion = 11;
  string appVersion = 12;
  string platform = 13;
  string region = 14;
  int64 timeZone = 15;
  int64 uaAgeMax = 16;
  int64 uaAgeMin = 17;
  double eventValue = 18;
  string extended = 19;
  string downloadChannel = 20;
  string afStatus = 21;
  string deviceType = 22;
  string deviceBrand = 23;
  string deviceModel = 24;
  string uaGender = 25;
  string mediaSource = 26;
  string campaign = 27;
  string campaignId = 28;
  string adgroup = 29;
  string adgroupId = 30;
  string adset = 31;
  string adsetId = 32;
  string adId = 33;
  string audienceExtended = 34;
  string topicId = 35;
  string topicType = 36;
  string eventName = 37;
  string eventType = 38;
  string eventExtended = 39;
  string firstLogin = 40;
  string customerUserId = 41;
  string customAudience = 42;
  string sessionId = 43;
  string eventId = 44;
  string eventMeta = 45;
  string adNetwork = 46;
  string adMeta = 47;
  string idfv = 48;
  string reqMeta = 49;
  string language = 50;
  string advertisingId = 51;
  google.protobuf.BoolValue isSampleUser = 52;
  int64 dimMappingVersion = 53;
  map<string, string> strDynamicDimensions = 54;
  map<string, double> numDynamicDimensions = 55;
  double sampleRatio = 56;
  string deviceLevel = 57;
  string city = 58;
  string installAppVersion = 59;
  string imei = 60;
  string accountId = 61;
  double stagedRolloutRandom = 62;
  string user_local_date = 63;
  repeated string case_first_dates = 64;
  string server_version = 65;
  google.protobuf.BoolValue device_reinstalled = 66;
  int32 activeDays = 67;
  string ua_account_id = 68;
  string font = 69;
  double like_play = 70;
  double like_ad_rv = 71;
  double ability_play = 72;
  double ability_ecpm_rv = 73;
  double ability_ecpm_inters = 74;
  string ability_pay = 75;
  repeated string config_id = 76;
  string store = 77;
}

Then I submitted the Kafka ingestion supervisor and the task ran successfully. But an hour later, I did not see any datasource created.

The supervisor status is shown in the image below.

It seems the supervisor did not get any data from the Kafka topic, even though it did read the topic offsets and the offsets kept growing.

What is going wrong in this process?
Please help, and thanks!

Hi @jimomiaomiao ,
Welcome to Druid Forum.
Can you share screenshots of the Ingestion, Services and Segments views?
Do you see the real-time segments in the Segments view? Can you query it?

How is your cluster deployed? What is your Deep Storage?
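
If you want to check from the API side as well, a minimal SQL request can be POSTed to /druid/v2/sql (the datasource name here is taken from your spec, and this assumes SQL querying is enabled, which it is by default):

    {
        "query": "SELECT COUNT(*) AS row_count FROM \"allevent_stream\""
    }

A non-zero count would confirm that real-time rows are actually being served.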

Hi @Sergio_Ferragut,
The screenshot of the Ingestion view is below.

I don't see any real-time segments in the Segments view, and I can't query the datasource.

The cluster is deployed with the following common runtime properties:


druid.extensions.loadList=["postgresql-metadata-storage", "druid-protobuf-extensions", "druid-kafka-indexing-service", "druid-kinesis-indexing-service", "druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "druid-s3-extensions", "druid-parquet-extensions", "druid-avro-extensions", "druid-basic-security","graphite-emitter"]

druid.startup.logging.logProperties=true

druid.zk.service.host=127.0.0.1:2181
druid.zk.paths.base=/druid


druid.metadata.storage.type=postgresql
druid.metadata.storage.connector.connectURI=jdbc:postgresql://localhost:5432/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=druid



druid.storage.type=s3
druid.storage.bucket=2133-autopilot-data-dev
druid.storage.baseKey=autopilot-servicedata/druid/segments
druid.s3.accessKey=my key
druid.s3.secretKey=my secret key


druid.indexer.logs.type=s3
druid.indexer.logs.s3Bucket=2133-autopilot-data-dev
druid.indexer.logs.s3Prefix=druid/indexing-logs

#
# Service discovery
#

druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator
#
# Monitoring
#

# druid.monitoring.monitors=["org.apache.druid.java.util.metrics.JvmMonitor"]
# druid.emitter=logging
# druid.emitter.logging.logLevel=info

# Storage type of double columns
# omitting this will lead to indexing doubles as floats at the storage layer

druid.indexing.doubleStorage=double

#
# SQL
#
druid.sql.enable=true

My cluster's metadata storage is PostgreSQL and the deep storage is S3.

I also found that I can ingest JSON-format data from the same Kafka stream ingestion, generate real-time segments, and query them. Only the Protobuf-format data fails. My Druid version is 0.20.0. Could it be a problem with this older version?

@Sergio_Ferragut
As a new forum user, I can only upload one picture at a time.
The screenshot of the Services view is below.

If it were a protobuf version problem, I would expect to see some errors in the task log. Are there any?
The protobuf version bundled with Druid has been 3.11.0 since Druid 0.20.0 and is still the same in the latest Druid 24.0.0 release.

What protobuf version is being used to produce the messages?

No errors were found in the ingestion task log. As the picture above shows, the ingestion processed 0 rows, processedWithError was 0, thrownAway was 0, and unparseable was 0, which is strange.
Our project uses Flink with protobuf-java 3.8.0 to write the Protobuf data into Kafka. Could the lower protobuf version be causing this problem?

Very strange indeed :slight_smile:
I am no expert, but according to this the versions should be compatible.

Have you seen this example: Protobuf · Apache Druid

There have been changes from parser/parseSpec to the use of inputFormat in newer releases. That probably has nothing to do with this issue, but in newer versions you will need to migrate to that form, as sketched below.
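
For reference, here is a rough sketch of how the ioConfig from your spec could look in the inputFormat form (the file path, topic, and message type are copied from your spec; treat this as an outline rather than a drop-in config, since the exact syntax depends on the version you move to):

        "ioConfig": {
            "type": "kafka",
            "topic": "allevent",
            "consumerProperties": {
                "bootstrap.servers": "172.32.4.250:9092"
            },
            "inputFormat": {
                "type": "protobuf",
                "protoBytesDecoder": {
                    "type": "file",
                    "descriptor": "file:///home/ubuntu/autopilot_event.desc",
                    "protoMessageType": "AutopilotEvent"
                },
                "flattenSpec": {
                    "useFieldDiscovery": true
                }
            }
        }

In that form, the timestampSpec and dimensionsSpec sit directly under dataSchema instead of being nested inside a parser/parseSpec.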

I’m wondering whether the import and the google.protobuf.BoolValue wrapper are tripping something up, or maybe the map<T,T> dynamic dimensions; I’m not sure how those would be translated to Druid data types. An error would be nice, but since you said the offsets are moving, it is odd that there is no progress at all on the row counters, so I’m guessing we’re dealing with some silent error situation.

The protobuf extension has evolved quite a bit since 0.20.0. I’m not sure whether any of those changes will fix your ingestion, but are you in a position to upgrade Druid and test this using the new inputFormat spec?

Thanks bro, I tried removing google.protobuf.BoolValue and the task succeeded. Is there any way to make Druid handle a third-party import like this?
I do plan to upgrade Druid, but in the Druid documents I only found this, and I didn’t find any command lines or shell scripts for upgrading. Can you give me some more detailed steps?

@jimomiaomiao,
In general, the recommendation is to first do the upgrade on a test/dev cluster where you can verify your existing workloads after the upgrade. You should also read through the release notes since 0.20.0: a few of them contain specific upgrade instructions, and they will also familiarize you with newer features that may be useful for your use cases.

Let us know how it goes.

Thanks for the reply!!
I think the task autoscaling feature for Kafka and Kinesis streaming ingestion in Druid 0.22.0 is very important. How do I upgrade from version 0.20.0 to version 0.22.0?
There is another question: can I dynamically update the Kafka ingestion supervisor spec without stopping the ingestion?

Here’s a similar conversation regarding upgrade procedure: How to upgrade druid version 0.18.1 to version 0.21.1

Regarding the supervised ingestion, if you are not changing the spec, you can run the upgrade without interruption. See Rolling updates · Apache Druid.
If you mean changing the ingestion spec, you can replace it live, but Druid will go through an automatic process of gracefully stopping the existing supervisor and its tasks and then starting the new one. You should think of this as a separate step from the version upgrade.
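
To make the live replacement concrete: you re-POST the complete updated spec to the same endpoint that created the supervisor (POST /druid/indexer/v1/supervisor on the Overlord, or through the Router), and Druid matches on the dataSource to hand over from the old tasks to the new ones. A minimal sketch of the body, with the dataSchema abbreviated and a hypothetical taskCount bump standing in for whatever change you actually want to make:

    {
        "type": "kafka",
        "spec": {
            "ioConfig": {
                "type": "kafka",
                "topic": "allevent",
                "consumerProperties": {
                    "bootstrap.servers": "172.32.4.250:9092"
                },
                "taskCount": 2
            },
            "tuningConfig": {
                "type": "kafka",
                "maxRowsPerSegment": 5000000
            },
            "dataSchema": {
                "dataSource": "allevent_stream"
            }
        }
    }

In practice you resubmit the whole spec (parser or inputFormat, dimensions, metrics, and transforms included), editing only the parts you want changed; anything you leave out is simply not part of the new supervisor.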