Number of records resetting in Druid

I am ingesting data into Druid using the Kafka loader. When I first checked the record count (select count(*)) it returned 1000 records. When I checked again some time later, the count had dropped to 50. Why is Druid automatically deleting records? I am using the following YAML file to create the Druid cluster in Kubernetes.

apiVersion: druid.apache.org/v1alpha1
kind: Druid
metadata:
  name: cluster
spec:
  commonConfigMountPath: /opt/druid/conf/druid/cluster/_common
  rollingDeploy: true
  image: "apache/druid:0.19.0"
  startScript: /druid.sh
  log4j.config: |-
    <?xml version="1.0" encoding="UTF-8" ?>
    <Configuration status="WARN">
      <Appenders>
        <Console name="console" target="SYSTEM_OUT">
          <PatternLayout pattern="[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n" />
        </Console>
      </Appenders>
      <Loggers>
        <Root level="info">
          <AppenderRef ref="console"/>
        </Root>
        <Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="INFO">
            <AppenderRef ref="console"/>
        </Logger>
      </Loggers>
    </Configuration>
  jvm.options: |-
    -server
    -XX:+UseG1GC
    -Xloggc:gc-%t-%p.log
    -XX:+UseGCLogFileRotation
    -XX:GCLogFileSize=100M
    -XX:NumberOfGCLogFiles=10
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:HeapDumpPath=/opt/druid/var/
    -verbose:gc
    -XX:+PrintGCDetails
    -XX:+PrintGCTimeStamps
    -XX:+PrintGCDateStamps
    -XX:+PrintGCApplicationStoppedTime
    -XX:+PrintGCApplicationConcurrentTime
    -XX:+PrintAdaptiveSizePolicy
    -XX:+PrintReferenceGC
    -XX:+PrintFlagsFinal
    -Duser.timezone=UTC
    -Dfile.encoding=UTF-8
    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
    -Dorg.jboss.logging.provider=slf4j
    -Dnet.spy.log.LoggerImpl=net.spy.memcached.compat.log.SLF4JLogger
    -Dlog4j.shutdownCallbackRegistry=org.apache.druid.common.config.Log4jShutdown
    -Dlog4j.shutdownHookEnabled=true
    -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.ssl=false
  common.runtime.properties: |
    # Zookeeper
    druid.zk.service.host=tiny-cluster-zk-0.tiny-cluster-zk
    druid.zk.paths.base=/druid
    druid.zk.service.compress=false
    ###############################################
    # service names for coordinator and overlord
    ###############################################
    druid.selectors.indexing.serviceName=druid/overlord
    druid.selectors.coordinator.serviceName=druid/coordinator
    ##################################################
    # Request logging, monitoring, and segment
    ##################################################
    druid.request.logging.type=slf4j
    druid.request.logging.feed=requests
    ##################################################
    # Monitoring ( enable when using prometheus )
    #################################################
    
    ################################################
    # Extensions
    ################################################
    druid.extensions.directory=/opt/druid/extensions
    druid.extensions.loadList=["druid-s3-extensions","druid-kafka-indexing-service","druid-datasketches","postgresql-metadata-storage", "druid-avro-extensions"]
    druid.query.segmentMetadata.defaultAnalysisTypes=["cardinality","size","interval","minmax"]
    ####################################################
    # Enable sql
    ####################################################
    druid.sql.enable=true
  # deepStorage:
  #   spec:
  #     properties: |-
  #       druid.storage.type=s3
  #       druid.storage.bucket=
  #       druid.s3.accessKey=
  #       druid.s3.secretKey=  # use env and a k8s secret; if possible use vault.
  #       druid.storage.baseKey=druid/segments
  #       druid.indexer.logs.directory=data/logs/
  #   type: default
  # metadataStore:
  #   spec:
  #     properties: |-
  #       druid.metadata.storage.type=postgresql
  #       druid.metadata.storage.connector.connectURI=jdbc:postgresql://postgres.druid.svc.cluster.local:5432/druiddb
  #       druid.metadata.storage.connector.password=admin123
  #       druid.metadata.storage.connector.createTables=true
  #   type: default
  # zookeeper:
  #   spec:
  #     properties: |-
  #       druid.zk.service.host=druid-zookeeper-client.druid.svc.cluster.local
  #       druid.zk.paths.base=/druid
  #   type: default
  nodes:
    brokers: 
      kind: Deployment
      maxSurge: 2
      maxUnavailable: 0
      druid.port: 8080
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/query/broker
      nodeType: broker
      env:
        - name: DRUID_XMS
          value: 12000m
        - name: DRUID_XMX
          value: 12000m
        - name: DRUID_MAXDIRECTMEMORYSIZE
          value: 12g
        - name: AWS_REGION
          value: ap-southeast-1
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      replicas: 3
      resources:
        limits:
          cpu: 10
          memory: 15Gi
        requests:
          cpu: 10
          memory: 15Gi
      readinessProbe:
        initialDelaySeconds: 60
        periodSeconds: 10
        failureThreshold: 30
        httpGet:
          path: /druid/broker/v1/readiness
          port: 8080
      runtime.properties: |
         druid.service=druid/broker
         druid.log4j2.sourceCategory=druid/broker
         druid.broker.http.numConnections=500
         # Processing threads and buffers
         druid.processing.buffer.sizeBytes=268435456
         druid.processing.numMergeBuffers=2
         druid.processing.numThreads=11
    coordinators:
      druid.port: 8080
      kind: Deployment
      maxSurge: 2
      maxUnavailable: 0
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/master/coordinator-overlord
      nodeType: coordinator
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      replicas: 1
      resources:
        limits:
          cpu: 6
          memory: 6Gi
        requests:
          cpu: 6
          memory: 6Gi
      livenessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      readinessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      env:
        - name: DRUID_XMS
          value: 2g 
        - name: DRUID_XMX
          value: 2g
        - name: AWS_REGION
          value: ap-southeast-1
      runtime.properties: |
          druid.service=druid/coordinator
          druid.log4j2.sourceCategory=druid/coordinator
          druid.indexer.runner.type=httpRemote
          druid.indexer.queue.startDelay=PT5S
          druid.coordinator.balancer.strategy=cachingCost
          druid.serverview.type=http
          druid.indexer.storage.type=metadata
          druid.coordinator.startDelay=PT10S
          druid.coordinator.period=PT5S
          druid.server.http.numThreads=5000
          druid.coordinator.asOverlord.enabled=true
          druid.coordinator.asOverlord.overlordService=druid/overlord
    historical:
      druid.port: 8080
      kind: StatefulSet
      nodeType: historical
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/data/historical
      replicas: 1
      livenessProbe:
        initialDelaySeconds: 1800
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      readinessProbe:
        httpGet:
          path: /druid/historical/v1/readiness
          port: 8080
        periodSeconds: 10
        failureThreshold: 18
      # startupProbe:
      #   httpGet:
      #     path: /druid/historical/v1/readiness
      #     port: 8080
      #   periodSeconds: 10
      #   failureThreshold: 60
      resources:
        limits:
          cpu: 3
          memory: 5Gi
        requests:
          cpu: 1
          memory: 2Gi
      env:
        - name: DRUID_XMS
          value: 1500m
        - name: DRUID_XMX
          value: 1500m 
        - name: DRUID_MAXDIRECTMEMORYSIZE
          value: 2g
        - name: AWS_REGION
          value: ap-southeast-1
      runtime.properties: |
        druid.service=druid/historical
        druid.log4j2.sourceCategory=druid/historical
        # HTTP server threads
        druid.server.http.numThreads=10
        # Processing threads and buffers
        druid.processing.buffer.sizeBytes=400000000
        druid.processing.numMergeBuffers=2
        druid.processing.numThreads=2
        druid.processing.tmpDir=var/druid/processing
        # Segment storage 
        druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":40000000000}]
        druid.server.maxSize=40000000000
        # Query cache
        druid.historical.cache.useCache=true
        druid.historical.cache.populateCache=true
        druid.cache.type=caffeine
        druid.cache.sizeInBytes=256000000
      volumeClaimTemplates:
        -
          metadata:
            name: historical-volume
          spec:
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 50Gi
            # storageClassName: kops-ssd-1-17
      volumeMounts:
        -
          mountPath: var/druid
          name: historical-volume

    middlemanagers:
      druid.port: 8080
      kind: StatefulSet
      nodeType: middleManager
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/data/middleManager
      env:
        - name: DRUID_XMX
          value: 4096m
        - name: DRUID_XMS
          value: 4096m
        - name: AWS_REGION
          value: ap-southeast-1
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      replicas: 2
      resources:
        limits:
          cpu: 8
          memory: 8Gi
        requests:
          cpu: 8
          memory: 8Gi
      livenessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      readinessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      runtime.properties: |
          druid.service=druid/middleManager
          druid.worker.capacity=4
          druid.indexer.runner.javaOpts=-server -Xms2g -Xmx2g -XX:MaxDirectMemorySize=2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
          druid.indexer.task.baseTaskDir=var/druid/task
          # HTTP server threads
          druid.server.http.numThreads=500
          # Processing threads and buffers on Peons
          druid.indexer.fork.property.druid.processing.numMergeBuffers=2
          druid.indexer.fork.property.druid.processing.buffer.sizeBytes=32000000
          druid.indexer.fork.property.druid.processing.numThreads=2
      volumeClaimTemplates:
        -
          metadata:
            name: data-volume
          spec:
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 15Gi
            # storageClassName: kops-ssd-1-17
      volumeMounts:
        -
          mountPath: /var/druid
          name: data-volume

    routers:
      kind: Deployment
      maxSurge: 2
      maxUnavailable: 0
      livenessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      readinessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      druid.port: 8080
      env:
        - name: AWS_REGION
          value: ap-southeast-1
        - name: DRUID_XMX
          value: 1024m
        - name: DRUID_XMS
          value: 1024m
      resources:
        limits:
          cpu: 10
          memory: 8Gi
        requests:
          cpu: 10
          memory: 4Gi
      nodeType: router
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/query/router
      replicas: 1
      runtime.properties: |
          druid.service=druid/router
          druid.log4j2.sourceCategory=druid/router
          # HTTP proxy
          druid.router.http.numConnections=5000
          druid.router.http.readTimeout=PT5M
          druid.router.http.numMaxThreads=1000
          druid.server.http.numThreads=1000
          # Service discovery
          druid.router.defaultBrokerServiceName=druid/broker
          druid.router.coordinatorServiceName=druid/coordinator
          druid.router.managementProxy.enabled=true
      services:
        -
          metadata:
            name: router-%s-service
          spec:
            ports:
              -
                name: router-port
                port: 8080
            type: ClusterIP

Hey @Nithin_K ,

Any chance your Kafka loader is producing data with future dates and your datasource does not have the future-dates flag turned on? (found in the Retention column of the Datasources tab)
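
If you want to check that quickly from SQL, something along these lines should show whether any segment intervals extend into the future (the datasource name is a placeholder you'd swap for your own):

-- Most recent segment intervals for the datasource; a "start" later than the
-- current time means the segment is future-dated
SELECT "segment_id", "start", "end", "num_rows"
FROM sys.segments
WHERE "datasource" = 'my_datasource'
ORDER BY "start" DESC
LIMIT 20;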

There was a similar issue recently reported in the Druid Slack.
They were doing real-time ingestion as well and would “lose” records after some time. It turned out to be a problem in the configuration of deep storage, which had been erroneously set to “local”.
Real-time tasks would serve query results while ingestion was in progress, but when the segments were later “published” they only landed on the MiddleManager’s local storage and were therefore not visible to the Historicals.

I’m not familiar with the Druid operator you seem to be using for this deployment, but I notice that the deepStorage section of your spec is commented out, so I suspect this is the issue.
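
For reference, here is a rough sketch of what that section could look like once enabled, based directly on the commented-out block in your spec. The bucket name is a placeholder, the access/secret keys are deliberately left blank (inject them via env vars or a Kubernetes secret rather than hard-coding them), and the druid-s3-extensions entry already in your loadList covers the S3 support:

  deepStorage:
    spec:
      properties: |-
        druid.storage.type=s3
        # placeholder bucket name; replace with your own
        druid.storage.bucket=my-druid-bucket
        druid.storage.baseKey=druid/segments
        # left blank on purpose; supply via env vars / a k8s secret
        druid.s3.accessKey=
        druid.s3.secretKey=
        druid.indexer.logs.directory=data/logs/
    type: default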

Hey @Nithin_K, just in case this is helpful: at what point do you see the COUNT of 50? If it is after an ingestion task has ended, that would imply the records were being served by the task itself, and because the task is no longer available (it should show as FAILED because the hand-off didn’t succeed) those records are gone.

Note that, as with all good consumers, the topic offset is retained in Druid and updated only when the hand-off succeeds, so restarting the task will take it back to where it stopped last time (unless that offset is no longer available in the topic, of course).
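
One quick way to confirm this from the sys tables, again with a placeholder datasource name:

-- Recent ingestion tasks for the datasource; FAILED here points at a hand-off problem
SELECT "task_id", "type", "status", "created_time"
FROM sys.tasks
WHERE "datasource" = 'my_datasource'
ORDER BY "created_time" DESC;

-- is_realtime = 1 with is_published = 0 means the rows only live in the running task,
-- not in deep storage, and will disappear when the task ends
SELECT "segment_id", "is_published", "is_available", "is_realtime", "num_rows"
FROM sys.segments
WHERE "datasource" = 'my_datasource';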

Another thought: could you have rollup turned on, so that Druid is summarising the data at ingestion time? Do you have rollup enabled?
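
With rollup enabled, COUNT(*) returns the number of stored rows after rollup rather than the number of ingested events, so the count can legitimately shrink as real-time rows are merged and published. If your ingestion spec defines a count metric (commonly named “count”; that name is an assumption here), comparing the two makes it obvious:

-- rows stored after rollup vs. raw events ingested
-- (assumes a longSum metric named "count" in the ingestion spec; datasource name is a placeholder)
SELECT COUNT(*) AS rows_after_rollup,
       SUM("count") AS events_ingested
FROM "my_datasource";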