Data transformation (cast expression) for Kafka ingestion

Hey all,

I'm trying to cast some fields of the Kafka messages I'm ingesting, but it doesn't seem to work: the fields end up null.

Is this supported by Druid?

Thanks,

Michael

Hi Michael,

I think you can use an expression transform in the transformSpec. Please check http://druid.io/docs/latest/ingestion/transform-spec.html#expression-transform.
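For reference, a minimal transformSpec with a single expression transform looks roughly like this (the field names `duration` and `duration_long` are placeholders, not from your schema):

```json
{
  "transformSpec": {
    "transforms": [
      {
        "type": "expression",
        "name": "duration_long",
        "expression": "cast(duration, 'LONG')"
      }
    ]
  }
}
```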

Jihoon

Hello Jihoon,

thanks for taking the time to answer.
I'm ingesting data using Kafka ingestion, and the transformations don't seem to work. This is my supervisor spec:

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "cdrs-kafka-two",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "format": "iso",
          "column": "@timestamp"
        },
        "dimensionsSpec": {
          "dimensions": [
            "filedate",
            "cdr_comment",
            "incoming_trunk_grp_name",
            "outgoing_trunk_grp_name",
            "dest_type",
            "orig_type",
            "switch_description",
            "trunk_type",
            "dest_area_code",
            "incoming_contract_desc",
            "outgoing_contract_desc",
            "outgoing_customer_name",
            "incoming_customer_name",
            "release_desc",
            "cdr_day",
            "orig_mkt_id",
            "incoming_denver_trnkgrp_id",
            "outgoing_denver_trnkgrp_id",
            "incoming_denver_contract_id",
            "outgoing_denver_contract_id",
            "seizure_time",
            "answer_time",
            "disconnect_time",
            "completion_code",
            "in_rlsdir",
            "out_rlsdir",
            "out_wnp_id",
            "dest_mkt_id",
            "imt_status",
            "completion_swid",
            "completion_desc",
            "incoming_switch_trunk_id",
            "outgoing_switch_trunk_id",
            "calling_number",
            "outgoing_denver_customer_id",
            "incoming_denver_customer_id",
            "origin_country_name",
            "switch_id",
            "message",
            "in_wnp_id",
            "dest_country_name",
            "called_number",
            "event_order",
            "post_dial_delay",
            "rounded_duration",
            "isanr",
            { "name": "rounded_duration_long", "type": "long" },
            { "name": "post_dial_delay_long", "type": "long" },
            { "name": "isanr_long", "type": "long" }
          ]
        }
      }
    },
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    },
    "transformSpec": {
      "transforms": [
        {
          "type": "expression",
          "name": "rounded_duration_long",
          "expression": "cast(rounded_duration,'LONG')"
        },
        {
          "type": "expression",
          "name": "post_dial_delay_long",
          "expression": "cast(post_dial_delay,'LONG')"
        },
        {
          "type": "expression",
          "name": "isanr_long",
          "expression": "cast(isanr,'LONG')"
        }
      ]
    }
  },
  "ioConfig": {
    "topic": "cdr-one",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false,
    "httpTimeout": "PT59S"
  }
}
```

Michael

Hi Michael,

That looks to me like it should work. I'd try double-checking that you're using a version new enough to support ingest-time transforms (I think 0.12). I'd also check that the input data is actually castable to long (it basically needs to look like an integer: something that Long.parseLong could handle).
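As a rough sketch of the check Gian describes: Java's `Long.parseLong` accepts only an optionally-signed run of digits, so decimals, whitespace, or empty strings would fail, and the cast would then yield null. Here's a small Python approximation you could use to pre-screen raw values (the regex is my assumption about the accepted format, not something from Druid itself):

```python
import re

# Approximation of Java's Long.parseLong input format:
# an optional sign followed by digits only -- no decimal
# points, no surrounding whitespace, no separators.
LONG_PATTERN = re.compile(r"^[+-]?\d+$")

def castable_to_long(value):
    """Return True if cast(value, 'LONG') would likely succeed in Druid."""
    return isinstance(value, str) and bool(LONG_PATTERN.match(value))

# Screen some sample field values before blaming the transformSpec.
samples = ["1", "4", "-7", "4.0", " 4", "", "four"]
for s in samples:
    print(repr(s), castable_to_long(s))
```

Running this over a sample of your Kafka messages should quickly show whether the nulls come from unparseable input rather than from the spec.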

Hi Gian,

Thanks for answering. Yeah, the fields are pretty much "1" or "4"…

But I would like to stress that I'm talking about Kafka ingestion, not batch ingestion. When reading about the Kafka indexing service and its supervisor spec here: http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html

there's no "transformSpec" reference… So I'm starting to think that Druid 0.12.3 does not support transformations when consuming Kafka messages…

If I’m mistaken please correct me.

Michael

Hi Michael,

since dataSchema supports transformSpec, every task type that uses dataSchema supports transformation at ingestion time. Kafka should support it too.

Would you tell me what errors you’re seeing?

Jihoon

Hey Jihoon,

No errors, just null values on the fields I’ve created (e.g.: rounded_duration_long)…

since dataSchema supports transformSpec, every task type that uses dataSchema supports transformation at ingestion time. Kafka should support it too.

^^ that was my initial thought too …

Hmm, Michael, have you had a chance to check what the original values were before casting? Did the 1 or 4 come from that check? If so, what was their exact string representation?

Please note that they can be null if they are in an unsupported format. Since your dimensionsSpec has both 'rounded_duration' and 'rounded_duration_long', you can check exactly what the values were before casting.
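One way to run that check: push a couple of test messages whose raw values you control, then query both columns side by side. For example, records like these (hypothetical values, matching your schema's field names) should make the behavior obvious:

```json
{"@timestamp": "2018-10-25T00:00:00Z", "rounded_duration": "4"}
{"@timestamp": "2018-10-25T00:00:01Z", "rounded_duration": "4.0"}
```

The first should produce `rounded_duration_long = 4`; the second would likely produce null, since "4.0" is not an integer-formatted string.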

Jihoon

I came back with new findings: I made the exact same ingestion spec, this time for batch ingestion:

```json
{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "unrated-cdrs-quoted-index",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "format": "iso",
            "column": "@timestamp"
          },
          "dimensionsSpec": {
            "dimensions": [
              "filedate",
              "cdr_comment",
              "incoming_trunk_grp_name",
              "outgoing_trunk_grp_name",
              "dest_type",
              "orig_type",
              "switch_description",
              "trunk_type",
              "dest_area_code",
              "incoming_contract_desc",
              "outgoing_contract_desc",
              "outgoing_customer_name",
              "incoming_customer_name",
              "release_desc",
              "cdr_day",
              "orig_mkt_id",
              "incoming_denver_trnkgrp_id",
              "outgoing_denver_trnkgrp_id",
              "incoming_denver_contract_id",
              "outgoing_denver_contract_id",
              "seizure_time",
              "answer_time",
              "disconnect_time",
              "completion_code",
              "in_rlsdir",
              "out_rlsdir",
              "out_wnp_id",
              "dest_mkt_id",
              "imt_status",
              "completion_swid",
              "completion_desc",
              "incoming_switch_trunk_id",
              "outgoing_switch_trunk_id",
              "calling_number",
              "outgoing_denver_customer_id",
              "incoming_denver_customer_id",
              "origin_country_name",
              "switch_id",
              "message",
              "in_wnp_id",
              "dest_country_name",
              "called_number",
              "event_order",
              "post_dial_delay",
              "rounded_duration",
              "isanr",
              { "name": "rounded_duration_long", "type": "long" },
              { "name": "post_dial_delay_long", "type": "long" },
              { "name": "isanr_long", "type": "long" }
            ]
          }
        }
      },
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "intervals": ["2011-09-12/2019-09-13"],
        "rollup": false
      },
      "transformSpec": {
        "transforms": [
          {
            "type": "expression",
            "name": "rounded_duration_long",
            "expression": "cast(rounded_duration,'LONG')"
          },
          {
            "type": "expression",
            "name": "post_dial_delay_long",
            "expression": "cast(post_dial_delay,'LONG')"
          },
          {
            "type": "expression",
            "name": "isanr_long",
            "expression": "cast(isanr,'LONG')"
          }
        ]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "/home/mpdev/Workspace/Druid/RemoteDruid/data",
        "filter": "cdr-data_with_type.json"
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": 5000000,
      "maxRowsInMemory": 25000,
      "forceExtendableShardSpecs": true
    }
  }
}
```

which works perfectly and converts the values correctly…!

This makes me think that transformations during Kafka real-time ingestion are not available…

The fields with the _long suffix are added to the spec by me, because otherwise Druid won't create the new fields. Druid converts the value and puts it in that field.
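In other words, each transform output needs a matching entry in the dimensionsSpec to be stored; the pairing (excerpted from the spec above) looks like this:

```json
"dimensionsSpec": {
  "dimensions": [
    "rounded_duration",
    { "name": "rounded_duration_long", "type": "long" }
  ]
},
"transformSpec": {
  "transforms": [
    {
      "type": "expression",
      "name": "rounded_duration_long",
      "expression": "cast(rounded_duration,'LONG')"
    }
  ]
}
```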

Hi Michael,

thank you for the testing. It definitely sounds like a bug!

I raised https://github.com/apache/incubator-druid/issues/6534 and will further check it.

Thanks,

Jihoon

Michael,

I’ve tested two things.

  • 'KafkaIndexTaskTest.testRunWithTransformSpec()', after tweaking it to include a transformSpec with a cast

  • Running Kafka ingestion with a transformSpec with a cast.

And both worked for me.

I think there's something wrong in your supervisor spec… Please double-check it.

I've attached the supervisor spec I used. Hope this helps.

Jihoon

wikiticker-transform.json (2.09 KB)

Jihoon,

I can't thank you enough for putting the time into this.
I started rewriting my supervisor specs after reading yours, and now, to my surprise, they work fine.

I spent the whole day today trying to figure out what I was doing wrong, or whether this is some sort of erratic
behavior, but nothing came of it…

I'll keep at it over the following days; I hope I don't find anything and can assume I was wrong from the beginning.

Thank you again,

Michael

Thanks Michael,

good to hear that your issue is resolved.

Please let me know if you have further questions.

Best,

Jihoon