Kafka indexing fails with Optional.get() error

Hi all,

First, my Kafka indexing request body is:

{
    "type": "index_parallel",
    "spec": {
        "ioConfig": {
            "type": "index_parallel",
            "inputSource": {
                "type": "hdfs",
                "paths": "hdfs://..."
            },
            "inputFormat": {
                "type": "avro_ocf"
            }
        },
        "tuningConfig": {
            "type": "index_parallel",
            "partitionsSpec": {
                "type": "dynamic"
            },
            "logParseExceptions": true,
            "maxRowsPerSegment": 50000000,
            "maxTotalRows": 200000000
        },
        "dataSchema": {
            "timestampSpec": {
                "column": "ymdhm",
                "format": "yyyyMMddHHmm"
            },
            "granularitySpec": {
                "queryGranularity": "minute",
                "rollup": false,
                "segmentGranularity": "hour"
            },
            "dimensionsSpec": {
                "dimensions": [
                    "logVersion",
                    "type",
                    "ymdhm",
                    "ymd",
                    ...
                ],
                "dimensionExclusions": [
                    "ymdt",
                    ...
                ]
            },
            "metricsSpec": [
                {
                    "name": "count",
                    "type": "count"
                },
                ...
            ],
            "dataSource": "shopad.mst.hadoop5"
        }
    }
}

While the Kafka indexing service is running,
I keep getting Optional.get() errors on a specific partition.

My supervisor status is:

{
  "dataSource": "shopad.mst.mr5",
  "stream": "shopad-druid",
  "partitions": 10,
  "replicas": 2,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_shopad.mst.mr5_cf58a1d8b70ff1d_bdlnlhll",
      "startingOffsets": {
        "0": 3460892565
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {},
      "lag": {}
    },
    {
      "id": "index_kafka_shopad.mst.mr5_cf58a1d8b70ff1d_pljbjnln",
      "startingOffsets": {
        "0": 3460892565
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {},
      "lag": {}
    },
    {
      "id": "index_kafka_shopad.mst.mr5_4ae443f90981cd2_indgmcdc",
      "startingOffsets": {
        "1": 7620553579
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {},
      "lag": {}
    },
    {
      "id": "index_kafka_shopad.mst.mr5_4ae443f90981cd2_dcofabfb",
      "startingOffsets": {
        "1": 7620553579
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {},
      "lag": {}
    },
    {
      "id": "index_kafka_shopad.mst.mr5_48c56d7b4a88df6_jkadcgpk",
      "startingOffsets": {
        "2": 3460419306
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {},
      "lag": {}
    },
    {
      "id": "index_kafka_shopad.mst.mr5_48c56d7b4a88df6_paenkfli",
      "startingOffsets": {
        "2": 3460419306
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {},
      "lag": {}
    },
    {
      "id": "index_kafka_shopad.mst.mr5_e5549434beadd7e_ckjjkcjn",
      "startingOffsets": {
        "6": 3460623150
      },
      "startTime": "2022-05-25T09:26:26.708Z",
      "remainingSeconds": 0,
      "type": "ACTIVE",
      "currentOffsets": {
        "6": 3460885756
      },
      "lag": {
        "6": 34491739
      }
    },
    {
      "id": "index_kafka_shopad.mst.mr5_4225acffcf172ad_ficlkpfd",
      "startingOffsets": {
        "7": 3460207390
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {},
      "lag": {}
    },
    {
      "id": "index_kafka_shopad.mst.mr5_75be032c10b918a_bnononen",
      "startingOffsets": {
        "9": 3459808270
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {
        "9": 3460232572
      },
      "lag": {
        "9": 34427212
      }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": {
    "0": 3495758824,
    "1": 7655379775,
    "2": 3495271347,
    "3": 3495227440,
    "4": 3495015907,
    "5": 3496195562,
    "6": 3495377495,
    "7": 3495052538,
    "8": 3496127016,
    "9": 3494659784
  },
  "minimumLag": {
    "6": 34491739,
    "9": 34427212
  },
  "aggregateLag": 68918951,
  "offsetsLastUpdated": "2022-05-25T13:40:26.933Z",
  "suspended": false,
  "healthy": false,
  "state": "UNHEALTHY_SUPERVISOR",
  "detailedState": "UNHEALTHY_SUPERVISOR",
  "recentErrors": [
    {
      "timestamp": "2022-05-25T13:39:06.956Z",
      "exceptionClass": "java.lang.IllegalStateException",
      "message": "Optional.get() cannot be called on an absent value",
      "streamException": false
    },
    {
      "timestamp": "2022-05-25T13:39:36.956Z",
      "exceptionClass": "java.lang.IllegalStateException",
      "message": "Optional.get() cannot be called on an absent value",
      "streamException": false
    },
    {
      "timestamp": "2022-05-25T13:40:06.957Z",
      "exceptionClass": "java.lang.IllegalStateException",
      "message": "Optional.get() cannot be called on an absent value",
      "streamException": false
    }
  ]
}

And coordinator-overlord.log shows:

2022-05-25T13:42:36,956 WARN [KafkaSupervisor-shopad.mst.mr5] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception in supervisor run loop for dataSource [shopad.mst.mr5]
java.lang.IllegalStateException: Optional.get() cannot be called on an absent value
        at com.google.common.base.Absent.get(Absent.java:47) ~[guava-16.0.1.jar:?]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.updateTaskStatus(SeekableStreamSupervisor.java:2557) ~[druid-indexing-service-0.22.1.jar:0.22.1]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.runInternal(SeekableStreamSupervisor.java:1268) ~[druid-indexing-service-0.22.1.jar:0.22.1]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$RunNotice.handle(SeekableStreamSupervisor.java:333) ~[druid-indexing-service-0.22.1.jar:0.22.1]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$tryInit$3(SeekableStreamSupervisor.java:952) ~[druid-indexing-service-0.22.1.jar:0.22.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_112]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_112]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_112]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_112]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]

I can't tell what's wrong just from looking at the log.
Can someone please help?

Can you reset your supervisor and see if that resolves the issue?

Yes, the same error repeats even after a reset.

This error usually occurs when a required value is missing, most often the interval in the granularitySpec. Please add it explicitly and retry. Please also check that the JSON is correctly formatted.
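
As a rough sketch of that suggestion, the granularitySpec from the request body above could declare its intervals explicitly. The "intervals" value below is only a placeholder and the "uniform" type is an assumption; neither comes from the original spec, so substitute the time range your data actually covers.

```json
{
    "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "hour",
        "queryGranularity": "minute",
        "rollup": false,
        "intervals": ["2022-05-25T00:00:00Z/2022-05-26T00:00:00Z"]
    }
}
```

The other three fields are copied unchanged from the posted spec. Only the "intervals" entry (and the assumed "type") is new.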