Tranquility Kafka: unparseable messages

hi,
I am trying to use Tranquility Kafka to ingest real-time data into Druid.

But all of my messages are coming through as unparseable. Please suggest whether my ingestion spec needs changes, and how to fix it.

[KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {my_stream={receivedCount=22296, sentCount=0, droppedCount=0, unparseableCount=22296}} pending messages in 0ms and committed offsets in 17592ms.

I am using the ingestion spec below, and further down you can see the structure of the messages from my Kafka stream. I tried both "column" : "StartTime" and "column" : "server.StartTime".

Neither helped.

"dataSources" : {
  "vrmetricsp" : {
    "spec" : {
      "dataSchema" : {
        "dataSource" : "vrmetricsp",
        "parser" : {
          "type" : "string",
          "parseSpec" : {
            "timestampSpec" : {
              "column" : "requestStartTime",
              "format" : "auto"
            },
            "dimensionsSpec" : {
              "dimensions" : [],
              "dimensionExclusions" : [
                "timestamp",
                "value"
              ]
            },
            "format" : "json"
          }
        },
        "granularitySpec" : {
          "type" : "uniform",
          "segmentGranularity" : "hour",
          "queryGranularity" : "none"
        },
        "metricsSpec" : [
          {
            "type" : "count",
            "name" : "count"
          }
        ]
      },
      "ioConfig" : {
        "type" : "realtime"
      },

This is the structure of my messages from Kafka:

root
 |-- req: struct (nullable = true)
 |    |-- method: string (nullable = true)
 |-- server: struct (nullable = true)
 |    |-- StartTime: long (nullable = true)
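
For illustration, a message with this schema might look like the following (field values here are made up):

{
  "req" : { "method" : "GET" },
  "server" : { "StartTime" : 1479161113000 }
}

Note that StartTime lives inside the server struct, not at the root of the message.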

Thanks

VR

Hi folks,
Any suggestions, please?

If your JSON is nested, take a look at: http://druid.io/docs/0.9.1.1/ingestion/flatten-json.html

I’m not sure if the timestampSpec can reference a derived ‘flattened’ column or not (maybe someone who knows can chime in, or you could just try it out). If not, you’ll need to transform your message to make your time column root level.

Thanks. I tried it like this and am getting a NullPointerException; maybe something is wrong with my spec. Please refer to my spec below.

Also, regarding your other point, how do I transform my message to make my timestamp column root-level? Please share samples.

2016-11-14 22:25:13,148 [KafkaConsumer-0] INFO c.m.t.kafka.writer.WriterController - Creating EventWriter for topic [targeting.hubble_stream_sams] using dataSource [vrmetricsp]

2016-11-14 22:25:13,158 [KafkaConsumer-1] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:

java.lang.NullPointerException: null

"dataSchema" : {
  "dataSource" : "vrmetricsp",
  "parser" : {
    "type" : "string",
    "parseSpec" : {
      "flattenSpec" : {
        "useFieldDiscovery" : true,
        "fields" : [{
          "type" : "path",
          "name" : "timestamp",
          "expr" : "$.server.StartTime",
          "format" : "auto"
        }]
      },
      "dimensionsSpec" : {
        "dimensions" : [],
        "dimensionExclusions" : []
      },
      "format" : "json"
    }
  },
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "hour",
    "queryGranularity" : "none"
  },
  "metricsSpec" : [
    {
      "type" : "count",
      "name" : "count"
    }
  ]
},

Hi experts/David,
Please advise on these two points:

I tried the flattenSpec as shown above and am getting a NullPointerException; maybe something is wrong with my spec (see above).

Also, how do I transform my message to make my timestamp column root-level, and then use Tranquility Kafka after that? Please share samples.

Hey,

Your parseSpec still needs a timestampSpec, i.e.:

          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          }

As for message transformation: you would currently have to do the transformation as another stage in your pipeline (e.g. Storm, Samza, Spark Streaming, Kafka Streams) and feed the transformed data into Druid. If the flatten JSON spec works for you, then you may not need to do this.
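
A minimal sketch of such a transform stage, assuming the kafka-python client and made-up topic names (raw_events in, flattened_events out):

import json
from kafka import KafkaConsumer, KafkaProducer

# Hypothetical topics and broker address; adjust to your cluster.
consumer = KafkaConsumer(
    'raw_events',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda b: json.loads(b.decode('utf-8')),
)
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda d: json.dumps(d).encode('utf-8'),
)

for message in consumer:
    event = message.value
    # Lift the nested timestamp to a root-level column so a plain
    # timestampSpec can find it.
    server = event.get('server') or {}
    if 'StartTime' in server:
        event['timestamp'] = server['StartTime']
        producer.send('flattened_events', event)

You would then point Tranquility Kafka at the transformed topic and use a plain timestampSpec on the root-level timestamp column.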

Thanks David.
I finally got it to work by using a timestampSpec along with my field spec above.
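
For anyone who lands on this thread later: putting the pieces above together, the working parseSpec presumably looks something like the sketch below. This is assembled from the snippets in the thread, not a verbatim copy of the final spec; the "format" entry has been dropped from the path field, since flattenSpec fields take only type, name, and expr.

"parseSpec" : {
  "format" : "json",
  "flattenSpec" : {
    "useFieldDiscovery" : true,
    "fields" : [{
      "type" : "path",
      "name" : "timestamp",
      "expr" : "$.server.StartTime"
    }]
  },
  "timestampSpec" : {
    "column" : "timestamp",
    "format" : "auto"
  },
  "dimensionsSpec" : {
    "dimensions" : [],
    "dimensionExclusions" : []
  }
}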