How to change the ingestion time interval with Tranquility

Hi:
I have a problem when using Tranquility to ingest data: I can only ingest successfully when I set every message's timestamp to the current time, and ingestion fails when I use the time extracted from the message, since those times may be older than the current time.


What is your rejection policy for the events? Tranquility will reject an event if its timestamp is outside the window period.
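As a rough illustration, the check amounts to something like the sketch below (a simplification, not Tranquility's actual code; the symmetric window around the current time is an assumption, with the PT15M value taken from the task spec further down):

    import org.joda.time.{DateTime, Period}

    object WindowCheckSketch extends App {
      // Sketch of the acceptance rule: an event is kept only if its
      // timestamp lies within [now - windowPeriod, now + windowPeriod].
      def insideWindow(eventTime: DateTime, windowPeriod: Period): Boolean = {
        val now = DateTime.now()
        !eventTime.isBefore(now.minus(windowPeriod)) && !eventTime.isAfter(now.plus(windowPeriod))
      }

      val window = Period.parse("PT15M")
      // An event stamped 20 minutes in the past falls outside a PT15M window:
      println(insideWindow(DateTime.now().minusMinutes(20), window)) // false
      println(insideWindow(DateTime.now(), window))                  // true
    }

This is why events carrying an old timestamp extracted from the message fail while events stamped with the current time succeed.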

This thread might also be interesting: https://groups.google.com/forum/#!searchin/druid-development/windowperiod/druid-development/kHgHTgqKFlQ/fXvtsNxWzlMJ

I use the Tranquility Scala test code DruidBeamTest.scala with segmentGranularity = 15 minutes and windowPeriod = 15 minutes.

When I successfully ingested data into Druid with the current timestamp, I checked the log and found this:

{
  "type" : "index_realtime",
  "id" : "index_realtime_test_rtf_2015-10-30T03:30:00.000Z_1_0",
  "resource" : {
    "availabilityGroup" : "test_rtf-30-0001",
    "requiredCapacity" : 1
  },
  "spec" : {
    "dataSchema" : {
      "dataSource" : "test_rtf",
      "parser" : {
        "type" : "map",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "auc_time",
            "format" : "posix",
            "missingValue" : null
          },
          "dimensionsSpec" : {
            "dimensions" : [ "url", "vendor_id" ],
            "dimensionExclusions" : [ "auc_time", "imp", "click" ],
            "spatialDimensions" : [ ]
          }
        }
      },
      "metricsSpec" : [ {
        "type" : "count",
        "name" : "pv"
      }, {
        "type" : "longSum",
        "name" : "imp_pv",
        "fieldName" : "imp"
      }, {
        "type" : "longSum",
        "name" : "clicked_pv",
        "fieldName" : "click"
      } ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "FIFTEEN_MINUTE",
        "queryGranularity" : {
          "type" : "none"
        },
        "intervals" : null
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose" : {
        "type" : "clipped",
        "delegate" : {
          "type" : "timed",
          "delegate" : {
            "type" : "receiver",
            "serviceName" : "tranquility:firehose:test_rtf-30-0001-0000",
            "bufferSize" : 100000
          },
          "shutoffTime" : "2015-10-30T04:05:00.000Z"
        },
        "interval" : "2015-10-30T03:30:00.000Z/2015-10-30T03:45:00.000Z"
      }
    },
    "tuningConfig" : {
      "type" : "realtime",
      "maxRowsInMemory" : 75000,
      "intermediatePersistPeriod" : "PT10M",
      "windowPeriod" : "PT15M",
      "basePersistDirectory" : "/tmp/1446176465043-0",
      "versioningPolicy" : {
        "type" : "intervalStart"
      },
      "rejectionPolicy" : {
        "type" : "none"
      },
      "maxPendingPersists" : 0,
      "shardSpec" : {
        "type" : "linear",
        "partitionNum" : 1
      },
      "indexSpec" : {
        "bitmap" : {
          "type" : "concise"
        },
        "dimensionCompression" : null,
        "metricCompression" : null
      },
      "persistInHeap" : false,
      "ingestOffheap" : false,
      "aggregationBufferRatio" : 0.5,
      "bufferSize" : 134217728
    }
  },
  "groupId" : "index_realtime_test_rtf",
  "dataSource" : "test_rtf"
}

Also, I am curious about "ioConfig" : { "interval" : "2015-10-30T03:30:00.000Z/2015-10-30T03:45:00.000Z" }: how did this interval come about, and how do I set the rejection policy?

Second question:

In the Tranquility test code:

case class SimpleEvent(ts: DateTime, fields: Dict) {
  @JsonValue
  def toMap = fields ++ Map(TimeColumn -> (ts.millis / 1000))
}

What does Map(TimeColumn -> (ts.millis / 1000)) do?

If I already have the log timestamp in fields, do I still need to add this extra Map(TimeColumn -> (ts.millis / 1000))?
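For reference, here is a minimal sketch of what that expression produces. The TimeColumn value ("ts") and the Dict alias are assumptions standing in for the test helpers, and ts.millis from nscala-time corresponds to Joda's getMillis used here:

    import org.joda.time.DateTime

    object TimestampColumnSketch extends App {
      type Dict = Map[String, Any] // assumed alias for the test helper type
      val TimeColumn = "ts"        // assumed column name; check the test source

      val ts = new DateTime("2015-10-30T03:31:00.000Z")
      val fields: Dict = Map("url" -> "http://example.com", "vendor_id" -> "42")

      // Dividing the millisecond timestamp by 1000 yields posix seconds,
      // matching the "format" : "posix" timestampSpec in the task above.
      val serialized = fields ++ Map(TimeColumn -> (ts.getMillis / 1000))
      println(serialized)
      // Map(url -> http://example.com, vendor_id -> 42, ts -> 1446175860)
    }

If your fields map already carries a timestamp column matching the timestampSpec, appending another one is redundant, as answer 2 below confirms.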

Third question:

This job only contains a few log records to ingest, but it takes too long to finish. I don't think the hardware is the problem, since I ingested the Wikipedia sample data very quickly. What is the reason for that?

  1. In Tranquility the clipping interval is just based on your segmentGranularity: the event timestamp is truncated to the enclosing segment interval. The shutoffTime is based on your windowPeriod (see the sketch after this list).

  2. The Map(TimeColumn -> (ts.millis / 1000)) business is adding a timestamp to your data. If you already have a timestamp field in your data, then you don't need to add one in the Tranquility code.

  3. This is tough to answer without more info: do you have any idea which part is slow? Running a profiler or looking at the logs to see where time is spent might help.
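On point 1, a minimal sketch under the task's settings (15-minute segmentGranularity, PT15M windowPeriod) of how the logged interval and shutoffTime line up. The 5-minute extra margin before shutoff is an assumption chosen to match the logged 04:05 value, not a documented constant:

    import org.joda.time.{DateTime, DateTimeZone, Period}

    object IntervalSketch extends App {
      val eventTime = new DateTime("2015-10-30T03:31:00.000Z", DateTimeZone.UTC)
      val granularityMinutes = 15

      // Truncate the event timestamp down to its 15-minute bucket:
      val bucketStart = eventTime
        .withSecondOfMinute(0)
        .withMillisOfSecond(0)
        .withMinuteOfHour((eventTime.getMinuteOfHour / granularityMinutes) * granularityMinutes)
      val bucketEnd = bucketStart.plusMinutes(granularityMinutes)
      println(s"interval = $bucketStart/$bucketEnd")
      // interval = 2015-10-30T03:30:00.000Z/2015-10-30T03:45:00.000Z

      // Shutoff = interval end + windowPeriod + an assumed small margin:
      val shutoffTime = bucketEnd.plus(Period.parse("PT15M")).plus(Period.parse("PT5M"))
      println(s"shutoffTime = $shutoffTime")
      // shutoffTime = 2015-10-30T04:05:00.000Z
    }

Each new segment interval gets its own task, and that task's firehose shuts off once the window (plus margin) after the interval's end has passed, which is why old timestamps land outside any live task.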