Druid 0.12.3 | appendToExisting flag does not work

Hello,

I have a problem indexing a Google BigQuery dataset.

I have a job that indexes data 6 times per day, every day, because I have 6 file sources per day. The aim of this job is to append each day's data to the existing Druid datasource.

I have one segment per day.
The strange problem is that, for example, when I submit a second task, I don't see the flag appendToExisting: true in the payload on the coordinator console (see the payload below).

The result is that Druid creates a second shard in the same day's segment, and after a moment the second shard overrides the first, so the segment ends up with only one shard containing the indexed data of the second file source.

So after 6 indexing tasks, I find one shard in my day segment containing only the data of the last file indexed, not an appended segment.

Does anyone have an idea, please?

Spec file sent to the indexer:

{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "{{ druid_datasource_name }}",
      "parser" : {
        "type" : "parquet",
        "parseSpec" : {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "date",
            "format": "yyyyMMdd"
          },
          "dimensionsSpec" : {
            "dimensions" : [
              { "name" : "date", "type" : "string" },
              { "name" : "brand", "type" : "string" },
              { "name" : "country", "type" : "string" },
              { "name" : "browser", "type" : "string" },
              { "name" : "device_category", "type" : "string" },
              { "name" : "source", "type" : "string" },
              { "name" : "medium", "type" : "string" }
            ]
          }
        }
      },
      "metricsSpec" : [
        { "name" : "count", "type" : "count" },
        { "type" : "longSum", "name" : "visits", "fieldName" : "visits" },
        { "type" : "longSum", "name" : "new_visits", "fieldName" : "new_visits" },
        { "type" : "longSum", "name" : "page_views", "fieldName" : "page_views" },
        { "type" : "longSum", "name" : "bounces", "fieldName" : "bounces" },
        { "type" : "longSum", "name" : "time_on_site", "fieldName" : "time_on_site" },
        { "type" : "doubleSum", "name" : "transactions", "fieldName" : "transactions" },
        { "type" : "doubleSum", "name" : "transaction_revenue", "fieldName" : "transaction_revenue" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["{{ start_date }}/{{ end_date }}"],
        "rollup" : false
      }
    },
    "ioConfig" : {
      ** "appendToExisting" : true,**
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
        "paths" : "{{ s3_data_path_base }}/{{ source }}/{{ dataset }}/{{ dated_directory_path }}"
      }
    },
    "tuningConfig" : {
      "forceExtendableShardSpecs": true,
      "type" : "hadoop",
      "jobProperties" : {
        "fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "fs.s3n.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "fs.s3.awsAccessKeyId" : "{{ s3_access_key }}",
        "fs.s3.awsSecretAccessKey" : "{{ s3_secret_key }}",
        "fs.s3n.awsAccessKeyId" : "{{ s3_access_key }}",
        "fs.s3n.awsSecretAccessKey" : "{{ s3_secret_key }}",
        "mapreduce.job.user.classpath.first": "true"
      }
    }
  }
}

Payload on the coordinator console:

{
  "task": "index_hadoop_GA_Visits_2019-05-24T09:24:27.851Z",
  "payload": {
    "type": "index_hadoop",
    "id": "index_hadoop_GA_Visits_2019-05-24T09:24:27.851Z",
    "spec": {
      "dataSchema": {
        "dataSource": "s3_ga_visits",
        "parser": {
          "type": "parquet",
          "parseSpec": {
            "format": "timeAndDims",
            "timestampSpec": { "column": "date", "format": "yyyyMMdd" },
            "dimensionsSpec": {
              "dimensions": [
                { "name": "date", "type": "string" },
                { "name": "brand", "type": "string" },
                { "name": "country", "type": "string" },
                { "name": "browser", "type": "string" },
                { "name": "device_category", "type": "string" },
                { "name": "source", "type": "string" },
                { "name": "medium", "type": "string" }
              ]
            }
          }
        },
        "metricsSpec": [
          { "type": "count", "name": "count" },
          { "type": "longSum", "name": "visits", "fieldName": "visits", "expression": null },
          { "type": "longSum", "name": "new_visits", "fieldName": "new_visits", "expression": null },
          { "type": "longSum", "name": "page_views", "fieldName": "page_views", "expression": null },
          { "type": "longSum", "name": "bounces", "fieldName": "bounces", "expression": null },
          { "type": "longSum", "name": "time_on_site", "fieldName": "time_on_site", "expression": null },
          { "type": "doubleSum", "name": "transactions", "fieldName": "transactions", "expression": null },
          { "type": "doubleSum", "name": "transaction_revenue", "fieldName": "transaction_revenue", "expression": null }
        ],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "DAY",
          "queryGranularity": { "type": "none" },
          "rollup": false,
          "intervals": ["2019-05-22T00:00:00.000Z/2019-05-23T00:00:00.000Z"]
        },
        "transformSpec": { "filter": null, "transforms": [] }
      },
      "ioConfig": {
        "type": "hadoop",
        "inputSpec": {
          "type": "static",
          "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
          "paths": "s3n://bucket/data/google/visits/brand/2019/05/22"
        },
        "metadataUpdateSpec": null,
        "segmentOutputPath": null
      },
      "tuningConfig": {
        "type": "hadoop",
        "workingPath": null,
        "version": "2019-05-24T09:24:27.851Z",
        "partitionsSpec": {
          "type": "hashed",
          "targetPartitionSize": -1,
          "maxPartitionSize": -1,
          "assumeGrouped": false,
          "numShards": -1,
          "partitionDimensions": []
        },
        "shardSpecs": {},
        "indexSpec": {
          "bitmap": { "type": "concise" },
          "dimensionCompression": "lz4",
          "metricCompression": "lz4",
          "longEncoding": "longs"
        },
        "maxRowsInMemory": 75000,
        "leaveIntermediate": false,
        "cleanupOnFailure": true,
        "overwriteFiles": false,
        "ignoreInvalidRows": false,
        "jobProperties": {
          "fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
          "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
          "fs.s3.awsAccessKeyId": "xxxxxxxxxxxxxxxxxxxxxxx",
          "fs.s3.awsSecretAccessKey": "xxxxxxxxxxxxxxxxxxxxxxx",
          "fs.s3n.awsAccessKeyId": "xxxxxxxxxxxxxxxxxxxxxxx",
          "fs.s3n.awsSecretAccessKey": "xxxxxxxxxxxxxxxxxxxxxxx",
          "mapreduce.job.user.classpath.first": "true"
        },
        "combineText": false,
        "useCombiner": false,
        "buildV9Directly": true,
        "numBackgroundPersistThreads": 0,
        "forceExtendableShardSpecs": true,
        "useExplicitVersion": false,
        "allowedHadoopPrefix": []
      },
      "uniqueId": "5e8db3aef39a48b6bf5f4957d90ec6a6"
    },
    "hadoopDependencyCoordinates": null,
    "classpathPrefix": null,
    "context": {},
    "groupId": "index_hadoop_GA_Visits_2019-05-24T09:24:27.851Z",
    "dataSource": "s3_ga_visits",
    "resource": {
      "availabilityGroup": "index_hadoop_GA_Visits_2019-05-24T09:24:27.851Z",
      "requiredCapacity": 1
    }
  }
}

Hi,

The Hadoop index task doesn't support the 'appendToExisting' configuration. The Hadoop ioConfig has no such field, so the flag is simply ignored when your spec is parsed; that's why it doesn't appear in the payload.

You might want to use the 'multi' inputSpec instead, which reindexes the interval's existing segments together with the new input in a single task. Please see http://druid.io/docs/latest/ingestion/update-existing-data.html for details.
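As a rough sketch (taking the datasource name, interval, and path from your payload as placeholders, so adjust as needed), the ioConfig of an appending run could look like this:

"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "multi",
    "children" : [
      {
        "type" : "dataSource",
        "ingestionSpec" : {
          "dataSource" : "s3_ga_visits",
          "intervals" : ["2019-05-22/2019-05-23"]
        }
      },
      {
        "type" : "static",
        "inputFormat" : "io.druid.data.input.parquet.DruidParquetInputFormat",
        "paths" : "s3n://bucket/data/google/visits/brand/2019/05/22"
      }
    ]
  }
}

The 'dataSource' child reads the interval's existing segments and the 'static' child reads the new file, so the task rewrites the day's segments with both the old and the new data.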

Jihoon

Hi Jihoon,

OK, I understand, but this is not mentioned in the doc here: http://druid.io/docs/latest/tutorials/tutorial-update-data.html

Now the problem I will have is that my datasource must already exist before I can submit a 'multi' task, and Druid doesn't create an empty datasource the way Elasticsearch creates indices. I would like to do this in one step, like an 'upsert'.
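If I understand correctly, that means the first file of each day has to be ingested with a plain 'static' task (basically my spec above, without the flag), and only the following runs can use 'multi'. Something like this for the first run:

"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "static",
    "inputFormat" : "io.druid.data.input.parquet.DruidParquetInputFormat",
    "paths" : "{{ s3_data_path_base }}/{{ source }}/{{ dataset }}/{{ dated_directory_path }}"
  }
}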

Thanks for your response 🙂