Native batch indexing with static-s3 firehose (tasks succeed, but no segments get loaded)

Hi,

I am trying to get native batch indexing to work with the static-s3 firehose.

Here is how I posted the indexing tasks:

curl -X 'POST' -H 'Content-Type:application/json' -d @firehose.json http://localhost:8090/druid/indexer/v1/task

Here is my ingestion spec json (i.e. firehose.json):

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "batch_event_stats",
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        }
      ],
      "granularitySpec": {
        "segmentGranularity": "hour",
        "queryGranularity": "hour"
      },
      "parser": {
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "eventTime", # 10 digits
            "format": "posix"
          },
          "dimensionsSpec": {
            "dimensions": ["eventName", "carrierId", "eventType", "tapType", "fieldName", "fieldTitle", "screenName", "fieldAction", "source", "storeName", "project", "success"]
          }
        }
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "firehose": {
        "type": "static-s3",
        "prefixes": ["s3://blahblah"]
      }
    },
    "tuningconfig": {
      "type": "index_parallel",
      "maxNumSubTasks": 2
    }
  }
}


The overlord is reporting success for the indexing task:

{"task":"index_sub_batch_event_stats_2019-04-15T21:58:28.087Z","status":{"id":"index_sub_batch_event_stats_2019-04-15T21:58:28.087Z","type":"index_sub","createdTime":"2019-04-15T21:58:28.091Z","queueInsertionTime":"1970-01-01T00:00:00.000Z","statusCode":"SUCCESS","status":"SUCCESS","runnerStatusCode":"WAITING","duration":12880,"location":{"host":null,"port":-1,"tlsPort":-1},"dataSource":"batch_event_stats","errorMsg":null}}


Despite the good sign in the indexing log, no segments got ingested into the datasource. What should I check? Thanks.

Hi,

I guess it may be failing to read any input data. Do you see any log line starting with "Published segments: " in the subtask logs? If the list of published segments is empty, then it didn't read any data.
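For example, something like this should work (assuming the overlord is on localhost:8090; substitute your actual subtask ID):

# Fetch the subtask log from the overlord's task log endpoint and look for the published segments.
curl -s "http://localhost:8090/druid/indexer/v1/task/index_sub_batch_event_stats_2019-04-15T21:58:28.087Z/log" | grep "Published segments"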

Jihoon

Thanks for your input, Jihoon.

Here is an excerpt from the subtask log file about publishing segments. It looks like the segments got dropped.

2019-04-15T22:36:02,414 INFO [publish-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Dropping segments[]


I am also attaching the log to this reply.

subtask.log (83.6 KB)

Ah, sorry. In the subtask log, it should start with "Pushing segments: ".
And here it is.

2019-04-15T22:36:02,351 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Pushing segments in background:

This means there was nothing for the subtask to read, so it created no segments. Would you please double-check that your input data exists and that the prefix in ioConfig is valid?
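For instance, if you have the AWS CLI installed, something along these lines (with your real bucket and prefix substituted) should list the objects the firehose would pick up:

# List every object under the prefix configured in the static-s3 firehose.
aws s3 ls --recursive s3://blahblah/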

Jihoon

I found what the problem was: indexing does read the input file. However, the input data is not structured with eventTime (the timestampSpec column) at the top level. It's nested inside the event key. I'll need to fix this first.

I have had this before and can confirm that fixing the timestamp and using the right format will fix this. The logs not picking up any segments to index gives it away.

After fixing the timestampSpec, my batch ingestion got further, but it still cannot persist anything into deep storage. Looking at the log, I guess the events got read, but they got sunk by some realtime.appenderator implementation. Please help me understand: I am starting a native batch task, so I thought it is not real-time and rows would not get dropped even if the timestamp is not within a real-time window. Does the message in the log mean the segment is outside of the real-time window and will be dropped? And how do I work around this? Thanks!

Here is what I found in the indexing log.


2019-04-16T21:52:09,444 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - New segment[batch_event_stats_2019-04-04T01:00:00.000Z_2019-04-04T02:00:00.000Z_2019-04-16T21:38:54.004Z_3] for row[MapBasedInputRow{timestamp=2019-04-04T01:10:50.000Z, event={eventName=AAAONBOARDING_ScreenEvent, eventTime=1554340250, scope=AAAONBOARDING, carrier=AAA, api_key=CaFcHfMhPkSpUrWuZw3z6B8DbGdJfNjQmSqVsXv2, source=onboarding}, dimensions=[eventName, scope, carrier, api_key, source]}] sequenceName[index_sub_batch_event_stats_2019-04-16T21:39:06.176Z].

2019-04-16T21:52:09,492 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Pushing segments in background: [batch_event_stats_2019-04-04T01:00:00.000Z_2019-04-04T02:00:00.000Z_2019-04-16T21:38:54.004Z_3]

......

2019-04-16T21:52:09,995 INFO [publish-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Dropping segments[[DataSegment{size=2203, shardSpec=NumberedShardSpec{partitionNum=3, partitions=0}, metrics=[count], dimensions=[eventName, scope, carrier, api_key, source], version='2019-04-16T21:38:54.004Z', loadSpec={type=>s3_zip, bucket=>ccapp-druid-qa, key=>druid/segments/batch_event_stats/2019-04-04T01:00:00.000Z_2019-04-04T02:00:00.000Z/2019-04-16T21:38:54.004Z/3/index.zip, S3Schema=>s3n}, interval=2019-04-04T01:00:00.000Z/2019-04-04T02:00:00.000Z, dataSource='batch_event_stats', binaryVersion='9'}]]
2019-04-16T21:52:10,006 INFO [appenderator_persist_0] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Removing commit metadata for segment[batch_event_stats_2019-04-04T01:00:00.000Z_2019-04-04T02:00:00.000Z_2019-04-16T21:38:54.004Z_3].
2019-04-16T21:52:10,006 INFO [appenderator_persist_0] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Removing sink for segment[batch_event_stats_2019-04-04T01:00:00.000Z_2019-04-04T02:00:00.000Z_2019-04-16T21:38:54.004Z_3].
2019-04-16T21:52:10,008 INFO [appenderator_persist_0] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Deleting Index File[var/druid/task/index_sub_batch_event_stats_2019-04-16T21:39:06.176Z/work/persist/batch_event_stats_2019-04-04T01:00:00.000Z_2019-04-04T02:00:00.000Z_2019-04-16T21:38:54.004Z_3]
2019-04-16T21:52:10,021 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSubTask - Pushed segments[[DataSegment{size=2203, shardSpec=NumberedShardSpec{partitionNum=3, partitions=0}, metrics=[count], dimensions=[eventName, scope, carrier, api_key, source], version='2019-04-16T21:38:54.004Z', loadSpec={type=>s3_zip, bucket=>ccapp-druid-qa, key=>druid/segments/batch_event_stats/2019-04-04T01:00:00.000Z_2019-04-04T02:00:00.000Z/2019-04-16T21:38:54.004Z/3/index.zip, S3Schema=>s3n}, interval=2019-04-04T01:00:00.000Z/2019-04-04T02:00:00.000Z, dataSource='batch_event_stats', binaryVersion='9'}]]
2019-04-16T21:52:10,023 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.AppenderatorImpl - Shutting down...
2019-04-16T21:52:10,072 INFO [task-runner-0-priority-0] org.apache.druid.indexing.overlord.TaskRunnerUtils - Task [index_sub_batch_event_stats_2019-04-16T21:39:06.176Z] status changed to [SUCCESS].
2019-04-16T21:52:10,074 INFO [task-runner-0-priority-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
"id" : "index_sub_batch_event_stats_2019-04-16T21:39:06.176Z",
"status" : "SUCCESS",
"duration" : 2125,
"errorMsg" : null
}



With the following ingestion spec:


{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "batch_event_stats",
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        }
      ],
      "granularitySpec": {
        "segmentGranularity": "hour",
        "queryGranularity": "hour",
        "intervals": ["2019-04-01/2019-05-01"]
      },
      "parser": {
        "parseSpec": {
          "format": "json",
          "flattenSpec": {
            "useFieldDiscovery": true,
            "fields": [
              {
                "type": "jq",
                "name": "eventName",
                "expr": ".payload.events[0].eventName"
              },
              {
                "type": "jq",
                "name": "eventTime",
                "expr": ".payload.events[0].eventTime"
              }
            ]
          },
          "timestampSpec": {
            "column": "eventTime",
            "format": "posix"
          },
          "dimensionsSpec": {
            "dimensions": []
          }
        }
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "firehose": {
        "type": "static-s3",
        "prefixes": ["s3://ccapp-druid-raw-qa"]
      },
      "appendToExisting": false
    },
    "tuningconfig": {
      "type": "index_parallel",
      "maxNumSubTasks": 2
    }
  }
}

Hey,

regarding Appenderator, I’m also sad about its package name. It was designed for stream ingestion first, but now it’s being used for batch ingestion as well.

The subtask log you posted looks fine to me.

Do you see any errors or exceptions in the supervisor task log?

Also note that it may take some time (usually a few mins) for the new dataSource to appear in the coordinator console.

The supervisor task log shows everything succeeded (see below). But according to the subtask log, the segments got dropped. I waited for a long time and the newly ingested data did not show up in the coordinator or in the metadata store as segments. (The metadata store did not have much info anyway, except for the list of tasks being processed, on which datasource, and which datasources exist in Druid.)

I will poke around to find out what causes segments to drop and will post my findings if I find any. Please let me know if you have other suggestions in the meantime. Thank you!

2019-04-16T23:41:10,271 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner - Waiting for subTasks to be completed


2019-04-16T23:41:22,831 INFO [task-monitor-0] org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor - [1/382] tasks succeeded
2019-04-16T23:41:24,431 INFO [task-monitor-0] org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor - [2/382] tasks succeeded
2019-04-16T23:41:34,481 INFO [qtp1572066684-79] org.apache.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_parallel_batch_event_stats_2019-04-16T23:40:56.697Z]: LockListAction{}
2019-04-16T23:41:34,487 INFO [qtp1572066684-79] org.apache.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_parallel_batch_event_stats_2019-04-16T23:40:56.697Z] to overlord: [LockListAction{}].
2019-04-16T23:41:35,920 INFO [task-monitor-0] org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor - [3/382] tasks succeeded
2019-04-16T23:41:37,308 INFO [task-monitor-0] org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor - [4/382] tasks succeeded
2019-04-16T23:41:44,940 INFO [qtp1572066684-78] org.apache.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_parallel_batch_event_stats_2019-04-16T23:40:56.697Z]: LockListAction{}
2019-04-16T23:41:44,941 INFO [qtp1572066684-78] org.apache.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_parallel_batch_event_stats_2019-04-16T23:40:56.697Z] to overlord: [LockListAction{}].
2019-04-16T23:41:45,724 INFO [task-monitor-0] org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor - [5/382] tasks succeeded
2019-04-16T23:41:47,043 INFO [task-monitor-0] org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor - [6/382] tasks succeeded

2019-04-16T21:52:09,995 INFO [publish-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Dropping segments[[DataSegment{size=2203, shardSpec=NumberedShardSpec{partitionNum=3, partitions=0}, metrics=[count], dimensions=[eventName, scope, carrier, api_key, source], version='2019-04-16T21:38:54.004Z', loadSpec={type=>s3_zip, bucket=>ccapp-druid-qa, key=>druid/segments/batch_event_stats/2019-04-04T01:00:00.000Z_2019-04-04T02:00:00.000Z/2019-04-16T21:38:54.004Z/3/index.zip, S3Schema=>s3n}, interval=2019-04-04T01:00:00.000Z/2019-04-04T02:00:00.000Z, dataSource='batch_event_stats', binaryVersion='9'}]]


If you’re worrying about this log, you should be fine. It’s normal to drop segments in the subtask once they are successfully pushed.

Do you see those pushed segments in your deep storage?

Then, maybe they are not being loaded because of your load rule. Have you set any load rule for this dataSource?
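If you want to check, the coordinator exposes the rules over HTTP; for example (assuming the coordinator listens on the default port 8081):

# All load/drop rules, per dataSource plus the cluster default.
curl -s "http://localhost:8081/druid/coordinator/v1/rules"

# Rules for this dataSource only.
curl -s "http://localhost:8081/druid/coordinator/v1/rules/batch_event_stats"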

I did not set any load rules. And yes, I finally see some data in deep storage (the S3 bucket). However, that data does not show up in the coordinator or the metadata storage.

Hmm, could you post the full task log of the supervisor task?

Here is the log, gzipped because it is too large for the post.

supervisor_task.log.gz (34.2 KB)

Hmm, your log file looks truncated.
If it’s too large, do you see any log like “Published [number of published segments] segments”?

Or would you grep “SinglePhaseParallelIndexTaskRunner” in the supervisor log and post the result?
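Something along these lines, with whatever file name you saved the supervisor task log under:

# Pull the task runner's progress and publish lines out of the supervisor task log.
grep "SinglePhaseParallelIndexTaskRunner" supervisor_task.log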

When I checked tonight, the data had arrived and finally got published. I allocated 2 parallel threads for this job, and it slowly finished. After everything finished, I noticed the data files got inserted into deep storage, but they were not available to the coordinator console and dsql until several hours later.

Question: what causes this delay?

That sounds strange.
The way native parallel indexing and segment assignment works is:

  1. The supervisor task spawns subtasks to read input data and generate segments. The generated segments are pushed to deep storage by subtasks.

  2. Once subtasks push generated segments to deep storage, the supervisor task publishes all pushed segments into the metadata store.

  3. The coordinator periodically picks up published segments and assigns them to historicals.

  4. Historicals download assigned segments from deep storage and start serving those segments.

I think each step might take some time if something goes wrong:

  1. may take hours if pushing segments is slow.

  2. may take mins if metadata store is slow.

  3. may take mins if the coordinator is slow.

  4. may take hours if downloading segments is slow.

Do you see any of these issues?
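One way to narrow down where it is stuck: the coordinator can tell you whether step 2 happened, i.e. whether the segments were published to the metadata store. For example (assuming the coordinator is on the default port 8081, and using your dataSource name):

# Segments published to the metadata store for this dataSource.
curl -s "http://localhost:8081/druid/coordinator/v1/metadata/datasources/batch_event_stats/segments"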

I am seeing a bunch of druid_pendingsegments records in the metadata store. Could that be the cause of the delay? I have done some new ingestion and am seeing partial data. I am going to wait a few hours to see the outcome.

The batch firehose from S3 seems to be problematic. Just thinking out loud: have you heard of people actually writing a program to traverse S3 and insert those records, posting one indexing request per file in S3 using:

curl -X 'POST' -H 'Content-Type:application/json' -d @ingestion_spec.json http://{druid}:8090/druid/indexer/v1/task


Yes, some of our customers are using the parallel index task to load data from S3.
I don't think the long time it takes for your dataSources to show up is a problem specific to the parallel index task.

If it takes several hours, it would be likely a problem of historicals or the coordinator (the steps 3 and 4 in my previous comment).

Do you see any weird logs in the historicals or the coordinator about assigning or downloading segments?

You can also check the load status using the coordinator API (http://druid.io/docs/latest/operations/api-reference.html#segment-loading).
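For example (again assuming the coordinator is on the default port 8081):

# Percentage of published segments that are actually loaded and queryable, per dataSource.
curl -s "http://localhost:8081/druid/coordinator/v1/loadstatus"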

Regarding the pendingSegments table: new segment allocation might get slower if the table contains a lot of entries, but this affects only ingestion performance. You may want to set "druid.coordinator.kill.pendingSegments.on" to true (http://druid.io/docs/latest/configuration/index.html#coordinator-operation).
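That is a coordinator runtime property, so it would go into the coordinator's runtime.properties, roughly like this:

# Let the coordinator periodically clean up stale entries in the druid_pendingsegments table.
druid.coordinator.kill.pendingSegments.on=true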

Once the index task is finished, the pendingSegments table is not used by historicals or the coordinator.

That's very useful information, thanks. I am doing several more parallel batch index loads and observing the following:

  1. It's slow: even though I have a small amount of data, it takes a couple of minutes for the data to become fully available in the coordinator.

  2. I don't understand why druid_pendingsegments entries keep accumulating in the metadata storage. I am seeing the exact same IDs in druid_segments for every ID in druid_pendingsegments (I am attaching CSVs of druid_segments and druid_pendingsegments with this post). I would have thought that once segments got published and marked as used, they would disappear from the pending segments table. However, the copies stay in druid_pendingsegments when I do parallel batch index loading (real-time ingestion does not have this problem). Here is my batch loading spec:

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "batch_event_stats_2019_04_18",
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        }
      ],
      "granularitySpec": {
        "segmentGranularity": "hour",
        "queryGranularity": "hour",
        "rollup": true
      },
      "parser": {
        "parseSpec": {
          "format": "json",
          "flattenSpec": {
            "useFieldDiscovery": true,
            "fields": [
              {
                "type": "jq",
                "name": "eventName",
                "expr": ".payload.events[0].eventName"
              },
              {
                "type": "jq",
                "name": "eventTime",
                "expr": ".payload.events[0].eventTime"
              }
            ]
          },
          "timestampSpec": {
            "column": "eventTime",
            "format": "posix"
          },
          "dimensionsSpec": {
            "dimensions": ["carrier", "eventName", "scope", "source"]
          }
        }
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "firehose": {
        "type": "static-s3",
        "prefixes": ["s3://ccapp-druid-raw-qa/2019/04/18"]
      },
      "appendToExisting": false
    },
    "tuningconfig": {
      "type": "index_parallel",
      "maxNumSubTasks": 2
    }
  }
}


  3. With the above ingestion spec I am also seeing 2 other problems:
  • The data doesn't roll up, i.e. the count is always 1, even when there are multiple entries with the same time chunk and dimensions.
  • The job only reads one entry per file in S3. Even though a file can contain multiple JSON records, it tends to count only one (see the note after this list).
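On that last point, could it be a file format issue? As far as I understand, the json parseSpec reads text files line by line, so each line has to be one complete JSON object (newline-delimited JSON); a file holding a single JSON array or pretty-printed objects spanning multiple lines would yield at most one row. Something like this is what I understand it expects (made-up records):

{"payload": {"events": [{"eventName": "AAAONBOARDING_ScreenEvent", "eventTime": 1554340250}]}}
{"payload": {"events": [{"eventName": "AAAONBOARDING_ScreenEvent", "eventTime": 1554340311}]}}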

pending_segment.csv (10.3 KB)

segment.csv (17 KB)