Need help with data ingestion from S3 and local files

Hi


I am trying to ingest data both from an S3 bucket and from the local filesystem (separately, using different ingestion config
files and loading into different data sources). I am using druid-0.12.0.

I have the following set up in common.runtime.properties in both the conf/druid/_common and conf-quickstart/druid/_common
directories, and I have druid-s3-extensions in the extensions loadList.
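(For reference, the loadList entry looks something like the line below; other extensions in my setup are omitted, so adjust it to yours.)

druid.extensions.loadList=["druid-s3-extensions"]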

For S3:

druid.storage.type=s3
druid.storage.bucket=
druid.storage.baseKey=druid/segments
druid.s3.accessKey=
druid.s3.secretKey=

With the above in place, my S3 data gets ingested properly, but I get the exception below when I try reading data from
the local filesystem.

java.lang.Exception: java.io.IOException: No FileSystem for scheme: s3n
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) ~[hadoop-mapreduce-client-common-2.7.3.jar:?]
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529) [hadoop-mapreduce-client-common-2.7.3.jar:?]
Caused by: java.io.IOException: No FileSystem for scheme: s3n
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667) ~[hadoop-common-2.7.3.jar:?]

When I comment out the S3 section above in both common.runtime.properties files, ingestion from the local filesystem works as expected.
Is this how it is supposed to be? Is there a way to have both work without making any changes to the
common.runtime.properties files?

My S3 file ingestion config file looks like -

"ioConfig" : {
    "type" : "index",
    "firehose" : {
        "type" : "static-s3",
        "uris" : [
            "s3:///",
            "s3:///",
            "s3:///"
        ],
        "prefixes" : []
    },
    "appendToExisting" : true
}

My local file ingestion config file looks like this -

"ioConfig" : {
    "type" : "hadoop",
    "inputSpec" : {
        "type" : "multi",
        "children": [
            {
                "type" : "static",
                "paths" : "<file>"
            },
            {
                "type" : "static",
                "paths" : "<file>"
            },
            {
                "type" : "static",
                "paths" : "<file>"
            }
        ]
    }
}

Can someone please let me know what I am doing wrong?

Thank you

Hi,

This error might occur if you're using the Hadoop index task and the proper configurations are not set in the jobProperties of your task spec. If you want to load data from your local file system, you can simply use the index task (http://druid.io/docs/latest/ingestion/tasks.html#index-task) as you did for S3. One difference is that you should use the localFirehose (http://druid.io/docs/latest/ingestion/firehose.html#localfirehose) instead of the StaticS3Firehose.
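(If you do need to keep the Hadoop index task while S3 deep storage stays configured, something like the jobProperties below might work. This is only a sketch I haven't verified: it assumes the hadoop-aws classes are on the task classpath, the property names can differ across Hadoop versions, and the <...> values are placeholders.)

"tuningConfig" : {
    "type" : "hadoop",
    "jobProperties" : {
        "fs.s3n.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "fs.s3n.awsAccessKeyId" : "<your-access-key>",
        "fs.s3n.awsSecretAccessKey" : "<your-secret-key>"
    }
}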

Best,

Jihoon

On Thu, May 3, 2018 at 2:27 PM, Sameer Paradkar <learn.samparadkar@gmail.com> wrote:

Thanks a lot for the reply.

But my problem is that I am trying to do delta ingestion (I want the data in those three files to be appended, not overwritten, in the datasource, since they all might have similar timestamps), so I need the "multi" type. When I try changing the inputSpec to a firehose, I get the error below.

curl -X 'POST' -H 'Content-Type:application/json' -d @load_files.json localhost:8090/druid/indexer/v1/task

{"error":"Could not resolve type id 'multi' into a subtype of [simple type, class io.druid.data.input.FirehoseFactory]\n at [Source: HttpInputOverHTTP@6e3ef583[c=4612,q=1,[0]=EOF,s=STREAM]; line: 1, column: 124]"}

When I change the type from "multi" to "local", the curl command runs fine, but the task throws the exception below while ingesting:

2018-05-04T00:39:16,002 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[IndexTask{id=index_04132018_S3_2018-05-04T00:39:10.878Z, type=index, dataSource=04132018_S3}]
java.lang.NullPointerException
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:213) ~[guava-16.0.1.jar:?]
	at io.druid.segment.realtime.firehose.LocalFirehoseFactory.initObjects(LocalFirehoseFactory.java:83) ~[druid-server-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.AbstractTextFilesFirehoseFactory.connect(AbstractTextFilesFirehoseFactory.java:57) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.AbstractTextFilesFirehoseFactory.connect(AbstractTextFilesFirehoseFactory.java:46) ~[druid-api-0.12.0.jar:0.12.0]

How should the config that reads local file(s) look if I have to do delta ingestion? I would really appreciate your help.

Thank you.

The 'index' task also supports appending. Please check the appendToExisting option.

The NPE below is caused by the missing 'baseDir' configuration.
You might want an ioConfig like this:

"ioConfig" : {
    "type" : "index",
    "firehose" : {
        "type" : "local",
        "baseDir" : "/path/to/your/data/dir",
        "filter" : "regex-to-filter-data"
    },
    "appendToExisting" : true
}

Jihoon

On Thu, May 3, 2018 at 5:41 PM, Sameer Paradkar <learn.samparadkar@gmail.com> wrote:

Thanks a lot Jihoon.
I changed my config as you said, and I now see the data getting loaded properly without any errors and getting appended instead of overwritten.

Thanks a lot again.

Glad to hear your issue has been resolved!

Jihoon

On Thu, May 3, 2018 at 6:39 PM, Sameer Paradkar <learn.samparadkar@gmail.com> wrote:

Hi Jihoon - I have one more question - can I use the local firehose you suggested for Avro files?

I mean, is the config below valid? It is very important for me to use appendToExisting : true, so I used the firehose instead of an inputSpec with type "static", an inputFormat, and a path. With this I am getting the exception below. Can you please suggest what I am doing wrong?

"ioConfig" : {
    "type" : "index",
    "firehose" : {
        "type": "local",
        "inputFormat": "io.druid.data.input.avro.AvroValueInputFormat",
        "baseDir" : "<directory>",
        "filter" : "<someFile>.avro"
    },
    "appendToExisting" : true
}
......
......

"tuningConfig" : {
    "type" : "index",
    "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
    },
    "jobProperties" : {
        "avro.schema.input.value.path" : "/<sameSchemaFile_Used_To_Convert_Parquet_To_Avro>.avsc"
    }
}

The exception I am getting is -

java.lang.UnsupportedOperationException: makeParser not supported
	at io.druid.data.input.avro.AvroParseSpec.makeParser(AvroParseSpec.java:64) ~[?:?]
	at io.druid.data.input.impl.StringInputRowParser.initializeParser(StringInputRowParser.java:135) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.StringInputRowParser.startFileFromBeginning(StringInputRowParser.java:141) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.FileIteratingFirehose.getNextLineIterator(FileIteratingFirehose.java:91) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.FileIteratingFirehose.hasMore(FileIteratingFirehose.java:67) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.indexing.common.task.IndexTask.generateAndPublishSegments(IndexTask.java:660) ~[druid-indexing-service-0.12.0.jar:0.12.0]