Ingesting non-standard format data from a Kinesis data stream

Hi,

I need to ingest data from a Kinesis data stream whose format (it follows an industrial standard) differs from the examples in the documentation. Here are two examples:

{"IoT_735_PL024_ECP001_S1_TUR003/WTUR1.ST.TurSt.st.q":"false",
"IoT_735_PL024_ECP001_S1_TUR003/WTUR1.ST.TurSt.st.stVal":"7",
"IoT_735_PL024_ECP001_S1_TUR003/WTUR1.ST.TurSt.st.t":"1599756246",
"id":"29924802"}
or
{"IoT_735_PL014_ECP001_S1_TUR004/WALM1.ST.Alm.ackTm":"false",
"IoT_735_PL014_ECP001_S1_TUR004/WALM1.ST.Alm.almLev":"1",
"IoT_735_PL014_ECP001_S1_TUR004/WALM1.ST.Alm.idx":"0",
"IoT_735_PL014_ECP001_S1_TUR004/WALM1.ST.Alm.par1":"243",
"IoT_735_PL014_ECP001_S1_TUR004/WALM1.ST.Alm.par2":"377",
"IoT_735_PL014_ECP001_S1_TUR004/WALM1.ST.Alm.t":"1599755973",
"IoT_735_PL014_ECP001_S1_TUR004/WALM1.ST.Alm.tripTm":"false",
"id":"27022927"}

For the first example, a more conventional JSON would be the following (however, I have no control over the Kinesis data stream format):
{"asset":"PL024_ECP001_S1_TUR003",
"WTUR1.ST.TurSt.st.q":"false",
"WTUR1.ST.TurSt.st.stVal":"7",
"timestamp":"1599756246"}

I found it hard to phrase this as a Google search for related cases, so apologies if it has already been answered.

How would you recommend I proceed? Is it best to add an intermediate step that transforms the stream data into easily ingestible JSON, and if so, which technical solution would you recommend (in an AWS environment)? Or is it possible to do that in Druid?

(This is a repost, as I was not happy with the wording of my previous post.)

Many thanks,
Adam

Hey Adam - in your example it does look like you’re doing field renames - you can use a super simple transform for that - but you’re also adding a field called “asset” which is derived from part of an actual field name? Is that right?

https://druid.apache.org/docs/latest/ingestion/index.html#transformspec

Maybe look at AWS Lambda to do transformations on the Kinesis stream if you want maximum control and flexibility?

Ping me back if this does / doesn’t help…!

Oh I think I get it… preceding the / you get the asset name… and you need to pull that out into a different event with the asset as a dimension value… hmmm… Yeah I think you will need to use a stream transform to do that… (/me continues thinking)

Yes, that’s what we decided to do in the end! We used an AWS Lambda to transform the stream data into a format more suitable for ingestion.
Ingestion now works fine!

Many thanks,
Adam
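
For anyone landing on this thread later, here is a minimal sketch of what such a Lambda transform could look like in Python. It is not the code Adam used, only an illustration under a few assumptions: the names reshape, handler and ASSET_PREFIX are hypothetical, the "IoT_735_" prefix is assumed constant (it is in both examples above), the signal ending in ".t" is assumed to be the event timestamp, and fields without a "/" such as "id" are kept unchanged. Where the reshaped rows get written (for example a second Kinesis stream that Druid reads) is left out.

```python
import base64
import json

# Assumption: every asset key starts with this prefix, as in both examples.
ASSET_PREFIX = "IoT_735_"


def reshape(raw: dict) -> dict:
    """Turn {"<asset>/<signal>": value, ...} into a flat row with an "asset" field."""
    row = {}
    for key, value in raw.items():
        if "/" not in key:
            # Fields such as "id" carry no asset part; keep them unchanged.
            row[key] = value
            continue
        asset, signal = key.split("/", 1)
        if asset.startswith(ASSET_PREFIX):
            asset = asset[len(ASSET_PREFIX):]
        row["asset"] = asset
        if signal.endswith(".t"):
            # Assumption: the ".t" signal holds the event timestamp.
            row["timestamp"] = value
        else:
            row[signal] = value
    return row


def handler(event, context):
    """Lambda entry point for a Kinesis trigger: decode and reshape each record."""
    rows = []
    for record in event["Records"]:
        # Kinesis delivers each record payload base64-encoded.
        payload = base64.b64decode(record["kinesis"]["data"])
        rows.append(reshape(json.loads(payload)))
    # Forwarding the rows to their destination is deliberately omitted here.
    return rows
```

Applied to the first example, reshape produces the "asset" and "timestamp" fields of the conventional JSON shown in the question, plus the original "id".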

No worries. Let us know how it goes :smiley: