Hi
I am setting up Kafka ingestion, and the data format is Protobuf.
The Druid version is 0.20.0.
The supervisor-spec.json is as follows:
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "172.32.4.250:9092"
      },
      "topic": "allevent"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000
    },
    "dataSchema": {
      "dataSource": "allevent_stream",
      "parser": {
        "type": "protobuf",
        "descriptor": "file:///home/ubuntu/autopilot_event.desc",
        "protoMessageType": "AutopilotEvent",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "logTime",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "eventDate",
              "firstLogin",
              "relative_day",
              "bundleId",
              "platform",
              "deviceModel",
              "device_brand_model",
              "deviceLevel",
              "deviceId",
              "customerUserId",
              "imei",
              "autopilotId",
              "eventName",
              "eventType",
              "region",
              "afStatus",
              "mediaSource",
              "topicId",
              {
                "name": "cases",
                "type": "string",
                "multiValueHandling": "array"
              },
              {
                "name": "config_id",
                "type": "string",
                "multiValueHandling": "array"
              },
              {
                "name": "case_first_dates",
                "type": "string",
                "multiValueHandling": "array"
              },
              "isSampleUser",
              "stagedRolloutRandom",
              "adgroup",
              "uaAgeMax",
              "uaGender",
              "uaAgeMin",
              "adgroupId",
              "device_reinstalled",
              "dimMappingVersion",
              "activeDays",
              "font",
              "like_play",
              "like_ad_rv",
              "ability_play",
              "ability_ecpm_rv",
              "ability_ecpm_inters",
              "ability_pay",
              "store"
            ]
          }
        }
      },
      "granularitySpec": {
        "type": "uniform",
        "queryGranularity": "HOUR",
        "segmentGranularity": "HOUR"
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "event_count"
        },
        {
          "type": "doubleSum",
          "name": "event_value",
          "fieldName": "eventValue",
          "expression": null
        },
        {
          "type": "thetaSketch",
          "name": "device_id_sketch",
          "fieldName": "deviceId",
          "size": 16384,
          "shouldFinalize": true,
          "isInputThetaSketch": false,
          "errorBoundsStdDev": null
        },
        {
          "type": "longMax",
          "name": "event_time",
          "fieldName": "eventTime",
          "expression": null
        }
      ],
      "transformSpec": {
        "filter": {
          "type": "selector",
          "dimension": "value_abnormal",
          "value": "false",
          "extractionFn": null
        },
        "transforms": [
          {
            "type": "expression",
            "name": "relative_day",
            "expression": "(unix_timestamp(eventDate) - unix_timestamp(firstLogin)) / 86400"
          },
          {
            "type": "expression",
            "name": "device_brand_model",
            "expression": "concat(deviceBrand, '__', deviceModel)"
          },
          {
            "type": "expression",
            "name": "value_abnormal",
            "expression": "if((eventName == 'main_app_open' && eventValue < 0) || (eventName == 'main_app_close' && (eventValue < 0 || eventValue > 43200)) || (eventName == 'extended_active' && eventValue < 0) || (eventName == 'ad_show' && eventValue < 0) || (eventName == 'ad_click' && eventValue < 0) || (eventName == 'iap' && (eventValue < 0 || eventValue > 1000)) || (eventName == 'test' && eventValue < 0) || (eventName == 'app_open' && eventValue < 0) || (eventName == 'app_close' && (eventValue < 0 || eventValue > 43200)) || (eventName == 'active' && eventValue < 0) || (eventName == 'ad_display' && eventValue < 0), 'true', 'false')"
          }
        ]
      }
    }
  }
}
And the Protobuf definition (.proto) file, from which the descriptor is built, is:
syntax = "proto3";

import "google/protobuf/wrappers.proto";

message AutopilotEvent {
  int64 autopilotId = 1;
  string eventDate = 2;
  string bundleId = 3;
  repeated string cases = 4;
  int64 eventTime = 5;
  int64 logTime = 6;
  int64 localTimestamp = 7;
  string logDate = 8;
  string deviceId = 9;
  string sdkVersion = 10;
  string osVersion = 11;
  string appVersion = 12;
  string platform = 13;
  string region = 14;
  int64 timeZone = 15;
  int64 uaAgeMax = 16;
  int64 uaAgeMin = 17;
  double eventValue = 18;
  string extended = 19;
  string downloadChannel = 20;
  string afStatus = 21;
  string deviceType = 22;
  string deviceBrand = 23;
  string deviceModel = 24;
  string uaGender = 25;
  string mediaSource = 26;
  string campaign = 27;
  string campaignId = 28;
  string adgroup = 29;
  string adgroupId = 30;
  string adset = 31;
  string adsetId = 32;
  string adId = 33;
  string audienceExtended = 34;
  string topicId = 35;
  string topicType = 36;
  string eventName = 37;
  string eventType = 38;
  string eventExtended = 39;
  string firstLogin = 40;
  string customerUserId = 41;
  string customAudience = 42;
  string sessionId = 43;
  string eventId = 44;
  string eventMeta = 45;
  string adNetwork = 46;
  string adMeta = 47;
  string idfv = 48;
  string reqMeta = 49;
  string language = 50;
  string advertisingId = 51;
  google.protobuf.BoolValue isSampleUser = 52;
  int64 dimMappingVersion = 53;
  map<string, string> strDynamicDimensions = 54;
  map<string, double> numDynamicDimensions = 55;
  double sampleRatio = 56;
  string deviceLevel = 57;
  string city = 58;
  string installAppVersion = 59;
  string imei = 60;
  string accountId = 61;
  double stagedRolloutRandom = 62;
  string user_local_date = 63;
  repeated string case_first_dates = 64;
  string server_version = 65;
  google.protobuf.BoolValue device_reinstalled = 66;
  int32 activeDays = 67;
  string ua_account_id = 68;
  string font = 69;
  double like_play = 70;
  double like_ad_rv = 71;
  double ability_play = 72;
  double ability_ecpm_rv = 73;
  double ability_ecpm_inters = 74;
  string ability_pay = 75;
  repeated string config_id = 76;
  string store = 77;
}
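For context, the descriptor file referenced in the spec (file:///home/ubuntu/autopilot_event.desc) is generated from this .proto with protoc. A minimal sketch of that step in Python; the .proto filename is an assumption, and --include_imports is shown because the message imports google/protobuf/wrappers.proto and that flag keeps the descriptor set self-contained:

import subprocess

# Build a descriptor set from the .proto shown above.
# Equivalent shell command:
#   protoc --include_imports \
#          --descriptor_set_out=/home/ubuntu/autopilot_event.desc \
#          autopilot_event.proto
subprocess.run(
    [
        "protoc",
        "--include_imports",  # also bundle google/protobuf/wrappers.proto
        "--descriptor_set_out=/home/ubuntu/autopilot_event.desc",
        "autopilot_event.proto",  # assumed name of the definition file above
    ],
    check=True,
)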
Then I submitted the Kafka ingestion supervisor spec, and the tasks started and ran successfully.
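The submission itself is just an HTTP POST of the spec above to the Overlord's supervisor endpoint. A minimal sketch in Python; the Overlord host and port are assumptions, and any HTTP client works:

import json
import requests  # assumed available

OVERLORD = "http://172.32.4.250:8090"  # hypothetical Overlord address

with open("supervisor-spec.json") as f:
    spec = json.load(f)

# POST the spec; on success Druid returns the supervisor id
# (for Kafka supervisors this is the dataSource name).
resp = requests.post(f"{OVERLORD}/druid/indexer/v1/supervisor", json=spec)
resp.raise_for_status()
print(resp.json())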
But an hour later, no datasource had been created.
The supervisor status is shown in the image below.
It seems the supervisor did not ingest any data from the Kafka topic, even though it did retrieve the topic offsets, and those offsets kept growing.
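For completeness, the supervisor state and the per-task row counters (processed / unparseable / thrownAway) can also be pulled from the Overlord API. A minimal sketch, again with the Overlord host and port assumed:

import requests  # assumed available

OVERLORD = "http://172.32.4.250:8090"  # hypothetical Overlord address
SUPERVISOR_ID = "allevent_stream"      # Kafka supervisor id = dataSource name

# Supervisor state, plus current and latest Kafka offsets per partition.
status = requests.get(
    f"{OVERLORD}/druid/indexer/v1/supervisor/{SUPERVISOR_ID}/status"
).json()
print(status["payload"])

# Row counters for each task managed by the supervisor.
stats = requests.get(
    f"{OVERLORD}/druid/indexer/v1/supervisor/{SUPERVISOR_ID}/stats"
).json()
print(stats)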
What is going wrong in this pipeline?
Please help, and thanks!