Unable to get Tranquility 0.8.2 to work with Druid 0.9.1.1

We’ve just switched to Druid 0.9.1.1 and also updated Tranquility to 0.8.2 in our Storm cluster. The index tasks are created; however, we’re unable to get the data. I don’t see any exceptions from the cluster, and I’m not sure how to debug the issue. I noticed some of the changes in Tranquility 0.8.2, so I had to update some code. Also, it seems like SpatialDimensionSchema is deprecated; I'm not sure what I should use instead.

```java
@Override
public Beam makeBeam(Map map, IMetricsContext iMetricsContext) {
    final String timestampField = (String) this.config.get(this.TIMESTAMP_FIELD);
    String zookeeper = (String) this.config.get(this.ZOOKEEPER);
    String dataSource = (String) this.config.get(this.DATASOURCE);
    int partitionNumber = Integer.parseInt((String) this.config.get(this.PARTITION));
    int replicationNumber = Integer.parseInt((String) this.config.get(this.REPLICATION));

    final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
            new CountAggregatorFactory("count"),
            new HyperUniquesAggregatorFactory("unique", "uID"),
            new HyperUniquesAggregatorFactory("session", "sID"),
            new HyperUniquesAggregatorFactory("pageSession", "pID"),
            new HyperUniquesAggregatorFactory("iunique", "xID"),
            new LongSumAggregatorFactory("audience", "uUq"),
            new LongSumAggregatorFactory("newAudience", "uNw"),
            new DoubleSumAggregatorFactory("valueSum", "_v"),
            new DoubleSumAggregatorFactory("valueSum1", "_v1"),
            new DoubleSumAggregatorFactory("valueSum2", "_v2")
    );

    CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zookeeper,
            new BoundedExponentialBackoffRetry(100, 1000, 5));
    curatorFramework.start();

    DruidSpatialDimension latLng = new DruidSpatialDimension() {
        @Override
        public SpatialDimensionSchema schema() {
            return new SpatialDimensionSchema("latLng", Arrays.asList("lLa", "lLo"));
        }
    };

    DruidSpatialDimension mousePosition = new DruidSpatialDimension() {
        @Override
        public SpatialDimensionSchema schema() {
            return new SpatialDimensionSchema("mousePosition", Arrays.asList("uMx", "uMy"));
        }
    };

    DruidDimensions druidDimensions = DruidDimensions
            .schemalessWithExclusions(Arrays.asList("uID", "uIP", "pHp", "cTi", "lCi", "xID", "sID", "pID"))
            .withSpatialDimensions(Arrays.asList(latLng, mousePosition));

    // fmt is a Joda-Time DateTimeFormatter field on this class; UTC is DateTimeZone.UTC.
    Beam<Map<String, Object>> beam = DruidBeams.builder(new Timestamper<Map<String, Object>>() {
        @Override
        public DateTime timestamp(Map<String, Object> theMap) {
            return fmt.parseDateTime(theMap.get(timestampField).toString()).withZone(UTC);
        }
    }).curator(curatorFramework)
            .discoveryPath("/druid/discovery")
            .location(DruidLocation.create("druid/overlord", dataSource))
            .rollup(DruidRollup.create(druidDimensions, aggregators, QueryGranularities.NONE))
            .timestampSpec(new TimestampSpec(timestampField, "auto", new DateTime()))
            .tuning(ClusteredBeamTuning.create(HOUR, new Period("PT10M"), new Period("PT10M"),
                    partitionNumber, replicationNumber))
            .buildBeam();

    LOG.info("Finished making Druid Bolt");
    return beam;
}
```

Hi Noppanit,

Tranquility should return some metrics on the number of events sent to the indexing task and the number of events dropped. I would check this first to confirm the events are actually getting sent. Afterwards, I would look at the logs for the indexing tasks to see if there’s anything strange going on in there. If you post the logs for your overlord + an indexing task that completed without generating any data it should help in diagnosing the issue.
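One quick way to confirm what actually landed in the datasource is a segmentMetadata query against the broker, which lists the columns that were indexed. A minimal sketch, using the dataSource name from this thread and an arbitrary interval you would adjust to your data:

```json
{
  "queryType" : "segmentMetadata",
  "dataSource" : "sparrow-firehose-web",
  "intervals" : [ "2016-10-17T00:00:00/2016-10-18T00:00:00" ]
}
```

If the expected dimension columns are missing from the response, the problem is on the ingestion side rather than in your queries.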

Hi David,

Thanks for your response. I can see that the events are ingested, but the schema is all wrong. I noticed that the spec file generated by Tranquility has the parser type as "map" instead of "string". Not sure if that causes the problem.

This is the generated spec file

```json
{
  "type" : "index_realtime",
  "id" : "index_realtime_sparrow-firehose-web_2016-10-17T20:00:00.000Z_0_0",
  "resource" : {
    "availabilityGroup" : "sparrow-firehose-web-2016-10-17T20:00:00.000Z-0000",
    "requiredCapacity" : 1
  },
  "spec" : {
    "dataSchema" : {
      "dataSource" : "sparrow-firehose-web",
      "parser" : {
        "type" : "map",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "_ts",
            "format" : "auto",
            "missingValue" : "2016-10-14T02:19:17.393Z"
          },
          "dimensionsSpec" : {
            "dimensionExclusions" : [ "uID", "uIP", "sID", "lCi", "pHp", "cTi", "xID", "pID" ],
            "spatialDimensions" : [ {
              "dimName" : "latLng",
              "dims" : [ "lLa", "lLo" ]
            }, {
              "dimName" : "mousePosition",
              "dims" : [ "uMx", "uMy" ]
            } ]
          }
        }
      },
      "metricsSpec" : [ {
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "hyperUnique",
        "name" : "unique",
        "fieldName" : "uID"
      }, {
        "type" : "hyperUnique",
        "name" : "session",
        "fieldName" : "sID"
      }, {
        "type" : "hyperUnique",
        "name" : "pageSession",
        "fieldName" : "pID"
      }, {
        "type" : "hyperUnique",
        "name" : "iunique",
        "fieldName" : "xID"
      }, {
        "type" : "longSum",
        "name" : "audience",
        "fieldName" : "uUq"
      }, {
        "type" : "longSum",
        "name" : "newAudience",
        "fieldName" : "uNw"
      }, {
        "type" : "doubleSum",
        "name" : "valueSum",
        "fieldName" : "_v"
      }, {
        "type" : "doubleSum",
        "name" : "valueSum1",
        "fieldName" : "_v1"
      }, {
        "type" : "doubleSum",
        "name" : "valueSum2",
        "fieldName" : "_v2"
      } ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : {
          "type" : "none"
        },
        "intervals" : null
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose" : {
        "type" : "clipped",
        "delegate" : {
          "type" : "timed",
          "delegate" : {
            "type" : "receiver",
            "serviceName" : "firehose:druid:overlord:sparrow-firehose-web-020-0000-0000",
            "bufferSize" : 100000
          },
          "shutoffTime" : "2016-10-17T21:15:00.000Z"
        },
        "interval" : "2016-10-17T20:00:00.000Z/2016-10-17T21:00:00.000Z"
      },
      "firehoseV2" : null
    },
    "tuningConfig" : {
      "type" : "realtime",
      "maxRowsInMemory" : 75000,
      "intermediatePersistPeriod" : "PT10M",
      "windowPeriod" : "PT10M",
      "basePersistDirectory" : "/tmp/1476733801147-0",
      "versioningPolicy" : {
        "type" : "intervalStart"
      },
      "rejectionPolicy" : {
        "type" : "none"
      },
      "maxPendingPersists" : 0,
      "shardSpec" : {
        "type" : "linear",
        "partitionNum" : 0
      },
      "indexSpec" : {
        "bitmap" : {
          "type" : "concise"
        },
        "dimensionCompression" : null,
        "metricCompression" : null
      },
      "buildV9Directly" : false,
      "persistThreadPriority" : 0,
      "mergeThreadPriority" : 0,
      "reportParseExceptions" : false,
      "handoffConditionTimeout" : 0
    }
  },
  "context" : null,
  "groupId" : "index_realtime_sparrow-firehose-web",
  "dataSource" : "sparrow-firehose-web"
}
```

The map parser in the realtime task is expected and shouldn’t be causing your problems. What do you mean that the schema is all wrong? What are you expecting and what are you getting?

Hi,

This is the payload we’re sending to Druid

```json
{"_o":"origin","_c":"performance","_t":"parse-context","_ts":"2016-10-19T18:45:08.445Z","_v":"171","cTy":"article","cCh":"Fashion","cSch":"Features","uIP":"","lCo":"US","lRe":"VA","lCi":"Ashburn","lLa":"","lLo":""}
```

However, when I tried querying with this query

```json
{
  "queryType": "topN",
  "dataSource": "sparrow-firehose-web",
  "dimension": "_o",
  "threshold": 10,
  "metric": "count",
  "granularity": "all",
  "filter": {
    "type": "and",
    "fields": [{
      "type": "selector",
      "dimension": "_o",
      "value": "origin"
    }]
  },
  "aggregations": [{
    "type": "longSum",
    "name": "count",
    "fieldName": "count"
  }],
  "intervals": ["2016-10-17T18:14:00/2016-10-20T18:16:59"]
}
```

I get an empty result, but if I take out the filter

```json
{
  "queryType": "topN",
  "dataSource": "sparrow-firehose-web",
  "dimension": "_o",
  "threshold": 10,
  "metric": "count",
  "granularity": "all",
  "aggregations": [{
    "type": "longSum",
    "name": "count",
    "fieldName": "count"
  }],
  "intervals": ["2016-10-17T18:14:00/2016-10-20T18:16:59"]
}
```

I can see that there are 300k+ events in there.

```json
"result": [
  {
    "count": 362037,
    "_o": null
  }
]
```

What data do you get back when you issue a select query? Are any of the dimensions / metrics indexed correctly?
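For reference, a minimal select query to inspect a few raw rows of this datasource might look like the following sketch (the interval is copied from the topN queries above; the paging threshold of 5 is arbitrary):

```json
{
  "queryType": "select",
  "dataSource": "sparrow-firehose-web",
  "granularity": "all",
  "intervals": ["2016-10-17T18:14:00/2016-10-20T18:16:59"],
  "pagingSpec": {"pagingIdentifiers": {}, "threshold": 5}
}
```

The rows in the response show exactly which dimensions and metrics each event was indexed with, which makes it easy to spot whether fields were dropped at ingestion time.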

Here’s what we have tried so far. Initially, we had this code in our spec file

```java
DruidDimensions druidDimensions = DruidDimensions
        .schemalessWithExclusions(Arrays.asList("uID", "uIP", "pHp", "cTi", "lCi", "xID", "sID", "pID"))
        .withSpatialDimensions(Arrays.asList(latLng, mousePosition));
```

So the spec file will have all those fields excluded, and all other fields will be indexed as dimensions. But after we upgraded to Druid 0.9.1.1 and Tranquility 0.8.2, no matter what we query, we always get null. Here's what we tried sending to Druid.

```json
{"user": "pedro"}
```

And when we query using this query

```json
{
  "queryType": "topN",
  "dataSource": "sparrow-firehose-web",
  "dimension": "user",
  "threshold": 10,
  "metric": "count",
  "granularity": "all",
  "aggregations": [{
    "type": "longSum",
    "name": "count",
    "fieldName": "count"
  }],
  "intervals": ["2016-10-17T18:14:00/2016-10-20T18:16:59"]
}
```

This is what we get back.

```json
{
  "count": 541290,
  "user": null
}
```

We then tried specifying all the dimensions in our spec like this.

```java
DruidDimensions druidDimensions = DruidDimensions.specific(Arrays.asList(
        "page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous",
        "namespace", "continent", "country", "region", "city"));
```

Then we issued the same query, and this is what we got back. So it works.

```json
{
  "count": 10000,
  "user": "pedro"
}
```

We even tried

```java
DruidDimensions.schemaless();
```

And it didn’t work.

So it seems like we lost the schemaless ability after upgrading to Druid 0.9.1.1 and Tranquility 0.8.2. We use Storm and Tranquility.

Hey Noppanit,

I tracked down what’s going on. Declaring a spatial dimension now counts as specifying a dimension, which prevents the parser from introspecting the dimensions from the fields. See:

https://github.com/druid-io/druid-api/blob/master/src/main/java/io/druid/data/input/impl/DimensionsSpec.java#L93 and
https://github.com/druid-io/druid-api/blob/master/src/main/java/io/druid/data/input/impl/MapInputRowParser.java#L48
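The interaction boils down to something like the following simplified sketch (not the actual Druid source; the real logic is behind the links above): the dimensions spec counts as "custom" when either the explicit dimension list or the spatial dimension list is non-empty, and the map parser only introspects dimension names from the event's fields when there are no custom dimensions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified mirror of the check that disables schemaless introspection.
public class SchemalessCheck {
    // Mimics DimensionsSpec: either explicit dimensions OR spatial dimensions
    // make the spec "custom", so field introspection is skipped.
    static boolean hasCustomDimensions(List<String> dimensions,
                                       List<String> spatialDimensionNames) {
        return !dimensions.isEmpty() || !spatialDimensionNames.isEmpty();
    }

    public static void main(String[] args) {
        // Exclusions alone: no custom dimensions, schemaless introspection still happens.
        System.out.println(hasCustomDimensions(
                Collections.<String>emptyList(), Collections.<String>emptyList())); // false
        // Declaring spatial dimensions like latLng/mousePosition flips the check,
        // which is why your schemalessWithExclusions + spatial setup stopped working.
        System.out.println(hasCustomDimensions(
                Collections.<String>emptyList(), Arrays.asList("latLng", "mousePosition"))); // true
    }
}
```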

Is schemaless ingestion a nice-to-have or essential for your use case? I filed issue https://github.com/druid-io/druid/issues/3599 to track this.

Thanks for filing the issue on GitHub. Schemaless ingestion is actually a must-have feature for us. I'll follow up on the GitHub issue.