Kafka Ingest - Events being thrown away

Druid 0.12.1 (Installed via HDP-3.1.0.6)

Hello,

I have approximately 30 data sources that were created by ingesting data from a single Kafka topic. The data published to the topic consists of events from 30 different protocols (e.g. SMTP, HTTP, DNS). A Kafka ingest spec (see below for an example) was written for each of these 30 protocols, using a transformSpec filter to match the specific protocol. However, in the running tasks' ingestion logs, I'm seeing the following entries, indicating events are being thrown away:

2019-10-04T14:50:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [876] events thrown away because they are outside the window period!
2019-10-04T14:51:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [321] events thrown away because they are outside the window period!
2019-10-04T14:52:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1,128] events thrown away because they are outside the window period!
2019-10-04T14:53:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1,131] events thrown away because they are outside the window period!
2019-10-04T14:54:17,855 WARN [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [1,129] events thrown away because they are outside the window period!

I have other data sources with similar ingest specs, but without a transformSpec filter, and they're fine. I have spent hours digging into the details, including the source code, trying to determine why the RealtimeMetricsMonitor is logging that message. My only conclusion is that the filtering is the culprit. Any ideas?


{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "SMTP",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "jq",
              "name": "field_1",
              "expr": ".event.\"field_1\""
            },
            {
              "type": "jq",
              "name": "field_2",
              "expr": ".event.\"field_2\""
            },
            {
              "type": "jq",
              "name": "protocol",
              "expr": ".event.\"protocol\""
            },

            ... many other fields
          ]
        },
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": []
        }
      }
    },
    "metricsSpec" : [
      {
        "type" : "count",
        "name" : "total_events"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    },
    "transformSpec": {
      "filter": {
        "type": "selector",
        "dimension": "protocol",
        "value": "SMTP"
      }
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": true,
    "resetOffsetAutomatically": false
  },
  "ioConfig": {
    "topic": "some_kafka_topic",
    "replicas": 2,
    "taskCount": 3,
    "useEarliestOffset": false,
    "taskDuration": "PT30M",
    "completionTimeout": "PT60M",
    "consumerProperties": {
      "bootstrap.servers": "server1:6667,server2:6667"
    }
  }
}
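
For illustration, the other protocol specs follow the same pattern; essentially only the dataSource name and the selector value change. For example, the HTTP data source's transformSpec filter looks roughly like this:

{
  "transformSpec": {
    "filter": {
      "type": "selector",
      "dimension": "protocol",
      "value": "HTTP"
    }
  }
}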

Should I cross-post this to the Druid Slack channel? Any help would be greatly appreciated.

The TaskRealtimeMetricsMonitor class has the following logging statement in its doMonitor(ServiceEmitter emitter) method:

log.warn("[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / lateMessageRejectionPeriod.", thrownAway);

https://github.com/apache/incubator-druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
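
Of the causes listed in that newer message, the transformSpec filter seems the most likely one to apply in my case, since I don't set earlyMessageRejectionPeriod or lateMessageRejectionPeriod anywhere. For reference, if those were in use they would appear in the supervisor ioConfig, something like this illustrative snippet (not from my actual spec):

"ioConfig": {
  "topic": "some_kafka_topic",
  "earlyMessageRejectionPeriod": "PT1H",
  "lateMessageRejectionPeriod": "PT1H"
}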

So, I believe the filter in each of the Kafka ingest specs is what’s causing the message “events thrown away because they are outside the window period!” to be written to the supervisor task logs. Therefore, I’m assuming this is normal: the events aren’t actually being lost, they’re just being filtered out by the transformSpec, and since each Kafka ingest supervisor consumes the topic with its own offsets, the other data sources still see and ingest them.
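
As a sanity check (the interval below is just illustrative), my plan is to sum the total_events count metric across all 30 data sources for a given interval and compare the total against the number of messages published to the topic over the same period; if they line up, nothing is actually being dropped. A per-data-source query would look something like:

{
  "queryType": "timeseries",
  "dataSource": "SMTP",
  "granularity": "all",
  "intervals": [ "2019-10-04T00:00:00Z/2019-10-05T00:00:00Z" ],
  "aggregations": [
    { "type": "longSum", "name": "rows", "fieldName": "total_events" }
  ]
}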

If someone could confirm this, it would be appreciated.

Hey JB,

Yes, you’re probably right. In fact, in Druid 0.15.0 we changed the error message to be clearer. It’s now: “[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside windowPeriod.”

Thanks Gian, I appreciate the confirmation.