How to flatten nested JSON

Hi all,
I’m trying to flatten JSON data coming from a Kafka stream with the structure below.

Original row: {
  "ID": 2,
  "KEY": "iHUfWwb7CYkoTQvmSb6o6NZBk9XwxcDr",
  "CREATED_ON": 1568878778160981,
  "UPDATED_ON": 1568878868240763,
  "PERSISTENT": 0,
  "DELETED": 1,
  "FINISHED": 1,
  "TITLE": "",
  "DATA": "{\"payment_method_id\":1,\"lines\":[{\"accounting_identifier\":\"\",\"line_id\":\"272270373\",\"base_unit_price\":{\"$dec\":\"100\"},\"parent_line_id\":null,\"type\":{\"$enum\":[\"shuup.core.models:OrderLineType\",1]},\"sku\":\"1\",\"text\":\"Movie English Name\",\"discounted_price\":\"50\",\"product_id\":1,\"quantity\":1,\"discount_amount\":{\"$dec\":\"50\"},\"supplier_id\":1,\"tax_class_id\":1,\"shop_id\":1,\"require_verification\":false}],\"customer_id\":2,\"orderer_id\":2}",
  "CURRENCY": "AED",
  "PRICES_INCLUDE_TAX": 1,
  "PRODUCT_COUNT": 1,
  "CREATOR_ID": 2,
  "CUSTOMER_ID": 2,
  "SHOP_ID": 1,
  "ORDERER_ID": 2
}

My supervisor spec looks like this:

{
   "type":"kafka",
   "dataSchema":{
      "dataSource":"datasourcename",
      "parser":{
         "type":"string",
         "parseSpec":{
            "format":"json",
            "timestampSpec":{
               "column":"CREATED_ON",
               "format":"micro"
            },
            "dimensionsSpec":{
               "dimensions":[
                  {
                     "type":"long",
                     "name":"CREATOR_ID"
                  },
                  "CURRENCY",
                  {
                     "type":"long",
                     "name":"CUSTOMER_ID"
                  },
                  "DATA",
                  {
                     "type":"long",
                     "name":"DELETED"
                  },
                  {
                     "type":"long",
                     "name":"FINISHED"
                  },
                  {
                     "type":"long",
                     "name":"ID"
                  },
                  "KEY",
                  {
                     "type":"long",
                     "name":"ORDERER_ID"
                  },
                  {
                     "type":"long",
                     "name":"PERSISTENT"
                  },
                  {
                     "type":"long",
                     "name":"PRICES_INCLUDE_TAX"
                  },
                  {
                     "type":"long",
                     "name":"PRODUCT_COUNT"
                  },
                  {
                     "type":"long",
                     "name":"SHOP_ID"
                  },
                  {
                     "type":"long",
                     "name":"UPDATED_ON"
                  }
               ]
            },
            "flattenSpec":{
               "useFieldDiscovery":true,
               "fields":[
                  {
                     "type":"path",
                     "name":"PAYMENT_METHOD_ID",
                     "expr":"$.payment_method_id"
                  }
               ]
            }
         }
      },
      "metricsSpec":[

      ],
      "granularitySpec":{
         "type":"uniform",
         "segmentGranularity":"HOUR",
         "queryGranularity":{
            "type":"none"
         },
         "rollup":false,
         "intervals":null
      },
      "transformSpec":{
         "filter":null,
         "transforms":[

         ]
      }
   }
}

But I’m not able to extract fields from the JSON inside the DATA column using the above flattenSpec.

Any ideas on how to solve this would be really helpful

Thanks in advance
Visakh

The below ingestion spec works with your data. It will add payment_method_id and line_id as dimensions. You need to specify a path expression for every field you want to include in your dimensions. I have kept it simple below and used your JSON in a file.

{
   "type": "index_parallel",
   "ioConfig": {
      "type": "index_parallel",
      "firehose": {
         "type": "local",
         "baseDir": "/Users/vijaynarayanan/Documents",
         "filter": "test.json"
      }
   },
   "tuningConfig": {
      "type": "index_parallel"
   },
   "dataSchema": {
      "dataSource": "Documents1",
      "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "DAY",
         "queryGranularity": "HOUR",
         "rollup": true
      },
      "parser": {
         "type": "string",
         "parseSpec": {
            "format": "json",
            "timestampSpec": {
               "column": "!!!no_such_column!!!",
               "missingValue": "2010-01-01T00:00:00Z"
            },
            "flattenSpec": {
               "fields": [
                  {
                     "type": "path",
                     "name": "payment_method_id",
                     "expr": "$.payment_method_id"
                  },
                  {
                     "type": "path",
                     "name": "line_id",
                     "expr": "$.lines[*].line_id"
                  }
               ]
            },
            "dimensionsSpec": {
               "dimensions": []
            }
         }
      },
      "metricsSpec": [
         {
            "name": "count",
            "type": "count"
         }
      ]
   }
}

FWIW, I would only use the flattening portion of the previous spec. The rest of your original spec looks ok.
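Roughly, that would mean keeping your kafka supervisor spec as it is and just swapping the flattening portion into its parseSpec, along these lines (a sketch only; the two path expressions are the ones from Vijay’s spec above and assume those fields can be reached from the root of each message):

   "parseSpec": {
      "format": "json",
      "timestampSpec": {
         "column": "CREATED_ON",
         "format": "micro"
      },
      "flattenSpec": {
         "useFieldDiscovery": true,
         "fields": [
            { "type": "path", "name": "payment_method_id", "expr": "$.payment_method_id" },
            { "type": "path", "name": "line_id", "expr": "$.lines[*].line_id" }
         ]
      },
      "dimensionsSpec": {
         "dimensions": []
      }
   }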

Thanks for the response Vijay and Fangjin
Sorry for not getting back sooner on this
I tried using the flattenSpec as suggested above, but I still can’t see these columns getting added to my data in the preview tab, nor in the query window when I query the datasource in Druid. I double-checked and ensured the submitted supervisor spec contains the flattenSpec shown above.

Regards
Visakh

Please send your current spec.

Hi Vijay

It looks like the below:

{
   "type":"kafka",
   "dataSchema":{
      "dataSource":"datasourcename",
      "parser":{
         "type":"string",
         "parseSpec":{
            "format":"json",
            "timestampSpec":{
               "column":"CREATED_ON",
               "format":"micro"
            },
            "flattenSpec":{
               "fields":[
                  {
                     "type":"path",
                     "name":"payment_method_id",
                     "expr":"$.payment_method_id"
                  },
                  {
                     "type":"path",
                     "name":"product_id",
                     "expr":"$.lines[*].product_id"
                  }
               ]
            },
            "dimensionsSpec":{
               "dimensions":[

               ]
            }
         }
      },
      "metricsSpec":[
         {
            "type":"count",
            "name":"count"
         }
      ],
      "granularitySpec":{
         "type":"uniform",
         "segmentGranularity":"HOUR",
         "queryGranularity":"HOUR",
         "rollup":true,
         "intervals":null
      },
      "transformSpec":{
         "filter":null,
         "transforms":[

         ]
      }
   },
   "tuningConfig":{
      "type":"kafka",
      "maxRowsInMemory":1000000,
      "maxBytesInMemory":0,
      "maxRowsPerSegment":5000000,
      "maxTotalRows":20000000,
      "intermediatePersistPeriod":"PT10M",
      "basePersistDirectory":"/opt/druid/var/tmp/1572154666890-0",
      "maxPendingPersists":0,
      "indexSpec":{
         "bitmap":{
            "type":"concise"
         },
         "dimensionCompression":"lz4",
         "metricCompression":"lz4",
         "longEncoding":"longs"
      },
      "indexSpecForIntermediatePersists":{
         "bitmap":{
            "type":"concise"
         },
         "dimensionCompression":"lz4",
         "metricCompression":"lz4",
         "longEncoding":"longs"
      },
      "buildV9Directly":true,
      "reportParseExceptions":false,
      "handoffConditionTimeout":0,
      "resetOffsetAutomatically":false,
      "segmentWriteOutMediumFactory":null,
      "workerThreads":null,
      "chatThreads":null,
      "chatRetries":8,
      "httpTimeout":"PT10S",
      "shutdownTimeout":"PT80S",
      "offsetFetchPeriod":"PT30S",
      "intermediateHandoffPeriod":"P2147483647D",
      "logParseExceptions":true,
      "maxParseExceptions":2147483647,
      "maxSavedParseExceptions":0,
      "skipSequenceNumberAvailabilityCheck":false
   },
   "ioConfig":{
      "topic":"TopicName",
      "replicas":1,
      "taskCount":1,
      "taskDuration":"PT3600S",
      "consumerProperties":{
         "bootstrap.servers":"kafka:29092"
      },
      "pollTimeout":100,
      "startDelay":"PT5S",
      "period":"PT30S",
      "useEarliestOffset":true,
      "completionTimeout":"PT1800S",
      "lateMessageRejectionPeriod":null,
      "earlyMessageRejectionPeriod":null,
      "stream":"StreamName",
      "useEarliestSequenceNumber":true,
      "type":"kafka"
   },
   "context":null,
   "suspended":false
}

What is your Kafka topic?

I see "topic":"TopicName" in your spec. Is your topic actually called "TopicName"?

I just changed it for example purposes; the actual topic name is different.

I checked the topic through KSQL and can confirm the stream data is coming through fine. I can also see the preview in the Druid data loader, and the data looks like the row below. The issue is that I’m unable to flatten out the data elements from within the JSON. If it matters, these are optional fields, so not all rows will contain data for all of these JSON elements.

Original row: {
  "ID": 169,
  "KEY": "EAOyAgMtU34IvrAGyzHS34miF1kaL2eN",
  "CREATED_ON": 1570710632300710,
  "UPDATED_ON": 1570710871896125,
  "PERSISTENT": 0,
  "DELETED": 0,
  "FINISHED": 0,
  "TITLE": "",
  "DATA": "{\"shipping_method_id\":null,\"payment_method_id\":null,\"customer_comment\":\"\",\"lines\":[{\"discounted_price\":null,\"cinema_price\":null,\"selected_seats\":[{\"rowLabel\":\"E\",\"seatLabel\":\"1\",\"seatInfo\":\"E1\"}],\"ticket_type_id\":\"0001\",\"show_time_id\":\"135184\",\"partner_id\":\"1\",\"cinema_id\":\"0001\",\"line_id\":\"420944007\",\"parent_line_id\":null,\"type\":{\"$enum\":[\"shuup.core.models:OrderLineType\",1]},\"shop_id\":1,\"product_id\":6,\"supplier_id\":1,\"tax_class_id\":1,\"quantity\":1,\"base_unit_price\":{\"$dec\":\"0\"},\"discount_amount\":{\"$dec\":\"0\"},\"sku\":\"dream-girl\",\"text\":\"DREAM GIRL\",\"require_verification\":false,\"accounting_identifier\":\"\"}],\"codes\":[],\"extra_data\":{\"responseCode\":\"21\",\"responseDescription\":\"Showtime is not configured to be seat-allocated\",\"orderId\":\"f728eb235863400ca433696f973fc4a3\",\"totalPrice\":50.0,\"quantity\":1,\"isSeatAllocated\":true,\"tickets\":[{\"ticketTypeId\":\"0001\",\"price\":50.0,\"seatInfo\":[{\"rowLabel\":\"E\",\"seatLabel\":\"1\",\"seatInfo\":\"E1\"}]}]}}",
  "CURRENCY": "AED",
  "PRICES_INCLUDE_TAX": 1,
  "PRODUCT_COUNT": 1,
  "CREATOR_ID": null,
  "CUSTOMER_ID": null,
  "SHOP_ID": 1,
  "ORDERER_ID": null
}

I think the escape characters in the DATA element are causing the issue. The typical path would be $.DATA.payment_method_id, but that does not work with the escape characters.
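To illustrate with a made-up, minimal pair of rows (not taken from your topic): flattening can only descend into DATA when it is a nested object, not a quoted string.

{"ID": 169, "DATA": "{\"payment_method_id\": 1}"}

Here DATA is a string value, so $.DATA.payment_method_id has nothing to traverse into, whereas with

{"ID": 169, "DATA": {"payment_method_id": 1}}

the same path would resolve to 1.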

How can I clean that up?

As far as I can see, this is happening only inside Druid.
When I inspect the same topic in KSQL, it shows the data as below, without any escape characters:

{"orderer_id":"anonymous","customer_id":"anonymous","lines":[{"unit_price":50.0,"selected_seats":[{"seatLabel":"3","rowLabel":"A","seatInfo":"A3"}],"ticket_type_id":"0142","show_time_id":"139248","partner_id":"1","cinema_id":"0001","line_id":"1912536427","parent_line_id":null,"type":{"$enum":["shuup.core.models:OrderLineType",1]},"shop_id":1,"product_id":17,"supplier_id":1,"tax_class_id":1,"quantity":1,"base_unit_price":{"$dec":"50"},"discount_amount":{"$dec":"0"},"sku":"81DEFD69254200C55A65CBB0AA2E50CF0C53D0B68704A6DE472A4CCFF614E444","text":"WAR (HINDI)","require_verification":false,"accounting_identifier":""}],"extra_data":{"partnerId":"1","cinemaId":"0001","showTimeId":"139248","orderId":"ff2d6018734a40c68cca6d3dcca1177d","totalPrice":50.0,"quantity":1,"isSeatAllocated":true,"tickets":[{"ticketTypeId":"0142","price":50.0,"seatInfo":[{"seatLabel":"3","rowLabel":"A","seatInfo":"A3"}]}]}}

The JSON is not valid without the escape characters, because you have double quotes around the value of DATA. If you use the JSON below (I just removed the opening and closing quotes around the value of DATA and tested by posting it to Kafka), you will see that the console auto-generates the flattenSpec. The issue is that with the double quotes around the value of DATA, it is a string rather than a nested object, so there is no way to use a JSON path to flatten it.

{"ID": 169,"KEY": "EAOyAgMtU34IvrAGyzHS34miF1kaL2eN","CREATED_ON": 1570710632300710,"UPDATED_ON": 1570710871896125,"PERSISTENT": 0,"DELETED": 0,"FINISHED": 0,"TITLE": "","CURRENCY": "AED","DATA": {"shipping_method_id":null,"payment_method_id":null,"customer_comment":"","lines":[{"discounted_price":null,"cinema_price":null,"selected_seats":[{"rowLabel":"E","seatLabel":"1","seatInfo":"E1"}],"ticket_type_id":"0001","show_time_id":"135184","partner_id":"1","cinema_id":"0001","line_id":"420944007","parent_line_id":null,"type":{"$enum":["shuup.core.models:OrderLineType",1]},"shop_id":1,"product_id":6,"supplier_id":1,"tax_class_id":1,"quantity":1,"base_unit_price":{"$dec":"0"},"discount_amount":{"$dec":"0"},"sku":"dream-girl","text":"DREAM GIRL","require_verification":false,"accounting_identifier":""}],"codes":[],"extra_data":{"responseCode":"21","responseDescription":"Showtime is not configured to be seat-allocated","orderId":"f728eb235863400ca433696f973fc4a3","totalPrice":50.0,"quantity":1,"isSeatAllocated":true,"tickets":[{"ticketTypeId":"0001","price":50.0,"seatInfo":[{"rowLabel":"E","seatLabel":"1","seatInfo":"E1"}]}]}},"PRICES_INCLUDE_TAX": 1,"PRODUCT_COUNT": 1,"CREATOR_ID": null,"CUSTOMER_ID": null,"SHOP_ID": 1,"ORDERER_ID": null}
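Once DATA is a real nested object as in the row above, a flattenSpec along these lines should be able to reach into it (a sketch only; the names and paths are just examples based on the fields in this row):

   "flattenSpec": {
      "useFieldDiscovery": true,
      "fields": [
         { "type": "path", "name": "payment_method_id", "expr": "$.DATA.payment_method_id" },
         { "type": "path", "name": "line_id", "expr": "$.DATA.lines[*].line_id" },
         { "type": "path", "name": "product_id", "expr": "$.DATA.lines[*].product_id" }
      ]
   }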