Column Flattening - String escaped values

Hi Druid community

I am running into an issue when flattening columns. I am ingesting data from a Kafka stream, and certain fields originate from a Postgres jsonb column that is stringified when it is written to Kafka (see the “txn” field below).

I would like to flatten these columns to get at their nested values, but I cannot figure out how to retrieve them with JSONPath or jq. For example, I would like to access the “rs” value inside “txn”. Is there a way to strip the quotes within Druid so that the value is recognized as JSON? A full row is attached below.

Thanks in advance!

Original row:

{
  "round": 62440,
  "intra": 0,
  "typeenum": 1,
  "asset": 0,
  "txid": "N01LNldMS0ZCUEMzMjNBVFNFS05FS1VUUVoyM1RDQ003NVNKTlNGQUhFTTY1R1lKNUFOUQ==",
  "txn": "{\"rs\": 30576000000, \"txn\": {\"fv\": 62000, \"gh\": \"wGHE2Pwdvd7S12BL5FaOP20EGYesN73ktiC1qzkkit8=\", \"lv\": 63000, \"amt\": 100000, \"fee\": 1000, \"gen\": \"mainnet-v1.0\", \"rcv\": \"AszogaMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=\", \"snd\": \"RvfOlpCGjRCcuYPXiEL9OljI5YAIjxXavoiTgXnO4xw=\", \"note\": \"QS4gOyBBbGV4IE1jQ2FiZSA7IEFsbGlzb24gTm9sYW4gOyBBbm5lIFdhcm5lciA7IGLimKVhIDsgQmVuamFtaW4gQ2hhbiA7IEJlbmphbWluIEQuIFdhcmQgOyBCbyBMaSA7IENocmlzIEh1cmxleSA7IGRlcmVrIDsgZGltaXRyaXMgOyBFbGxlIFlvdSA7IEVyaWMgR2llc2VrZSA7IEV2YW4gSmFtZXMgUmljaGFyZCA7IEdyZWcgQ29sdmluIDsgR1YgOyBIb2V0ZWNrIDsgSWFuIENyb3NzIDsgSmFrZSBFdmFuIEdyZWVuc3RlaW4gOyBKYXNvbiBXZWF0aGVyc2J5IDsgSmluZyBDaGVuIDsgS2FybWFzdGljIDsgS2F0cmljZSBHcmFkeSA7IEtlbGkgSiBDYWxsYWdoYW4gOyBMZW8gUmV5emluIDsgTGl6IEJhcmFuIDsgTWFrZW5hIFN0b25lIDsgTWFzb24oWXVkZSlIdWFuZyA7IE1hdXJpY2UgSGVybGloeSA7IE1heCBKdXN0aWN6IDsgTXVzcyA7IE5hdmVlZCBJaHNhbnVsbGFoIDsgTmlja29sYWkgWmVsZG92aWNoIDsgUGFibG8gQXphciA7IFBhdWwgUmllZ2xlIDsgUmVnaW5hIDsgcmZ1c3Rpbm8gOyBSb3RlbSBIZW1vICjXqNeV16rXnSDXl9ee15UpIDsgc2FtIGFiYmFzc2kgOyBTYXd5ZXIgSHVybGV5IDsgU2VyZ2V5IEdvcmJ1bm92ICjQodC10YDQs9C10Lkg0JPQvtGA0LHRg9C90L7QsikgOyBTaWx2aW8gTWljYWxpIDsgU3Jpaml0aCBQb2R1dmFsIDsgU3RldmVuIEtva2lub3MgOyBUc2FjaGkgSGVybWFuIDsgVHlsZXIgTWFya2xleSA7IFZpY3RvciBMdWNoYW5nY28gOyBXLiBTZWFuIEZvcmQgOyBXaWxsICJaZXJvTWljcm9uIiBXaW5kZXIgOyBZb3NzaSBHaWxhZCA7IFpoZW5mZWkgWmhhbmc=\", \"type\": \"pay\"}, \"msig\": {\"v\": 1, \"thr\": 4, \"subsig\": [{\"pk\": \"fDPiywtmtrpA2WOY+Mx9y6etNBCij1VKwZmGWW4PbKk=\"}, {\"pk\": \"ETnffVmxyVfJtVgCWFuStLsPJna9G1SHA1yJrfIo6RU=\"}, {\"s\": \"eBLuSsmbqXTtKcoDpI88t7CNyQ7ggJ8ZMGjpy+hLWnvjNi938/5U6Eb25Dmes0WLkCxnDZG7gsj3YIDmZfFLAA==\", \"pk\": \"hYkIN+Iyt2675q+XuYwoAzwR8B0P17WTUFGYn456E4o=\"}, {\"s\": \"45ndEdxV115jUGBmqt4WSjcBDg847CiPlE0w5omziLftSRzOtJSd5zrF1zkHOa1B1GJV4AE8E2qriMIbifnYBw==\", \"pk\": \"5ChQFEXiHWTeXoJCRymNn8rmEAJAxpaigu4wIgcaODU=\"}, {\"s\": \"LbmMSdKaqD/s9M1ldNAvLYGRMwxWdVPbl4i2zBVKwRnrRLM1Ape9zWMAxX1yJGxk/mAKGa9lZwAfQUlyus58Cw==\", \"pk\": 
\"RjQ91+zvYumrPm9UOEMN+GnlHW+0gliRCCV2b6KOlwk=\"}, {\"s\": \"47b3oXSW6ZVGXmnFy59iQZohcs79v4Da05MTNr0jkUAHl5kseS7Br0C838nbZB79Yj9+wt7kuiiJkCOFgAAwBw==\", \"pk\": \"k5F6WQJGyeiPHaN7fvmnBXz6YNq4NQ6BguE7yUmRWkI=\"}]}}",
  "extra": "{}"
}
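A minimal Python sketch (standard `json` module only, on a trimmed copy of the row above) shows what is going on: decoding the message yields `txn` as a plain string, and its fields only become reachable after a second decode.

```python
import json

# Trimmed copy of the row above: "txn" holds JSON text, not a nested object.
raw_row = (
    '{"round": 62440, "intra": 0, '
    '"txn": "{\\"rs\\": 30576000000, \\"txn\\": {\\"fee\\": 1000, \\"type\\": \\"pay\\"}}", '
    '"extra": "{}"}'
)

row = json.loads(raw_row)          # first decode: the Kafka message itself
print(type(row["txn"]).__name__)   # str -> nothing for a path expression to walk into

txn = json.loads(row["txn"])       # second decode: the stringified jsonb value
print(txn["rs"])                   # 30576000000
print(txn["txn"]["type"])          # pay
```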

Hi @abaschkim, have you tried the flattenSpec?

Yes, but I only get null values. For example:

{
  "type": "path",
  "name": "txn.sig",
  "expr": "$.txn.sig"
}
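The null result can be reproduced outside Druid with a deliberately tiny path resolver in Python. `resolve` is a hypothetical helper that only understands dotted `$.a.b` paths (it is not Druid's JsonPath implementation), and the row is a trimmed copy of the one in the question. Walking into a string yields `None`, while the same path succeeds once `txn` has been decoded. It also suggests that `$.txn.sig` would stay null even after decoding, because the decoded value has no top-level `sig` key; the signature data in the sample row sits under `msig`.

```python
import json
from typing import Any, Optional


def resolve(doc: Any, expr: str) -> Optional[Any]:
    """Hypothetical helper: follow a dotted "$.a.b" path through nested dicts.

    Only plain dotted keys are supported; this is not Druid's JsonPath engine.
    """
    path = expr[2:] if expr.startswith("$.") else expr
    node = doc
    for key in path.split("."):
        # A string (such as stringified JSON) is a leaf: there is nothing to step into.
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# "txn" arrives as JSON text, mirroring the row in the question (trimmed).
row = json.loads('{"txn": "{\\"rs\\": 30576000000, \\"msig\\": {\\"thr\\": 4}}"}')
print(resolve(row, "$.txn.rs"))        # None: "txn" is still a string

row["txn"] = json.loads(row["txn"])    # decode the stringified field
print(resolve(row, "$.txn.rs"))        # 30576000000
print(resolve(row, "$.txn.sig"))       # None: the decoded object has no "sig" key
print(resolve(row, "$.txn.msig.thr"))  # 4
```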

So I suspect the problem is that the JSON is stringified, and I am now trying to find a way around it. Here’s my full spec:

{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "http://HOST:9092"
      },
      "topic": "algorand-pgsql-txn",
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "fields": [
            {
              "type": "path",
              "name": "txn.rs",
              "expr": "$.txn.rs"
            },
            {
              "type": "path",
              "name": "txn.sig",
              "expr": "$.txn.sig"
            }
          ]
        }
      },
      "inputSource": {
        "type": "inline",
        "data": "{\n  \"round\": 62440,\n  \"intra\": 0,\n  \"typeenum\": 1,\n  \"asset\": 0,\n  \"txid\": \"N01LNldMS0ZCUEMzMjNBVFNFS05FS1VUUVoyM1RDQ003NVNKTlNGQUhFTTY1R1lKNUFOUQ==\",\n  \"txn\": \"{\\\"rs\\\": 30576000000, \\\"txn\\\": {\\\"fv\\\": 62000, \\\"gh\\\": \\\"wGHE2Pwdvd7S12BL5FaOP20EGYesN73ktiC1qzkkit8=\\\", \\\"lv\\\": 63000, \\\"amt\\\": 100000, \\\"fee\\\": 1000, \\\"gen\\\": \\\"mainnet-v1.0\\\", \\\"rcv\\\": \\\"AszogaMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=\\\", \\\"snd\\\": \\\"RvfOlpCGjRCcuYPXiEL9OljI5YAIjxXavoiTgXnO4xw=\\\", \\\"note\\\": \\\"QS4gOyBBbGV4IE1jQ2FiZSA7IEFsbGlzb24gTm9sYW4gOyBBbm5lIFdhcm5lciA7IGLimKVhIDsgQmVuamFtaW4gQ2hhbiA7IEJlbmphbWluIEQuIFdhcmQgOyBCbyBMaSA7IENocmlzIEh1cmxleSA7IGRlcmVrIDsgZGltaXRyaXMgOyBFbGxlIFlvdSA7IEVyaWMgR2llc2VrZSA7IEV2YW4gSmFtZXMgUmljaGFyZCA7IEdyZWcgQ29sdmluIDsgR1YgOyBIb2V0ZWNrIDsgSWFuIENyb3NzIDsgSmFrZSBFdmFuIEdyZWVuc3RlaW4gOyBKYXNvbiBXZWF0aGVyc2J5IDsgSmluZyBDaGVuIDsgS2FybWFzdGljIDsgS2F0cmljZSBHcmFkeSA7IEtlbGkgSiBDYWxsYWdoYW4gOyBMZW8gUmV5emluIDsgTGl6IEJhcmFuIDsgTWFrZW5hIFN0b25lIDsgTWFzb24oWXVkZSlIdWFuZyA7IE1hdXJpY2UgSGVybGloeSA7IE1heCBKdXN0aWN6IDsgTXVzcyA7IE5hdmVlZCBJaHNhbnVsbGFoIDsgTmlja29sYWkgWmVsZG92aWNoIDsgUGFibG8gQXphciA7IFBhdWwgUmllZ2xlIDsgUmVnaW5hIDsgcmZ1c3Rpbm8gOyBSb3RlbSBIZW1vICjXqNeV16rXnSDXl9ee15UpIDsgc2FtIGFiYmFzc2kgOyBTYXd5ZXIgSHVybGV5IDsgU2VyZ2V5IEdvcmJ1bm92ICjQodC10YDQs9C10Lkg0JPQvtGA0LHRg9C90L7QsikgOyBTaWx2aW8gTWljYWxpIDsgU3Jpaml0aCBQb2R1dmFsIDsgU3RldmVuIEtva2lub3MgOyBUc2FjaGkgSGVybWFuIDsgVHlsZXIgTWFya2xleSA7IFZpY3RvciBMdWNoYW5nY28gOyBXLiBTZWFuIEZvcmQgOyBXaWxsICJaZXJvTWljcm9uIiBXaW5kZXIgOyBZb3NzaSBHaWxhZCA7IFpoZW5mZWkgWmhhbmc=\\\", \\\"type\\\": \\\"pay\\\"}, \\\"msig\\\": {\\\"v\\\": 1, \\\"thr\\\": 4, \\\"subsig\\\": [{\\\"pk\\\": \\\"fDPiywtmtrpA2WOY+Mx9y6etNBCij1VKwZmGWW4PbKk=\\\"}, {\\\"pk\\\": \\\"ETnffVmxyVfJtVgCWFuStLsPJna9G1SHA1yJrfIo6RU=\\\"}, {\\\"s\\\": \\\"eBLuSsmbqXTtKcoDpI88t7CNyQ7ggJ8ZMGjpy+hLWnvjNi938/5U6Eb25Dmes0WLkCxnDZG7gsj3YIDmZfFLAA==\\\", \\\"pk\\\": 
\\\"hYkIN+Iyt2675q+XuYwoAzwR8B0P17WTUFGYn456E4o=\\\"}, {\\\"s\\\": \\\"45ndEdxV115jUGBmqt4WSjcBDg847CiPlE0w5omziLftSRzOtJSd5zrF1zkHOa1B1GJV4AE8E2qriMIbifnYBw==\\\", \\\"pk\\\": \\\"5ChQFEXiHWTeXoJCRymNn8rmEAJAxpaigu4wIgcaODU=\\\"}, {\\\"s\\\": \\\"LbmMSdKaqD/s9M1ldNAvLYGRMwxWdVPbl4i2zBVKwRnrRLM1Ape9zWMAxX1yJGxk/mAKGa9lZwAfQUlyus58Cw==\\\", \\\"pk\\\": \\\"RjQ91+zvYumrPm9UOEMN+GnlHW+0gliRCCV2b6KOlwk=\\\"}, {\\\"s\\\": \\\"47b3oXSW6ZVGXmnFy59iQZohcs79v4Da05MTNr0jkUAHl5kseS7Br0C838nbZB79Yj9+wt7kuiiJkCOFgAAwBw==\\\", \\\"pk\\\": \\\"k5F6WQJGyeiPHaN7fvmnBXz6YNq4NQ6BguE7yUmRWkI=\\\"}]}}\",\n  \"extra\": \"{}\"\n}\n"
      }
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "algorand-pgsql-txn"
    }
  }
}
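If you control the process that writes to Kafka, one possible workaround, sketched here as an assumption about your pipeline rather than a Druid feature, is to decode the stringified jsonb columns before publishing. The message Druid receives then contains real nested objects, which `path` fields such as `$.txn.rs` can walk into. `destringify` and the field list (`txn`, `extra`) are hypothetical:

```python
import json
from typing import Any, Dict, Iterable


def destringify(record: Dict[str, Any], fields: Iterable[str]) -> Dict[str, Any]:
    """Hypothetical helper: return a copy of record with the named JSON-text fields decoded."""
    out = dict(record)  # shallow copy, so the caller's record is not modified
    for name in fields:
        value = out.get(name)
        if isinstance(value, str):
            try:
                out[name] = json.loads(value)
            except json.JSONDecodeError:
                pass  # leave values that are not valid JSON as strings


    return out


# Trimmed row from the question, re-encoded as the message to publish.
message = json.dumps(
    destringify(
        {"round": 62440, "txn": '{"rs": 30576000000}', "extra": "{}"},
        fields=("txn", "extra"),
    )
)
print(message)  # {"round": 62440, "txn": {"rs": 30576000000}, "extra": {}}
```

Values that are not valid JSON are left as strings, so one malformed column does not stop the producer.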

Thanks for letting me know. I’m playing around with this and may have managed to reproduce the problem.

I’ll keep at it and see if I can make any progress.
