How does Druid react to retried records with a later current timestamp?

I’m trying to debug how Druid reacts to records that are being retried. We’re ingesting data from a Kinesis stream, and sometimes a record fails, so we have to retry it. What we’re noticing is that the data is not ingested after the retry.

For example, we have a record with _ts of 2016-06-30T14:09:06+00:00 that failed to insert into Kinesis, so my application retries it; but when I query for that record I see that the count is a bit off. So I’m wondering how Druid reacts to a record that arrives later than the current time. It might be something wrong with my code, but I just wanted to make sure that the Druid side is fine.

My segment granularity is hourly and the windowPeriod is 10 minutes.
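To make the question concrete, here is a rough sketch of my understanding (not actual Tranquility source) of the window check I assume applies with these settings: an event is only accepted while its hour bucket, give or take the windowPeriod, is still open.

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;

public class WindowCheckSketch {
    // Illustrative only: bucket the event into its own hour (segmentGranularity = HOUR)
    // and treat the bucket as "open" from (bucketStart - windowPeriod) to (bucketEnd + windowPeriod).
    static boolean accepted(DateTime eventTime, DateTime now, Period windowPeriod) {
        DateTime bucketStart = eventTime.hourOfDay().roundFloorCopy();
        DateTime bucketEnd = bucketStart.plusHours(1);
        return !now.isBefore(bucketStart.minus(windowPeriod))
                && !now.isAfter(bucketEnd.plus(windowPeriod));
    }

    public static void main(String[] args) {
        Period window = Period.parse("PT10M");
        DateTime eventTs = new DateTime("2016-06-30T14:09:06Z", DateTimeZone.UTC);

        // First attempt, shortly after the event happened: inside the open window.
        System.out.println(accepted(eventTs, new DateTime("2016-06-30T14:10:00Z", DateTimeZone.UTC), window)); // true
        // Retry after the 14:00-15:00 hour plus windowPeriod has passed: silently dropped.
        System.out.println(accepted(eventTs, new DateTime("2016-06-30T15:20:00Z", DateTimeZone.UTC), window)); // false
    }
}
```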

Here’s my Tranquility setup. We use Storm with Kinesis.

@Override
public Beam makeBeam(Map map, IMetricsContext iMetricsContext) {
    final String timestampField = (String) this.config.get(this.TIMESTAMP_FIELD);
    String zookeeper = (String) this.config.get(this.ZOOKEEPER);
    String dataSource = (String) this.config.get(this.DATASOURCE);
    int partitionNumber = Integer.parseInt((String) this.config.get(this.PARTITION));
    int replicationNumber = Integer.parseInt((String) this.config.get(this.REPLICATION));

    final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
            new CountAggregatorFactory("count"),
            new HyperUniquesAggregatorFactory("unique", "uID"),
            new HyperUniquesAggregatorFactory("session", "sID"),
            new HyperUniquesAggregatorFactory("pageSession", "pID"),
            new HyperUniquesAggregatorFactory("iunique", "xID"),
            new LongSumAggregatorFactory("audience", "uUq"),
            new LongSumAggregatorFactory("newAudience", "uNw"),
            new DoubleSumAggregatorFactory("valueSum", "_v"),
            new DoubleSumAggregatorFactory("valueSum1", "_v1"),
            new DoubleSumAggregatorFactory("valueSum2", "_v2")
    );

    CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zookeeper, new BoundedExponentialBackoffRetry(100, 1000, 5));
    curatorFramework.start();

    DruidSpatialDimension latLng = new DruidSpatialDimension() {
        @Override
        public SpatialDimensionSchema schema() {
            return new SpatialDimensionSchema("latLng", Arrays.asList("lLa", "lLo"));
        }
    };

    DruidSpatialDimension mousePosition = new DruidSpatialDimension() {
        @Override
        public SpatialDimensionSchema schema() {
            return new SpatialDimensionSchema("mousePosition", Arrays.asList("uMx", "uMy"));
        }
    };

    DruidDimensions druidDimensions = DruidDimensions.schemalessWithExclusions(Arrays.asList("uID", "uIP", "pHp", "cTi", "lCi", "xID", "sID", "pID"))
            .withSpatialDimensions(Arrays.asList(latLng, mousePosition));

    Beam<Map<String, Object>> beam = DruidBeams.builder(new Timestamper<Map<String, Object>>() {
        @Override
        public DateTime timestamp(Map<String, Object> theMap) {
            return fmt.parseDateTime(theMap.get(timestampField).toString()).withZone(UTC);
        }
    }).curator(curatorFramework)
            .discoveryPath("/druid/discovery")
            .location(DruidLocation.create("druid/overlord", dataSource))
            .rollup(DruidRollup.create(druidDimensions, aggregators, MINUTE))
            // timestampField is parsed with the "auto" format; new DateTime() is the default for rows missing a timestamp
            .timestampSpec(new TimestampSpec(timestampField, "auto", new DateTime()))
            // hourly segmentGranularity; the PT10M Period arguments include the 10-minute windowPeriod
            .tuning(ClusteredBeamTuning.create(HOUR, new Period("PT10M"), new Period("PT10M"), partitionNumber, replicationNumber)).buildBeam();

    LOG.info("Finished making Druid Bolt");
    return beam;
}
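For completeness, this is roughly how a factory like this is typically attached to the topology through tranquility-storm's BeamBolt. This is a hypothetical wiring sketch, not our real topology; MyKinesisSpout and WebEventBeamFactory are placeholder names.

```java
import java.util.Map;

import backtype.storm.topology.TopologyBuilder;
import com.metamx.tranquility.storm.BeamBolt;

public class TopologySketch {
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        // Placeholder spout that emits one Map<String, Object> per Kinesis record.
        builder.setSpout("kinesis-spout", new MyKinesisSpout(), 1);
        // BeamBolt hands each tuple's Map to the Beam returned by makeBeam() above.
        builder.setBolt("druid-beam",
                        new BeamBolt<Map<String, Object>>(new WebEventBeamFactory()), 1)
               .shuffleGrouping("kinesis-spout");
        return builder;
    }
}
```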

And I use longSum on the count metric at query time, as suggested in http://druid.io/docs/latest/ingestion/faq.html:

{
  "queryType": "topN",
  "dataSource": "firehose-web",
  "dimension": "_o",
  "threshold": 10,
  "metric": "count",
  "granularity": "all",
  "filter": {
    "type": "and",
    "fields": [{
      "type": "selector",
      "dimension": "_o",
      "value": "integrity_test"
    }]
  },
  "aggregations": [{
    "type": "longSum",
    "name": "count",
    "fieldName": "count"
  }],
  "intervals": ["2016-06-30T17:29:00/2016-06-30T17:31:59"]
}

I also checked the ingestion metrics and I don’t see anything thrown away; ingest/events/thrownAway shows 0.

Hello Toy,

I am trying to figure out a way to send data from a Kinesis data stream to Druid via Tranquility. It looks like you have successfully set up the Tranquility-Kinesis link. Any pointers here would be helpful.

Thanks,

Rahul