Druid has too many rows after batch re-process

Hi,

We are using batch reprocessing to reprocess our data.

Config:

segmentGranularity: day

queryGranularity: 15 minutes

Raw data volume: 149 million rows

Dimensions: 60

Metrics: 6 (includes some hyperUniques and some sums)

I ran a group by to get an estimate of the number of rows; the code:

sqlContext.sql("Select (cast(at/(15*60) as int)*(15*60)) as at, count(*) as event_count, " + allDimensions.map(r => { (r + " as " + r + " ") }).mkString(",") + " from cleaned_events GROUP BY (cast(at/(15*60) as int)*(15*60)), " + allDimensions.map(r => { (r) }).mkString(","))


Basically, it groups all the rows by all the dimensions and rounds the timestamp down to the nearest 15-minute boundary. The count of these grouped rows is 3.5 million.
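In other words, something like this (a simplified sketch of the job above, not the exact code; cleaned_events and allDimensions come from our Spark job):

// Sketch: count the distinct (15-minute bucket + all dimensions) groups for the day.
val dims = allDimensions.mkString(", ")
val grouped = sqlContext.sql(
  "SELECT cast(at / (15 * 60) as int) * (15 * 60) AS at, count(*) AS event_count, " + dims +
  " FROM cleaned_events GROUP BY cast(at / (15 * 60) as int) * (15 * 60), " + dims)
println(grouped.count())   // ~3.5 million groups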

I would expect the number of rows in Druid to also be 3.5 million, but I have 29.5 million (I checked with a count aggregation). Am I missing something here, or am I looking at this the wrong way?

bump

What does your ingestion spec look like?

sqlContext.sql("Select (cast(at/(15*60) as int)*(15*60)) as at, count(*) as event_count, " + allDimensions.map(r => { (r + " as " + r + " ") }).mkString(",") + " from cleaned_events GROUP BY (cast(at/(15*60) as int)*(15*60)), " + allDimensions.map(r => { (r) }).mkString(","))

Is 3.5 million rows the total sum of counts for all groupings returned by this query, or just the count for the first 15-minute bucket?

Thanks, Jonathan.

Please find the attached schema, input_schema.json (too large to paste into the thread).

It looks something like this:

{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "granularity",
        "dataGranularity": "day",
        "inputPath": "s3a://some-bucket/folder-path1523286616094",
        "filePattern": ".*\.json\.gz",
        "pathFormat": "'event_date='yyyy-MM-dd"
      }
    },
    "dataSchema": {
      "dataSource": "raw_events",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "rollup": true,
        "queryGranularity": "fifteen_minute",
        "intervals": [
          "2017-12-30/2018-01-01"
        ]
      },
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "at",
            "format": "posix"
          },
          "dimensionsSpec": {
            "dimensions": [
              "verb",
              "actor.type",
              "actor.guest",
              ...
            ]
          }
        }
      },
      "metricsSpec": [
        { "name": "event_count", "type": "count" },
        { "fieldName": "direct_object.price", "name": "sum_direct_object.price", "type": "doubleSum" },
        { "fieldName": "direct_object.total_price", "name": "sum_direct_object.total_price", "type": "doubleSum" },
        { "fieldName": "actor.id", "name": "approximate_actor_count", "type": "hyperUnique" },
        { "fieldName": "direct_object.id", "name": "approximate_direct_object_count", "type": "hyperUnique" },
        { "fieldName": "direct_object.seller_id", "name": "approximate_direct_object_seller_count", "type": "hyperUnique" }
      ]
    },
    "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec": {
        "type": "hashed",
        "targetPartitionSize": 5000000
      },
      "jobProperties": {
        "mapreduce.job.classloader": "true",
        "fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.s3n.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
      }
    }
  }
}

3.5 million is the number of rows after grouping, for one day (not the sum of counts).

The raw data volume is 150 million rows.

input_schema.json (5.22 KB)

Dimensions: 60
Metrics: 6 (includes some hyperUniques and some sums)

Hm, I think that ingestion spec looks valid.

However, I do see 76 dimensions defined in your ingestion spec, instead of 60. How are the json.gz files in your input bucket generated? Are they always synced with the "cleaned_events" table that you're running SQL queries on? (Might there be too much data in the S3 input set for some reason, or some other mismatch compared to the cleaned_events data you're querying with Spark SQL?)
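For example, something like this (an untested sketch; the path combines the inputPath and pathFormat from your ioConfig, and the date is just an example) would count the raw input for one day so you can compare it against cleaned_events:

// Untested sketch: count the raw json.gz events for a single day of the S3 input.
// The path combines the spec's inputPath and pathFormat; the date is an example.
val rawDay = sqlContext.read.json(
  "s3a://some-bucket/folder-path1523286616094/event_date=2017-12-31/*.json.gz")
println(rawDay.count())   // compare with cleaned_events for the same day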

I would expect the number of rows in Druid to also be 3.5 million, but I have 29.5 million (I checked with a count aggregation).

To double check, what’s the query you’re running on Druid to check the total number of rows?

Thanks,
Jon

Dimensions: 60
Metrics: 6 (includes some hyperUniques and some sums)

Hm, I think that ingestion spec looks valid.

However, I do see 76 dimensions defined in your ingestion spec, instead of 60.

We added more dimensions as we started using Druid, but the analysis I submitted is on the same set of dimensions.

How are the json.gz files in your input bucket generated? Are they always synced with the "cleaned_events" table that you're running SQL queries on? (Might there be too much data in the S3 input set for some reason, or some other mismatch compared to the cleaned_events data you're querying with Spark SQL?)

They are generated via Spark processing, from Parquet to CSV. I did verify that the CSV had the same number of rows, and that the distinct count (by those 60/76 dimensions) was also around 3.5 million.
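Roughly, the check looked like this (a sketch only; the CSV path and read options here are placeholders, not our real job):

import org.apache.spark.sql.functions.col

// Sketch of the verification step (hypothetical export path and options).
val csv = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("s3a://some-bucket/exported-csv/")   // placeholder path
println(csv.count())                          // same total as the Parquet source (~150M)
println(csv.select(allDimensions.map(col): _*).distinct().count())   // ~3.5 million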

I would expect the number of rows in Druid to also be 3.5 million, but I have 29.5 million (I checked with a count aggregation).

To double check, what’s the query you’re running on Druid to check the total number of rows?

I'm running a count query similar to the following:

gaurav@prod-druid-access-1:~$ curl -X POST 'prod-druid-broker-1:8082/druid/v2?pretty' -H 'Content-Type: application/json' -d '{"queryType":"timeseries","dataSource":"raw_events","granularity":"day","descending":"true","aggregations":[{"type":"count","name":"sample_name1"}],"intervals":["2018-02-01T08:00:00.000/2018-02-07T08:00:00.000"]}'

[ {
  "timestamp" : "2018-02-07T00:00:00.000Z",
  "result" : { "sample_name1" : 11552171 }
}, {
  "timestamp" : "2018-02-06T00:00:00.000Z",
  "result" : { "sample_name1" : 29245307 }
}, {
  "timestamp" : "2018-02-05T00:00:00.000Z",
  "result" : { "sample_name1" : 28503601 }
}, {
  "timestamp" : "2018-02-04T00:00:00.000Z",
  "result" : { "sample_name1" : 30147900 }
}, {
  "timestamp" : "2018-02-03T00:00:00.000Z",
  "result" : { "sample_name1" : 28917910 }
}, {
  "timestamp" : "2018-02-02T00:00:00.000Z",
  "result" : { "sample_name1" : 28656204 }
}, {
  "timestamp" : "2018-02-01T00:00:00.000Z",
  "result" : { "sample_name1" : 17244524 }
} ]

Hi,

I did try pulling data for only one dimension and all metrics, and correctly got 2110 rows.

The cardinality of that dimension is 26, so at most 26 * 24 * 4 = 2496 rows per day.

I'm unsure why the number of rows blows up when I use the larger dimension set.

Thanks

Hm, maybe you're not getting perfect rollup during ingestion, so the row count in Druid is higher than the row count from that SQL group by.

If you run that timeseries query with the full dimension set, with a sum aggregator on some metric instead of a count agg, do you get correct results?
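For example, something along these lines (untested, just reusing your datasource and metric names) would return the post-rollup Druid row count and the sum of the ingested event_count metric side by side:

{
  "queryType": "timeseries",
  "dataSource": "raw_events",
  "granularity": "day",
  "intervals": ["2018-02-01T08:00:00.000/2018-02-07T08:00:00.000"],
  "aggregations": [
    { "type": "count", "name": "druid_rows" },
    { "type": "longSum", "name": "ingested_events", "fieldName": "event_count" }
  ]
}

If rollup is working the way you expect, druid_rows should line up with your SQL group-by count, while ingested_events should match the raw event count.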

Thanks,

Jon

Sorry, I was out for the week.

Yes, the sum aggregation does give correct results.

Also, the Hadoop indexing task is expected to produce perfect rollup.

From the docs:

The Hadoop indexing task always runs with this perfect roll-up mode.

http://druid.io/docs/latest/design/index.html#roll-up-modes