Kafka ingestion - only one task running

I am using the Kafka ingestion pattern to ingest data into a Druid cluster. I deployed Druid using the Druid operator on Kubernetes. Please find the YAML file below.

# This spec only works on a single-node Kubernetes cluster (e.g. a typical dev cluster set up with kind/minikube, a single-node AWS EKS cluster, etc.)
# as it uses local disk as "deep storage".
#
apiVersion: "druid.apache.org/v1alpha1"
kind: "Druid"
metadata:
  name: tiny-cluster
spec:
  image: apache/druid:0.22.1
  # Optionally specify an image for all nodes. It can also be specified per node.
  # imagePullSecrets:
  # - name: tutu
  startScript: /druid.sh
  podLabels:
    environment: stage
    release: alpha
  podAnnotations:
    dummykey: dummyval
  readinessProbe:
    httpGet:
      path: /status/health
      port: 8088
  securityContext:
    fsGroup: 1000
    runAsUser: 1000
    runAsGroup: 1000
  services:
    - spec:
        type: ClusterIP
        clusterIP: None
  commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
  jvm.options: |-
    -server
    -XX:MaxDirectMemorySize=10240g
    -Duser.timezone=UTC
    -Dfile.encoding=UTF-8
    -Dlog4j.debug
    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
    -Djava.io.tmpdir=/druid/data
  log4j.config: |-
    <?xml version="1.0" encoding="UTF-8" ?>
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="info">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
  common.runtime.properties: |

    # Zookeeper
    druid.zk.service.host=tiny-cluster-zk-0.tiny-cluster-zk
    druid.zk.paths.base=/druid
    druid.zk.service.compress=false

    # Metadata Store
    druid.metadata.storage.type=derby
    druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/druid/data/derbydb/metadata.db;create=true
    druid.metadata.storage.connector.host=localhost
    druid.metadata.storage.connector.port=1527
    druid.metadata.storage.connector.createTables=true

    # Deep Storage
    druid.storage.type=local
    druid.storage.storageDirectory=/druid/deepstorage

    #
    # Extensions
    #
    druid.extensions.loadList=["druid-kafka-indexing-service"]

    #
    # Service discovery
    #
    druid.selectors.indexing.serviceName=druid/overlord
    druid.selectors.coordinator.serviceName=druid/coordinator

    druid.indexer.logs.type=file
    druid.indexer.logs.directory=/druid/data/indexing-logs
    druid.lookup.enableLookupSyncOnStartup=false
  volumeMounts:
    - mountPath: /druid/data
      name: data-volume
    - mountPath: /druid/deepstorage
      name: deepstorage-volume
  volumes:
    - name: data-volume
      emptyDir: {}
    - name: deepstorage-volume
      hostPath:
        path: /tmp/druid/deepstorage
        type: DirectoryOrCreate
  env:
    - name: POD_NAME
      valueFrom:
        fieldRef:
          fieldPath: metadata.name
    - name: POD_NAMESPACE
      valueFrom:
        fieldRef:
          fieldPath: metadata.namespace

  nodes:
    brokers:
      # Optionally specify for running broker as Deployment
      # kind: Deployment
      nodeType: "broker"
      # Optionally specify for broker nodes
      # imagePullSecrets:
      # - name: tutu
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker"
      replicas: 2
      runtime.properties: |
        druid.service=druid/broker

        # HTTP server threads
        druid.broker.http.numConnections=5
        druid.server.http.numThreads=10

        # Processing threads and buffers
        druid.processing.buffer.sizeBytes=1
        druid.processing.numMergeBuffers=1
        druid.processing.numThreads=1
        druid.sql.enable=true
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M
      hpAutoscaler:
        maxReplicas: 10
        minReplicas: 1
        scaleTargetRef:
           apiVersion: apps/v1
           kind: StatefulSet
           name: druid-tiny-cluster-brokers
        metrics:
         - type: Resource
           resource:
             name: cpu
             target:
               type: Utilization
               averageUtilization: 50

    coordinators:
      # Optionally specify for running coordinator as Deployment
      # kind: Deployment
      nodeType: "coordinator"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord"
      replicas: 2
      runtime.properties: |
        druid.service=druid/coordinator

        # HTTP server threads
        druid.coordinator.startDelay=PT30S
        druid.coordinator.period=PT30S

        # Configure this coordinator to also run as Overlord
        druid.coordinator.asOverlord.enabled=true
        druid.coordinator.asOverlord.overlordService=druid/overlord
        druid.indexer.queue.startDelay=PT30S
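        # Note: with runner.type=local (below), the overlord forks ingestion tasks as
        # peon processes on its own host rather than dispatching them to middleManagers.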
        druid.indexer.runner.type=local
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

    historicals:
      nodeType: "historical"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
      replicas: 2
      runtime.properties: |
        druid.service=druid/historical
        druid.server.http.numThreads=5
        druid.processing.buffer.sizeBytes=536870912
        druid.processing.numMergeBuffers=1
        druid.processing.numThreads=1
        # Segment storage
        druid.segmentCache.locations=[{\"path\":\"/druid/data/segments\",\"maxSize\":10737418240}]
        druid.server.maxSize=10737418240
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M
          
    routers:
      nodeType: "router"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/router"
      replicas: 2
      runtime.properties: |
        druid.service=druid/router

        # HTTP proxy
        druid.router.http.numConnections=10
        druid.router.http.readTimeout=PT5M
        druid.router.http.numMaxThreads=10
        druid.server.http.numThreads=10

        # Service discovery
        druid.router.defaultBrokerServiceName=druid/broker
        druid.router.coordinatorServiceName=druid/coordinator

        # Management proxy to coordinator / overlord: required for unified web console.
        druid.router.managementProxy.enabled=true       
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

The Kafka supervisor created 3 tasks, but only one of them is running; the others show as pending. Why are they pending?
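(For context: the number of tasks a Kafka supervisor runs is driven by its ioConfig. A minimal sketch of the relevant fields, with placeholder topic and broker values that are not taken from this deployment:

    {
      "type": "kafka",
      "spec": {
        "ioConfig": {
          "topic": "my-topic",
          "consumerProperties": { "bootstrap.servers": "kafka:9092" },
          "taskCount": 3,
          "replicas": 1,
          "taskDuration": "PT1H"
        }
      }
    }

With taskCount=3 and replicas=1, the supervisor tries to keep 3 reading tasks running concurrently, so the cluster needs at least 3 free task slots.)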

Welcome @Nithin_K! Have you looked at the live reports or task logs? Feel free to share the logs here and we can all have a look.
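(If it helps, the Overlord's task-list APIs can show which tasks are pending versus running; these endpoints are from the Druid docs, and since druid.router.managementProxy.enabled=true in the spec above they should be reachable through the router, host name assumed:

    GET http://<router-host>:8088/druid/indexer/v1/runningTasks
    GET http://<router-host>:8088/druid/indexer/v1/pendingTasks
)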

@Mark_Herrera I checked the log files of all the pods and am not able to see any errors.

I could not find a middleManager in the nodes section.
Can you also post a screenshot of the services tab?

Services tab output

NAME                              TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
druid-tiny-cluster-brokers        ClusterIP   None         <none>        8088/TCP                     3h38m
druid-tiny-cluster-coordinators   ClusterIP   None         <none>        8088/TCP                     3h38m
druid-tiny-cluster-historicals    ClusterIP   None         <none>        8088/TCP                     3h38m
druid-tiny-cluster-routers        ClusterIP   None         <none>        8088/TCP                     3h38m
tiny-cluster-zk                   ClusterIP   None         <none>        2181/TCP,2888/TCP,3888/TCP   3h38m

There is no middleManager service. Can you add a middleManager to your YAML?

@TijoThomas I deployed the middleManager. Please find the YAML file below.

# This spec only works on a single-node Kubernetes cluster (e.g. a typical dev cluster set up with kind/minikube, a single-node AWS EKS cluster, etc.)
# as it uses local disk as "deep storage".
#
apiVersion: "druid.apache.org/v1alpha1"
kind: "Druid"
metadata:
  name: tiny-cluster
spec:
  image: apache/druid:0.22.1
  # Optionally specify an image for all nodes. It can also be specified per node.
  # imagePullSecrets:
  # - name: tutu
  startScript: /druid.sh
  podLabels:
    environment: stage
    release: alpha
  podAnnotations:
    dummykey: dummyval
  readinessProbe:
    httpGet:
      path: /status/health
      port: 8088
  securityContext:
    fsGroup: 1000
    runAsUser: 1000
    runAsGroup: 1000
  services:
    - spec:
        type: ClusterIP
        clusterIP: None
  commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
  jvm.options: |-
    -server
    -XX:MaxDirectMemorySize=10240g
    -Duser.timezone=UTC
    -Dfile.encoding=UTF-8
    -Dlog4j.debug
    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
    -Djava.io.tmpdir=/druid/data
  log4j.config: |-
    <?xml version="1.0" encoding="UTF-8" ?>
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="info">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
  common.runtime.properties: |

    # Zookeeper
    druid.zk.service.host=tiny-cluster-zk-0.tiny-cluster-zk
    druid.zk.paths.base=/druid
    druid.zk.service.compress=false

    # Metadata Store
    druid.metadata.storage.type=derby
    druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/druid/data/derbydb/metadata.db;create=true
    druid.metadata.storage.connector.host=localhost
    druid.metadata.storage.connector.port=1527
    druid.metadata.storage.connector.createTables=true

    # Deep Storage
    druid.storage.type=local
    druid.storage.storageDirectory=/druid/deepstorage
    #
    # Extensions
    #
    druid.extensions.loadList=["druid-kafka-indexing-service"]

    #
    # Service discovery
    #
    druid.selectors.indexing.serviceName=druid/overlord
    druid.selectors.coordinator.serviceName=druid/coordinator

    druid.indexer.logs.type=file
    druid.indexer.logs.directory=/druid/data/indexing-logs
    druid.lookup.enableLookupSyncOnStartup=false

  metricDimensions.json: |-
    {
      "query/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer"},
      "query/bytes" : { "dimensions" : ["dataSource", "type"], "type" : "count"},
      "query/node/time" : { "dimensions" : ["server"], "type" : "timer"},
      "query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer"},
      "query/node/bytes" : { "dimensions" : ["server"], "type" : "count"},
      "query/node/backpressure": { "dimensions" : ["server"], "type" : "timer"},
      "query/intervalChunk/time" : { "dimensions" : [], "type" : "timer"},

      "query/segment/time" : { "dimensions" : [], "type" : "timer"},
      "query/wait/time" : { "dimensions" : [], "type" : "timer"},
      "segment/scan/pending" : { "dimensions" : [], "type" : "gauge"},
      "query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer" },
      "query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer" },

      "query/count" : { "dimensions" : [], "type" : "count" },
      "query/success/count" : { "dimensions" : [], "type" : "count" },
      "query/failed/count" : { "dimensions" : [], "type" : "count" },
      "query/interrupted/count" : { "dimensions" : [], "type" : "count" },
      "query/timeout/count" : { "dimensions" : [], "type" : "count" },

      "query/cache/delta/numEntries" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/sizeBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/hits" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/misses" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/evictions" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/hitRate" : { "dimensions" : [], "type" : "count", "convertRange" : true },
      "query/cache/delta/averageBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/timeouts" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/errors" : { "dimensions" : [], "type" : "count" },

      "query/cache/total/numEntries" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/sizeBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/hits" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/misses" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/evictions" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/hitRate" : { "dimensions" : [], "type" : "gauge", "convertRange" : true },
      "query/cache/total/averageBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/errors" : { "dimensions" : [], "type" : "gauge" },

      "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/processed" : { "dimensions" : ["dataSource", "taskType", "taskId"], "type" : "count" },
      "ingest/events/messageGap" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/persists/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/persists/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/persists/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/persists/backPressure" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/persists/failed" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },

      "ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },

      "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "task/running/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },

      "taskSlot/total/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/idle/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/busy/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/lazy/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "gauge" },

      "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
      "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
      "segment/moved/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
      "segment/nuked/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },

      "segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/moved/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/dropped/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/deleted/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/unneeded/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/unavailable/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "segment/underReplicated/count" : { "dimensions" : ["dataSource", "tier"], "type" : "gauge" },
      "segment/cost/raw" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/cost/normalization" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/cost/normalized" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/loadQueue/size" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/loadQueue/failed" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/loadQueue/count" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/dropQueue/count" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/size" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "segment/overShadowed/count" : { "dimensions" : [], "type" : "gauge" },

      "segment/max" : { "dimensions" : [], "type" : "gauge"},
      "segment/used" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" },
      "segment/usedPercent" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "convertRange" : true },
      "segment/pendingDelete" : { "dimensions" : [], "type" : "gauge"},

      "jvm/pool/committed" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/init" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/max" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/used" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/bufferpool/count" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/bufferpool/used" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/bufferpool/capacity" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/mem/init" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/max" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/used" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/committed" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/gc/count" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" },
      "jvm/gc/cpu" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" },

      "ingest/events/buffered" : { "dimensions" : ["serviceName", "bufferCapacity"], "type" : "gauge"},

      "sys/swap/free" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/max" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/pageIn" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/pageOut" : { "dimensions" : [], "type" : "gauge"},
      "sys/disk/write/count" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/read/count" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/write/size" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/read/size" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/net/write/size" : { "dimensions" : [], "type" : "count"},
      "sys/net/read/size" : { "dimensions" : [], "type" : "count"},
      "sys/fs/used" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge"},
      "sys/fs/max" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge"},
      "sys/mem/used" : { "dimensions" : [], "type" : "gauge"},
      "sys/mem/max" : { "dimensions" : [], "type" : "gauge"},
      "sys/storage/used" : { "dimensions" : ["fsDirName"], "type" : "gauge"},
      "sys/cpu" : { "dimensions" : ["cpuName", "cpuTime"], "type" : "gauge"},

      "coordinator-segment/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "historical-segment/count" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" },

      "jetty/numOpenConnections" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/requests" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/loadTime" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/evictionBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/memcached/total" : { "dimensions" : ["[MEM] Reconnecting Nodes (ReconnectQueue)",
        "[MEM] Request Rate: All",
        "[MEM] Average Bytes written to OS per write",
        "[MEM] Average Bytes read from OS per read",
        "[MEM] Response Rate: All (Failure + Success + Retry)",
        "[MEM] Response Rate: Retry",
        "[MEM] Response Rate: Failure",
        "[MEM] Response Rate: Success"],
        "type" : "gauge" },
      "query/cache/caffeine/delta/requests" : { "dimensions" : [], "type" : "count" },
      "query/cache/caffeine/delta/loadTime" : { "dimensions" : [], "type" : "count" },
      "query/cache/caffeine/delta/evictionBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/memcached/delta" : { "dimensions" : ["[MEM] Reconnecting Nodes (ReconnectQueue)",
        "[MEM] Request Rate: All",
        "[MEM] Average Bytes written to OS per write",
        "[MEM] Average Bytes read from OS per read",
        "[MEM] Response Rate: All (Failure + Success + Retry)",
        "[MEM] Response Rate: Retry",
        "[MEM] Response Rate: Failure",
        "[MEM] Response Rate: Success"],
        "type" : "count" }
    }

  volumeMounts:
    - mountPath: /druid/data
      name: data-volume
    - mountPath: /druid/deepstorage
      name: deepstorage-volume
  volumes:
    - name: data-volume
      emptyDir: {}
    - name: deepstorage-volume
      hostPath:
        path: /tmp/druid/deepstorage
        type: DirectoryOrCreate
  env:
    - name: POD_NAME
      valueFrom:
        fieldRef:
          fieldPath: metadata.name
    - name: POD_NAMESPACE
      valueFrom:
        fieldRef:
          fieldPath: metadata.namespace

  nodes:
    brokers:
      # Optionally specify for running broker as Deployment
      # kind: Deployment
      nodeType: "broker"
      # Optionally specify for broker nodes
      # imagePullSecrets:
      # - name: tutu
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker"
      replicas: 1
      runtime.properties: |
        druid.service=druid/broker
        # HTTP server threads
        druid.broker.http.numConnections=5
        druid.server.http.numThreads=10
        # Processing threads and buffers
        druid.processing.buffer.sizeBytes=1
        druid.processing.numMergeBuffers=1
        druid.processing.numThreads=1
        druid.sql.enable=true
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

    coordinators:
      # Optionally specify for running coordinator as Deployment
      # kind: Deployment
      nodeType: "coordinator"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord"
      replicas: 1
      runtime.properties: |
        druid.service=druid/coordinator

        # HTTP server threads
        druid.coordinator.startDelay=PT30S
        druid.coordinator.period=PT30S

        # Configure this coordinator to also run as Overlord
        druid.coordinator.asOverlord.enabled=true
        druid.coordinator.asOverlord.overlordService=druid/overlord
        druid.indexer.queue.startDelay=PT30S
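        # Note: runner.type=local (below) still runs tasks on the overlord itself;
        # remote dispatch to middleManagers is a different runner type.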
        druid.indexer.runner.type=local
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

    historicals:
      nodeType: "historical"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
      replicas: 1
      runtime.properties: |
        druid.service=druid/historical
        druid.server.http.numThreads=5
        druid.processing.buffer.sizeBytes=536870912
        druid.processing.numMergeBuffers=1
        druid.processing.numThreads=1

        # Segment storage
        druid.segmentCache.locations=[{\"path\":\"/druid/data/segments\",\"maxSize\":10737418240}]
        druid.server.maxSize=10737418240
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M
    middlemanagers:
      druid.port: 8080
      kind: StatefulSet
      nodeType: middleManager
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/data/middleManager
      env:
        - name: DRUID_XMX
          value: 4096m
        - name: DRUID_XMS
          value: 4096m
        - name: AWS_REGION
          value: ap-southeast-1
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      replicas: 2
      # resources:
      #   limits:
      #     cpu: 8
      #     memory: 8Gi
      #   requests:
      #     cpu: 8
      #     memory: 8Gi
      livenessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      readinessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      runtime.properties: |
          druid.service=druid/middleManager
          druid.worker.capacity=4
          druid.indexer.runner.javaOpts=-server -Xms2g -Xmx2g -XX:MaxDirectMemorySize=2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
          druid.indexer.task.baseTaskDir=var/druid/task
          # HTTP server threads
          druid.server.http.numThreads=500
          # Processing threads and buffers on Peons
          druid.indexer.fork.property.druid.processing.numMergeBuffers=2
          druid.indexer.fork.property.druid.processing.buffer.sizeBytes=32000000
          druid.indexer.fork.property.druid.processing.numThreads=2
      volumeClaimTemplates:
        -
          metadata:
            name: data-volume
          spec:
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 15Gi
      volumeMounts:
        -
          mountPath: /var/druid
          name: data-volume
        
    routers:
      nodeType: "router"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/router"
      replicas: 1
      runtime.properties: |
        druid.service=druid/router

        # HTTP proxy
        druid.router.http.numConnections=10
        druid.router.http.readTimeout=PT5M
        druid.router.http.numMaxThreads=10
        druid.server.http.numThreads=10

        # Service discovery
        druid.router.defaultBrokerServiceName=druid/broker
        druid.router.coordinatorServiceName=druid/coordinator

        # Management proxy to coordinator / overlord: required for unified web console.
        druid.router.managementProxy.enabled=true       
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

The Kafka tasks are still in a pending state; only one is running.

Can you also post a screenshot of the services tab?

@TijoThomas Please find the services screenshot below.

I am not sure if this is the case, but you may not have enough memory for the middleManager pod.
You have given 2 GB for Xmx and 2 GB for direct memory per peon, but your pod size is only 4 GB. Can you try increasing the pod size?
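As a rough sanity check using the values in the YAML above (treating DRUID_XMX=4096m as the middleManager's own heap; real usage also includes JVM overhead):

    # per peon, from druid.indexer.runner.javaOpts:
    #   heap (-Xmx2g) + direct memory (-XX:MaxDirectMemorySize=2g) ~= 4g
    # with druid.worker.capacity=4, a fully loaded middleManager pod needs roughly:
    #   4 peons x 4g + 4g middleManager heap ~= 20g

So a 4 GB pod would barely fit the middleManager process itself, let alone four concurrent peons.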

@Nithin_K apologies if this is a silly suggestion (!!!), but would you mind sharing a screenshot of the Services tab in the Druid console? It will show the number of slots that Druid believes are available on each type of service, as defined by druid.worker.capacity. Streaming ingestion requires one slot for the supervisor, and then additional slots for each of the workers, so it will be useful to see what Druid thinks is happening.
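For the configuration above, that math would be (assuming both middleManager replicas registered with the Overlord):

    # total task slots = middleManager replicas x druid.worker.capacity
    #                  = 2 x 4 = 8 slots
    # the supervisor needs 3 slots for its reading tasks (taskCount=3),
    # plus, per the note above, one for the supervisor itself

so the Services tab should show whether the Overlord actually sees those 8 slots.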