Druid 0.12.1 (Installed via HDP-3.1.0.6)
Hello,
I have approximately 30 data sources that were created by ingesting data from a single Kafka topic. The data published to the topic consists of events with 30 different protocols (e.g., SMTP, HTTP, DNS). A Kafka ingest spec (see below for an example) was written for each of these 30 protocols, using a transformSpec filter to match the specific protocol. However, in the running tasks' ingest logs, I'm seeing the following entries, indicating events are being thrown away:
2019-10-04T14:50:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [876] events thrown away because they are outside the window period!
2019-10-04T14:51:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [321] events thrown away because they are outside the window period!
2019-10-04T14:52:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1,128] events thrown away because they are outside the window period!
2019-10-04T14:53:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1,131] events thrown away because they are outside the window period!
2019-10-04T14:54:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1,129] events thrown away because they are outside the window period!
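For context on the message itself, my reading is that an event is "outside the window period" when its timestamp falls outside a symmetric window around the current time. A minimal sketch of that check as I understand it (the 10-minute window is an assumed value for illustration, not something I've set in my config):

```python
from datetime import datetime, timedelta, timezone

def outside_window(event_ts, now, window=timedelta(minutes=10)):
    # Simplified model of the "outside the window period" check:
    # an event is rejected when its timestamp is more than one
    # window period before or after the current time.
    return event_ts < now - window or event_ts > now + window

now = datetime(2019, 10, 4, 14, 50, tzinfo=timezone.utc)
print(outside_window(now - timedelta(minutes=5), now))  # False: within window
print(outside_window(now - timedelta(hours=2), now))    # True: would be thrown away
```

My events are arriving in near real time, so by this model their timestamps should be well inside any reasonable window.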
I have other data sources with similar ingest specs, but without a transformSpec filter, and they ingest fine. I have spent hours digging into the details, including the source code, trying to determine why the RealtimeMetricsMonitor is logging that message. My only conclusion is that the filtering is the culprit. Any ideas?
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "SMTP",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "jq",
              "name": "field_1",
              "expr": ".event.\"field_1\""
            },
            {
              "type": "jq",
              "name": "field_2",
              "expr": ".event.\"field_2\""
            },
            {
              "type": "jq",
              "name": "protocol",
              "expr": ".event.\"protocol\""
            },
            ... many other fields
          ]
        },
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": []
        }
      }
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "total_events"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    },
    "transformSpec": {
      "filter": {
        "type": "selector",
        "dimension": "protocol",
        "value": "SMTP"
      }
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": true,
    "resetOffsetAutomatically": false
  },
  "ioConfig": {
    "topic": "some_kafka_topic",
    "replicas": 2,
    "taskCount": 3,
    "useEarliestOffset": false,
    "taskDuration": "PT30M",
    "completionTimeout": "PT60M",
    "consumerProperties": {
      "bootstrap.servers": "server1:6667,server2:6667"
    }
  }
}
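For what it's worth, my understanding of the selector filter in the transformSpec is that it is simply an equality match on one dimension, and rows that don't match are dropped at ingest time. A hypothetical sketch of that behavior (`selector_matches` is my own name for illustration, not a Druid API):

```python
def selector_matches(row, dimension, value):
    # Model of a Druid selector filter: keep the row only when the
    # named dimension equals the configured value.
    return row.get(dimension) == value

events = [
    {"protocol": "SMTP", "timestamp": "2019-10-04T14:50:00Z"},
    {"protocol": "HTTP", "timestamp": "2019-10-04T14:50:01Z"},
]
kept = [e for e in events if selector_matches(e, "protocol", "SMTP")]
print(len(kept))  # 1: only the SMTP event survives the filter
```

If the rows dropped by this filter are what the RealtimeMetricsMonitor counts as "thrown away", that would explain the log entries, but I haven't been able to confirm that from the source.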