After upgrade from 0.18.1 to 0.20.1: only the last 14 days loaded as "realtime"

In February, we upgraded Druid from 0.18.1 to 0.20.1 without changing any configuration. We are using Kafka-based ingestion. We did not notice any problems at first; only recently did the customer complain about missing data. Checking the segments, I found that up to the date of the upgrade the data is there, with a size, published=true and realtime=false. After that date there is no data until 14 days ago, which appears to be a rolling window with the oldest segment disappearing every day. Those recent segments are listed with size=(realtime), published=false and realtime=true.
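
(For reference, the segment states above can also be read via Druid SQL from the sys.segments system table. This is only a sketch; the datasource name is masked the same way as in the payload below.)

-- published / realtime flags per segment for the affected datasource
SELECT "start", "end", "size", is_published, is_available, is_realtime
FROM sys.segments
WHERE datasource = 'xxxxxxxxxx'
ORDER BY "start" DESC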

The supervisor is showing UNHEALTHY_TASKS:
[moved to comment]
Here is the payload:

{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "xxxxxxxxxx",
      "timestampSpec": {
        "column": "dataCaptureTime",
        "format": "iso",
        "missingValue": null
      },
      "dimensionsSpec": {
        "dimensions": [],
        "dimensionExclusions": [
          "dataCaptureTime"
        ]
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": {
          "type": "none"
        },
        "rollup": false,
        "intervals": null
      },
      "transformSpec": {
        "filter": null,
        "transforms": []
      },
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "dataCaptureTime",
            "format": "iso"
          },
          "dimensionsSpec": {
            "dimensions": []
          },
          "flattenSpec": {
            "useFieldDiscovery": true,
            "fields": []
          }
        }
      }
    },
    "ioConfig": {
      "topic": "xxxxxxxxxx",
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": []
        },
        "featureSpec": {}
      },
      "replicas": 1,
      "taskCount": 1,
      "taskDuration": "PT86400S",
      "consumerProperties": {
        "bootstrap.servers": "broker:9092"
      },
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestOffset": true,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": null,
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "stream": "xxxxxxxxxx",
      "useEarliestSequenceNumber": true
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsInMemory": 1000000,
      "maxBytesInMemory": 0,
      "maxRowsPerSegment": 5000000,
      "maxTotalRows": null,
      "intermediatePersistPeriod": "PT10M",
      "basePersistDirectory": "/opt/apache-druid-0.18.1/var/tmp/druid-realtime-persist1766485076736994494",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "buildV9Directly": true,
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "resetOffsetAutomatically": false,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": null,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "offsetFetchPeriod": "PT30S",
      "intermediateHandoffPeriod": "P2147483647D",
      "logParseExceptions": false,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 0,
      "skipSequenceNumberAvailabilityCheck": false,
      "repartitionTransitionDuration": "PT120S"
    }
  },
  "dataSchema": {
    "dataSource": "xxxxxxxxxx",
    "timestampSpec": {
      "column": "dataCaptureTime",
      "format": "iso",
      "missingValue": null
    },
    "dimensionsSpec": {
      "dimensions": [],
      "dimensionExclusions": [
        "dataCaptureTime"
      ]
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": false,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    },
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "dataCaptureTime",
          "format": "iso"
        },
        "dimensionsSpec": {
          "dimensions": []
        },
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": []
        }
      }
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/opt/apache-druid-0.18.1/var/tmp/druid-realtime-persist1766485076736994494",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "roaring",
        "compressRunOnSerialization": true
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs",
      "segmentLoader": null
    },
    "indexSpecForIntermediatePersists": {
      "bitmap": {
        "type": "roaring",
        "compressRunOnSerialization": true
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs",
      "segmentLoader": null
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false,
    "repartitionTransitionDuration": "PT120S"
  },
  "ioConfig": {
    "topic": "xxxxxxxxxx",
    "inputFormat": {
      "type": "json",
      "flattenSpec": {
        "useFieldDiscovery": true,
        "fields": []
      },
      "featureSpec": {}
    },
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT86400S",
    "consumerProperties": {
      "bootstrap.servers": "broker:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": true,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "lateMessageRejectionStartDateTime": null,
    "stream": "xxxxxxxxxx",
    "useEarliestSequenceNumber": true
  },
  "context": null,
  "suspended": false
}

This is happening in production and we have a very unhappy customer.

I’m not sure why this happened, since we did not change any configuration. I hope the data is merely not being loaded rather than deleted, but I’m out of my depth here.

Please help me find the configuration responsible for this behaviour so that I can change it. If the data is still there somewhere (just not available), please also help me make it available again (most of the data can be re-ingested, but not all of it).


Things I've tried: Nothing so far, since I'm out of my expertise here.

Architecture: The Druid and Kafka services are running on OpenShift.

Relates to: Apache Druid 0.20.1

Hey Hoss,

From your description, it sounds like, for some reason, handoff stopped working properly after the upgrade. Handoff is described here: Design · Apache Druid. I’d start debugging by looking at these areas:

  • Check the logs of the “FAILED” tasks and see if there are any interesting error messages there. This will tell you whether there are problems on the indexing side (a query for locating those tasks is sketched after this list).
  • Next, check that the Coordinator / Historical protocol is working properly. You can check the Coordinator logs for signs that it is doing what it’s supposed to do (you should see “EmitClusterStatsAndMetrics” logged periodically with a summary of activity). You can also check the Historical logs to see whether they are able to fetch segments from deep storage.
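
For the first point, if Druid SQL is enabled, a sketch query against the sys.tasks system table lists recently failed tasks along with any recorded error message, which tells you which task logs to pull first:

-- recently failed ingestion tasks and their recorded error messages
SELECT task_id, datasource, created_time, status, error_msg
FROM sys.tasks
WHERE status = 'FAILED'
ORDER BY created_time DESC
LIMIT 20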

If you’re lucky, the segments did make it to deep storage and were published (i.e. the indexing side is fine), and there is just some issue with the Coordinator / Historical protocol. Fixing that issue should get all of your data reloaded automatically.
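
One way to tell which case you are in is to compare the is_published and is_available flags in sys.segments (a sketch; the datasource name is the placeholder from the payload). Segments that are published but not available suggest the data did reach deep storage and the problem is on the Coordinator / Historical side:

-- published segments that no Historical is currently serving
SELECT "start", "end", is_published, is_available, is_realtime
FROM sys.segments
WHERE datasource = 'xxxxxxxxxx'
  AND is_published = 1
  AND is_available = 0
ORDER BY "start" DESC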

If you’re unlucky, the segments did not make it to deep storage, and so the data is no longer available on the Druid side. If you have a backup of the Kafka data — perhaps a copy in a data lake — then you could restore from that via batch ingestion.

Good luck getting to the bottom of this. When you do figure it out, I’m curious to know what went wrong. Knowing that might help other people or might help us improve the system. Btw, if you need a second opinion on debugging then please let us know and post some of those logs I mentioned.

Finally: this advice is coming a bit late, but it’s always good to monitor that handoff is working properly and that tasks are not failing. You can do that through queries on the sys.segments and sys.tasks tables. Hopefully you won’t run into a problem like this again, but if you do, monitoring like that should help detect it more quickly.
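
As a starting point, here are two sketch monitoring queries; exactly which columns you alert on will depend on your setup:

-- segments still served only by realtime tasks, i.e. handoff has not completed
SELECT datasource, COUNT(*) AS realtime_only_segments
FROM sys.segments
WHERE is_realtime = 1 AND is_published = 0
GROUP BY datasource

-- failed ingestion tasks per datasource
SELECT datasource, COUNT(*) AS failed_tasks
FROM sys.tasks
WHERE status = 'FAILED'
GROUP BY datasource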

Thanks for the long reply. In the meantime, we already found the sources of the problem:

  • basePersistDirectory in the ingestion spec was still pointing to /opt/apache-druid-0.18.1/var/tmp/druid-realtime-persist…, which no longer existed. We recreated the ingestion spec (without explicitly setting basePersistDirectory), after which it automatically pointed to /opt/apache-druid-0.20.1/var/tmp/druid-realtime-persist…
  • The log directory was not owned by druid:druid (1000:1000) but by something else, so the logs were not being written. We fixed that with an initContainer in Kubernetes.

The data for the last 14 days was restored automatically (I assume from Kafka); the rest we are now restoring from the original data source.

Thanks again for your help! We’ll definitely monitor the system more closely next time we update.
