HTTP Firehose

I am using an HTTP firehose to populate Druid from a Postgres database (I can’t use JDBC because it gives an error, and I can’t update the production Postgres config).

I have written a web server that creates the index task JSON (see below) and responds to requests for the data.

The HTTP firehose URIs are in the following format: http://192.168.130.187:5000/events/csv/2019-03-12 — each one returns CSV data for one day.

I have noticed that my web server gets called multiple times for the same day, and that it is sometimes asked for old days that I am not currently indexing. Is this normal?

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "events",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "csv",
          "timestampSpec": {
            "column": "event_datetime",
            "format": "auto"
          },
          "hasHeaderRow": true,
          "dimensionsSpec": {
            "dimensions": [
              "organisations",
              "customer_name",
              "customer_icao",
              "customer_type",
              "customer_country",
              "customer_region",
              "fleet_name",
              "fleet_type",
              "tail_number",
              "ac_ident",
              "lfl",
              "aircraft_model",
              "aircraft_series",
              "aircraft_family",
              { "name": "flight_id", "type": "long" },
              "analysis_datetime",
              "takeoff_datetime",
              "landing_datetime",
              "captain_code",
              "first_officer_code",
              "third_pilot_code",
              { "name": "takeoff_fuel", "type": "double" },
              { "name": "landing_fuel", "type": "double" },
              { "name": "takeoff_gross_weight", "type": "double" },
              { "name": "landing_gross_weight", "type": "double" },
              "takeoff_airport_icao",
              "takeoff_airport_iata",
              "takeoff_airport_name",
              "takeoff_runway",
              { "name": "takeoff_elevation", "type": "double" },
              "takeoff_airport_city",
              "takeoff_airport_country",
              { "name": "takeoff_airport_longitude", "type": "double" },
              { "name": "takeoff_airport_latitude", "type": "double" },
              "takeoff_weather",
              { "name": "takeoff_wind_heading", "type": "double" },
              { "name": "takeoff_wind_speed", "type": "double" },
              "landing_airport_icao",
              "landing_airport_iata",
              "landing_airport_name",
              "landing_runway",
              { "name": "landing_elevation", "type": "double" },
              "landing_airport_city",
              "landing_airport_country",
              { "name": "landing_airport_longitude", "type": "double" },
              { "name": "landing_airport_latitude", "type": "double" },
              "landing_weather",
              { "name": "landing_wind_heading", "type": "double" },
              { "name": "landing_wind_speed", "type": "double" },
              "flight_type",
              "flight_status",
              "flight_number",
              { "name": "event_id", "type": "long" },
              "event_code",
              "event_desc",
              "event_category",
              "event_section",
              "event_type",
              "event_status",
              "event_status_changed_by",
              "event_level",
              "event_validity",
              "operator_invalid",
              "event_validity_changed_by",
              "invalidity_reason",
              "kpv_name",
              "threshold_operator",
              { "name": "threshold_value", "type": "double" },
              { "name": "kpv_value", "type": "double" },
              { "name": "event_longitude", "type": "double" },
              { "name": "event_latitude", "type": "double" }
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "intervals": [ "2019-03-01/2019-03-13" ],
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "firehose": {
        "type": "http",
        "uris": [ "http://192.168.130.187:5000/events/csv/2019-03-12", "http://192.168.130.187:5000/events/csv/2019-03-11", "http://192.168.130.187:5000/events/csv/2019-03-10", "http://192.168.130.187:5000/events/csv/2019-03-09", "http://192.168.130.187:5000/events/csv/2019-03-08", "http://192.168.130.187:5000/events/csv/2019-03-07", "http://192.168.130.187:5000/events/csv/2019-03-06", "http://192.168.130.187:5000/events/csv/2019-03-05", "http://192.168.130.187:5000/events/csv/2019-03-04", "http://192.168.130.187:5000/events/csv/2019-03-03", "http://192.168.130.187:5000/events/csv/2019-03-02", "http://192.168.130.187:5000/events/csv/2019-03-01" ]
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "type": "index_parallel"
    }
  }
}

Hi,

I think that’s possible if there were some failures while fetching the same URI. Do you see any errors in the task log?

Jihoon