Hadoop-based batch ingestion gets a "java.lang.NullPointerException" error

Hi,
I have two questions about Hadoop-based batch ingestion.

**First,**

**error message:**
Exception running child : io.druid.java.util.common.RE: Failure on row[3b81dc53ba66\x10,,\xe0\xcf\xe97c3b7e4b519103	123	abc	false	2018-01-02 01:57:44]
	at io.druid.indexer.HadoopDruidIndexerMapper.map(HadoopDruidIndexerMapper.java:91)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:155)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:805)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Caused by: java.lang.NullPointerException
	at io.druid.query.aggregation.datasketches.theta.SketchAggregator.updateUnion(SketchAggregator.java:107)
	at io.druid.query.aggregation.datasketches.theta.SketchAggregator.aggregate(SketchAggregator.java:51)
	at io.druid.indexer.InputRowSerde.toBytes(InputRowSerde.java:98)
	at io.druid.indexer.IndexGeneratorJob$IndexGeneratorMapper.innerMap(IndexGeneratorJob.java:300)
	at io.druid.indexer.HadoopDruidIndexerMapper.map(HadoopDruidIndexerMapper.java:87)
	... 8 more

**data:**
3b81dc53ba66\x10**,,**\xe0\xcf\xe97c3b7e4b519103	123	abc	false	2018-01-02 01:57:44
3b81dc53ba66\x10**,,**\xe0\xcf\xe97c3b7e4b519103	456	ccc	false	2018-01-02 02:57:44
3b81dc53ba667c3b7e4b519103	456	ccc	true	2018-01-02 03:57:44
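
The first two rows have two consecutive commas inside the first column (uid). Here is a quick sketch of what I believe the listDelimiter split does to that value (my own test, not Druid code):

// Stand-in for the uid column of the failing rows (the real value has
// non-printable bytes around the commas).
String uid = "3b81dc53ba66,,7c3b7e4b519103";
String[] parts = uid.split(",", -1);
// parts = ["3b81dc53ba66", "", "7c3b7e4b519103"]
// The empty middle element presumably becomes a null entry in the parsed
// multi-value list, and that null later reaches SketchAggregator.updateUnion().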


**json file:**
{
    "type":"index_hadoop",
    "spec":{
        "ioConfig":{
            "type":"hadoop",
            "inputSpec":{
                "type":"static",
                "paths":".../AbtestExperimentExec.txt"
            }
        },
        "dataSchema":{
            "dataSource":"valid_llj",
            "granularitySpec":{
                "type":"uniform",
                "segmentGranularity":"day",
                "queryGranularity":"day",
                "intervals":[
                    "2018-01-02/2018-01-03"
                ]
            },
            "parser":{
                "type":"hadoopyString",
                "parseSpec":{
                    "format":"tsv",
                    **"listDelimiter":",",**
                    "dimensionsSpec":{
                        "dimensions":[
                            "rid",
                            "sdkver",
                            "sid"
                        ]
                    },
                    "columns":[
                        "uid",
                        "rid",
                        "sdkver",
                        "sid",
                        "acc_time"
                    ],
                    **"delimiter":"\t",**
                    "timestampSpec":{
                        "format":"yyyy-MM-dd HH:mm:ss",
                        "column":"acc_time"
                    }
                }
            },
            "metricsSpec":[
                {
                    "name":"r",
                    "type":"count"
                },
                {
                    "name":"m2",
                    "type":"thetaSketch",
                    "fieldName":"uid"
                }
            ]
        },
        "tuningConfig":{
            "type":"hadoop",
            "partitionsSpec":{
                "type":"hashed",
                "targetPartitionSize":5000000
            },
            "jobProperties":{
                "mapreduce.map.memory.mb":"1000",
                "mapreduce.reduce.memory.mb":"12000",
                "mapreduce.job.queuename":"druid",
                "mapreduce.job.classloader":true
            }
        }
    }
}


**code:**

static void updateUnion(Union union, Object update)
{
  if (update instanceof SketchHolder) {
    ((SketchHolder) update).updateUnion(union);
  } else if (update instanceof String) {
    union.update((String) update);
  } else if (update instanceof byte[]) {
    union.update((byte[]) update);
  } else if (update instanceof Double) {
    union.update(((Double) update));
  } else if (update instanceof Integer || update instanceof Long) {
    union.update(((Number) update).longValue());
  } else if (update instanceof int[]) {
    union.update((int[]) update);
  } else if (update instanceof long[]) {
    union.update((long[]) update);
  } else if (update instanceof List) {
    for (Object entry : (List) update) {
      union.update(entry.toString()); // **is a null check needed here???**
    }
  } else {
    throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
  }
}



Because the first column of the first two rows contains two consecutive commas, the listDelimiter split produces an empty value, and the parsed multi-value list ends up with a null entry. When updateUnion() iterates over that list, entry.toString() is called on the null entry and throws the NullPointerException. It looks like a null check is needed there.
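
A minimal sketch of the kind of guard I have in mind (my own suggestion, not the actual Druid code):

} else if (update instanceof List) {
  for (Object entry : (List) update) {
    if (entry != null) { // skip null entries produced by consecutive listDelimiters
      union.update(entry.toString());
    }
  }
}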


**Second,**

Is there a way to make listDelimiter apply only to dimension columns? In my spec the thetaSketch metric reads uid, and I do not want uid split on commas.


Thank you!