About error "Perfect rollup not supported yet."

Hi,

I am trying to set a secondary partition. For this, the documentation says I need to enable perfect rollup. This is the sample payload I am sending:

```json
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "ffsdataset",
      "parser": {
        "type": "string",
        "parseSpec": {
          "type": "jsonLowercase",
          "format": "json",
          "dimensionsSpec": {
            "dimensions": ["id", "name"]
          },
          "timestampSpec": {
            "column": "date",
            "format": "iso"
          }
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "doubleSum",
          "name": "metric",
          "fieldName": "metric"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "day",
        "intervals": ["2001-02-01/2019-03-31"],
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "firehose": {
        "type": "static-s3",
        "uris": ["s3://file1", "s3://file2", "s3://file3"]
      },
      "appendToExisting": true
    },
    "tuningConfig": {
      "type": "index_parallel",
      "targetPartitionSize": 500000000,
      "maxRowsInMemory": 500000,
      "forceExtendableShardSpecs": true,
      "forceGuaranteedRollup": true,
      "partitionDimensions": ["id"]
    }
  }
}
```

This is the error message:

```
2019-04-11T15:44:07,740 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Exception while running task[AbstractTask{id='index_parallel_ffsdataset_2019-04-11T15:44:03.049Z', groupId='index_parallel_ffsdataset_2019-04-11T15:44:03.049Z', taskResource=TaskResource{availabilityGroup='index_parallel_ffsdataset_2019-04-11T15:44:03.049Z', requiredCapacity=1}, dataSource='ffsdataset', context={}}]
java.lang.UnsupportedOperationException: Perfect roll-up is not supported yet
        at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.createRunner(ParallelIndexSupervisorTask.java:193) ~[druid-indexing-service-0.14.0-incubating.jar:0.14.0-incubating]
        at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.runParallel(ParallelIndexSupervisorTask.java:292) ~[druid-indexing-service-0.14.0-incubating.jar:0.14.0-incubating]
        at org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask.run(ParallelIndexSupervisorTask.java:251) ~[druid-indexing-service-0.14.0-incubating.jar:0.14.0-incubating]
        at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.14.0-incubating.jar:0.14.0-incubating]
        at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.14.0-incubating.jar:0.14.0-incubating]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_191]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
2019-04-11T15:44:07,742 INFO [main] org.apache.druid.server.initialization.jetty.JettyServerModule - Starting Jetty Server...
2019-04-11T15:44:07,742 INFO [task-runner-0-priority-0] org.apache.druid.indexing.overlord.TaskRunnerUtils - Task [index_parallel_ffsdataset_2019-04-11T15:44:03.049Z] status changed to [FAILED].
2019-04-11T15:44:07,744 INFO [task-runner-0-priority-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_parallel_ffsdataset_2019-04-11T15:44:03.049Z",
  "status" : "FAILED",
  "duration" : 4,
  "errorMsg" : "java.lang.UnsupportedOperationException: Perfect roll-up is not supported yet"
}
```

Am I missing any specific setting in my config?

Hi Karthik,

Partitioning isn’t yet available in parallel_indexing. This means rollup should be false:

http://druid.io/docs/latest/ingestion/native_tasks.html#parallel-index-task

Best,

Caroline

To clarify - currently, with parallel indexing, you can enable rollup ('rollup' can be true in the granularitySpec) but you cannot request perfect rollup ('forceGuaranteedRollup' must not be true). Its rollup is best-effort, not perfect. If two rows that should roll up happen to be processed by the same task, perhaps because they're in the same input file, then they will be rolled up together; otherwise, they won't be. This doesn't affect query correctness (they'll still be aggregated together at query time); it only affects storage footprint.
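In other words, for your payload above the tuningConfig would need to drop the perfect-rollup settings, roughly like this (a sketch; 'rollup': true can stay in the granularitySpec for best-effort rollup):

```json
"tuningConfig": {
  "type": "index_parallel",
  "targetPartitionSize": 500000000,
  "maxRowsInMemory": 500000,
  "forceGuaranteedRollup": false
}
```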

Thanks Caroline/Gian.

I will explain my use case, so perhaps you have a better suggestion for my problem. I ingest about 10 GB of compressed data daily into Druid from my data warehouse, using the S3 firehose. Say I have 150 S3 files: I run a parallel index job to split the workload of those 150 files across my 15 workers, so each worker on average runs 10 child jobs.

Since the data is partitioned on time, I make sure the list of files I feed to the overlord is ordered (and the data in each zipped file is sorted by the date column, which is used as the timestamp in my spec) so that segments are created sensibly (if I didn't do this, I'd end up with more segments than I should). I am trying to set another dimension I have as a secondary partition key so that I can (presumably) fine-tune Druid's query response times.

If I cannot set "forceGuaranteedRollup" (and, in turn, "partitionDimensions") with parallel jobs, how can I both ingest data efficiently and get the secondary partitioning?

Hey Karthik,

We do plan to support guaranteed rollup and partition dimensions with index_parallel in the future, but it’s not supported right now. The best workarounds for the time being would be any one of the following (no need to do them all):

  1. Set up your input files to be partitioned by the dimension already. When doing ingestion, Druid will preserve the pre-existing partitioning.

  2. Do initial ingestion with a parallel index job, and then repartition using a non-parallel index task with an ingestSegment input. For 10GB of data, this will take a while, but should be doable.

  3. Use a Hadoop job to repartition the data, since Hadoop based indexing does parallelize and does support a variety of partitioning setups.

Stay tuned for an update later this year that should be introducing the partitioning features to native parallel ingestion!
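For reference, workaround #2 might look roughly like the following: a non-parallel "index" task that reads the already-published segments back through an ingestSegment firehose and rewrites them with perfect rollup. This is only a sketch (the interval and the elided dataSchema are placeholders you'd fill in from your own spec), not a tested task:

```json
{
  "type": "index",
  "spec": {
    "dataSchema": { "...": "same dataSchema as the parallel task" },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "ingestSegment",
        "dataSource": "ffsdataset",
        "interval": "2019-03-01/2019-03-31"
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": 500000000,
      "forceGuaranteedRollup": true
    }
  }
}
```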

Gian

Thanks Gian. Option #1 is what we were doing already. I will be on the lookout for the feature (and hopefully, in the meantime, understand the internals to see if I can contribute).