The same segment on both a realtime node and a historical node

Hi,
When I used realtime nodes for streaming ingestion, I found that after running for a while, the same segment was being served by both a realtime node and a historical node.

As a result, the query results over those intervals are larger than the real data.

As shown in the picture above, “ip-10-5-1-45” is a realtime node and “ip-10-5-2-137” is a historical node. Could somebody help explain why…
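
For example, a timeseries query like the one below (the interval and aggregations are illustrative, not my exact query) shows the inflated totals over the affected intervals:

{
  "queryType": "timeseries",
  "dataSource": "nrt_dashboard",
  "granularity": "minute",
  "intervals": ["2015-07-14T08:00/2015-07-14T09:00"],
  "aggregations": [
    { "type": "longSum", "name": "impressions", "fieldName": "impressions" },
    { "type": "longSum", "name": "clicks", "fieldName": "clicks" }
  ]
}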

Here is my deployment.

I created 9 topics in Kafka: nrt_dashboard_0, nrt_dashboard_1, … , nrt_dashboard_8.

Each topic has only 1 partition so that messages within a topic stay in time order.

The data source ingests into these topics in a round-robin fashion.

There are 3 realtime nodes consuming these topics; each node consumes three topics.

Here is the spec for one of the realtime nodes.

[
  {
    "dataSchema" : {
      "dataSource" : "nrt_dashboard",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format": "tsv",
          "timestampSpec": {
            "column": "timestamp",
            "format": "iso"
          },
          "columns": [
            "timestamp",
            "pkgname",
            "country",
            "impressions",
            "clicks",
            "installs",
            "revenue"
          ],
          "delimiter": "\t",
          "dimensionsSpec": {
            "dimensions": [
              "pkgname",
              "country"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec" : [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "longSum",
          "name": "impressions",
          "fieldName": "impressions"
        },
        {
          "type": "longSum",
          "name": "clicks",
          "fieldName": "clicks"
        },
        {
          "type": "longSum",
          "name": "installs",
          "fieldName": "installs"
        },
        {
          "type": "doubleSum",
          "name": "revenue",
          "fieldName": "revenue"
        }
      ],
      "granularitySpec" : {
        "type": "uniform",
        "segmentGranularity": "FIVE_MINUTE",
        "queryGranularity": "MINUTE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "node1:2181,node2:2181,node3:2181/kafka",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-nrt_dashboard-0",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "nrt_dashboard_0"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 50000,
      "intermediatePersistPeriod": "PT5m",
      "windowPeriod": "PT5m",
      "basePersistDirectory": "\/tmp\/realtime\/basePersist\/0",
      "rejectionPolicy": {
        "type": "messageTime"
      },
      "shardSpec": {
        "type": "linear",
        "partitionNum": 0
      }
    }
  },
  {
    "dataSchema" : {
      "dataSource" : "nrt_dashboard",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format": "tsv",
          "timestampSpec": {
            "column": "timestamp",
            "format": "iso"
          },
          "columns": [
            "timestamp",
            "pkgname",
            "country",
            "impressions",
            "clicks",
            "installs",
            "revenue"
          ],
          "delimiter": "\t",
          "dimensionsSpec": {
            "dimensions": [
              "pkgname",
              "country"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec" : [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "longSum",
          "name": "impressions",
          "fieldName": "impressions"
        },
        {
          "type": "longSum",
          "name": "clicks",
          "fieldName": "clicks"
        },
        {
          "type": "longSum",
          "name": "installs",
          "fieldName": "installs"
        },
        {
          "type": "doubleSum",
          "name": "revenue",
          "fieldName": "revenue"
        }
      ],
      "granularitySpec" : {
        "type": "uniform",
        "segmentGranularity": "FIVE_MINUTE",
        "queryGranularity": "MINUTE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "node1:2181,node2:2181,node3:2181/kafka",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-nrt_dashboard-1",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "nrt_dashboard_1"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 50000,
      "intermediatePersistPeriod": "PT5m",
      "windowPeriod": "PT5m",
      "basePersistDirectory": "\/tmp\/realtime\/basePersist\/1",
      "rejectionPolicy": {
        "type": "messageTime"
      },
      "shardSpec": {
        "type": "linear",
        "partitionNum": 1
      }
    }
  },
  {
    "dataSchema" : {
      "dataSource" : "nrt_dashboard",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format": "tsv",
          "timestampSpec": {
            "column": "timestamp",
            "format": "iso"
          },
          "columns": [
            "timestamp",
            "pkgname",
            "country",
            "impressions",
            "clicks",
            "installs",
            "revenue"
          ],
          "delimiter": "\t",
          "dimensionsSpec": {
            "dimensions": [
              "pkgname",
              "country"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec" : [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "longSum",
          "name": "impressions",
          "fieldName": "impressions"
        },
        {
          "type": "longSum",
          "name": "clicks",
          "fieldName": "clicks"
        },
        {
          "type": "longSum",
          "name": "installs",
          "fieldName": "installs"
        },
        {
          "type": "doubleSum",
          "name": "revenue",
          "fieldName": "revenue"
        }
      ],
      "granularitySpec" : {
        "type": "uniform",
        "segmentGranularity": "FIVE_MINUTE",
        "queryGranularity": "MINUTE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "node1:2181,node2:2181,node3:2181/kafka",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-nrt_dashboard-2",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "nrt_dashboard_2"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 50000,
      "intermediatePersistPeriod": "PT5m",
      "windowPeriod": "PT5m",
      "basePersistDirectory": "\/tmp\/realtime\/basePersist\/2",
      "rejectionPolicy": {
        "type": "messageTime"
      },
      "shardSpec": {
        "type": "linear",
        "partitionNum": 2
      }
    }
  }
]
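
For reference, each Kafka message is one TSV line matching the columns in the parseSpec (tab-separated; the values below are made up for illustration):

2015-07-14T08:00:00Z	com.example.app	US	100	5	1	0.25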

It works normally for a while, but after some time, some segments end up duplicated on both the realtime node and the historical node.

Could somebody help me?

Thanks

Does the segment from the realtime node unannounce after some time? The handoff logic is such that the realtime node segment does not unannounce and get removed until after a copy of the segment exists on historicals. This is to make sure data is always queryable.

Hi, Fangjin:
Yes, it does. I found a “merged” folder and an “isPushedMarker” file under these segments’ persist directories.

But if I stop ingesting data into Kafka, after some time these segments become queryable normally, and only on the historical node.

I found that CPU usage is very high while the realtime node is processing the streaming data. Could it be that the realtime node is too busy to unannounce the segments in time, so the same segment is served on both the realtime node and the historical node?

By the way, if I use the “serverTime” rejection policy, will a segment on the realtime node be handed off and loaded on a historical node after “windowPeriod” + “segmentGranularity”?
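
i.e., something like this in the tuningConfig (just a sketch of the change, everything else as in my spec above):

"tuningConfig": {
  "type": "realtime",
  "maxRowsInMemory": 50000,
  "intermediatePersistPeriod": "PT5m",
  "windowPeriod": "PT5m",
  "basePersistDirectory": "/tmp/realtime/basePersist/0",
  "rejectionPolicy": {
    "type": "serverTime"
  },
  "shardSpec": {
    "type": "linear",
    "partitionNum": 0
  }
}

With FIVE_MINUTE segments and a PT5m windowPeriod, I would expect the 08:00–08:05 segment to become eligible for handoff around 08:10.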

On Tuesday, July 14, 2015 at 12:08:54 PM UTC+8, Fangjin Yang wrote:

Inline.

Hi, Fangjin:
Yes, it does. I found a “merged” folder and an “isPushedMarker” file under these segments’ persist directories.

But if I stop ingesting data into Kafka, after some time these segments become queryable normally, and only on the historical node.

I found that CPU usage is very high while the realtime node is processing the streaming data. Could it be that the realtime node is too busy to unannounce the segments in time, so the same segment is served on both the realtime node and the historical node?

The realtime node uses ZooKeeper to see when a historical node announces a segment it is serving. After it sees that signal, it should unannounce the segment. If this is not happening, some logs from the historical and realtime nodes should help.
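
If it helps while debugging, a segmentMetadata query shows exactly which segments exist for an interval (the interval below is illustrative):

{
  "queryType": "segmentMetadata",
  "dataSource": "nrt_dashboard",
  "intervals": ["2015-07-14T08:00/2015-07-14T09:00"]
}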

By the way, if I use the “serverTime” rejection policy, will a segment on the realtime node be handed off and loaded on a historical node after “windowPeriod” + “segmentGranularity”?

Yes, plus some time to merge intermediate chunks and hand the data off.