My Kafka topics sometimes contains bad (corrupted) records. I would like to skip them while ingesting data, becauese now an error appears while ingesting. I have found the maxParseExceptions parameter but it is set to a big number by default (2147483647) and the error still happens.
- I configured Druid to ingest data from kafka with schema_registry successfully. Everything works as expected (records are parsed well and loaded into the DataSource.
- It works well until Druid hits, at some offset, a bad record which doesn’t have valid data because it’s corrupted. If I look into task logs, I can find an error message with an exception saying that the schema id is not found in the schema registry (see logs below). That’s because the first bytes of that record are corrupted and doesn’t contain a correct schema id, but just noise.
- When that happens, the ingestion stops and starts again after some time. When it is restarde, the same happens. it keeps doing that in a loop.
Because of that, I cant load kafka messages after the bad one. I would like to skip all corrupted messages from being ingested and ignore them.
Things I've tried
- I have played with maxParseExceptions with no luck.
References
maxParseExceptions parameter is defined here:
Logs
This is the error I get during ingestion:
2022-03-14T13:46:03,647 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception while running task.
org.apache.druid.java.util.common.RE: Failed to get Avro schema: 1786271608
at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:144) ~[?:?]
at org.apache.druid.data.input.avro.AvroStreamReader.intermediateRowIterator(AvroStreamReader.java:69) ~[?:?]
at org.apache.druid.data.input.IntermediateRowParsingReader.read(IntermediateRowParsingReader.java:44) ~[druid-core-0.22.1.jar:0.22.1]
at org.apache.druid.segment.transform.TransformingInputEntityReader.read(TransformingInputEntityReader.java:43) ~[druid-processing-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SettableByteEntityReader.read(SettableByteEntityReader.java:78) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.StreamChunkParser.parseWithInputFormat(StreamChunkParser.java:135) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.StreamChunkParser.parse(StreamChunkParser.java:104) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:620) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:263) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443) [druid-indexing-service-0.22.1.jar:0.22.1]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:351) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:659) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:641) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276) ~[?:?]
at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:140) ~[?:?]
... 15 more
This is the configuration spec for the ingestion:
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "my.data.source.name",
"timestampSpec": {
"column": "lastOperationTime",
"format": "millis",
"missingValue": null
},
"dimensionsSpec": {
"dimensions": [
... (ommited) ...
],
"dimensionExclusions": [
"__time",
"lastOperationTime"
]
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": []
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"ioConfig": {
"topic": "my.topic.name",
"inputFormat": {
"type": "avro_stream",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
... (ommited) ...
]
},
"avroBytesDecoder": {
"type": "schema_registry",
"url": "http://schema_registry_host:port",
"capacity": 2147483647,
"urls": null,
"config": null,
"headers": null
},
"binaryAsString": true,
"extractUnionsByType": false
},
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "bootstrap servers ips and ports"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": true,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"lateMessageRejectionStartDateTime": null,
"stream": "my.topic.name",
"useEarliestSequenceNumber": true,
"autoscalerConfig": null,
"type": "kafka"
},
"tuningConfig": {
"type": "kafka",
"appendableIndexSpec": {
"type": "onheap"
},
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"skipBytesInMemoryOverheadCheck": false,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/opt/druid/var/tmp/druid-realtime-persist349032434046494455",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": true,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 10,
"skipSequenceNumberAvailabilityCheck": false,
"repartitionTransitionDuration": "PT120S"
}
}
}