Realtime node holding on to segments

My realtime nodes are holding on to their segments until the disk is full. After having worked with Druid for the past 2+ months, I know I’m probably missing a setting or misunderstanding one :slight_smile:

The segments are getting pushed to HDFS after 10 minutes, which I would expect:

"intermediatePersistPeriod": "PT10m",

"windowPeriod": "PT10m",

Any thoughts?

========== spec file ===========

[
  {
    "dataSchema" : {
      "dataSource" : "annotated_flow",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "csv",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "columns": ["timestamp", "src_addr", "src_v6_addr", "src_port", "src_vlan", "src_match_flags", "src_arbor_locid", "src_origin_asn", "src_peer_asn", "src_aspath_gid", "src_communities", "src_num_communities", "src_nexthop_addr", "src_asadjacency_ases", "src_pfroute", "src_external_ases", "src_num_external_ases", "src_external_as_offset", "dst_addr", "dst_v6_addr", "dst_port", "dst_vlan", "dst_match_flags", "dst_arbor_locid", "dst_origin_asn", "dst_peer_asn", "dst_aspath_gid", "dst_communities", "dst_num_communities", "dst_nexthop_addr", "dst_asadjacency_ases", "dst_pfroute", "dst_external_ases", "dst_num_external_ases", "dst_external_as_offset", "device_type", "router", "start_tv_sec", "start_tv_usec", "duration_tv_sec", "duration_tv_usec", "class_flags", "sampling_rate", "scale", "missing", "next_hop", "input_iface", "output_iface", "bin_port_flag", "ip_ver", "tcp_flags", "ip_proto", "tos", "ttl", "mpls_label", "num_mpls_label", "mpls_pe_addr", "v9_set", "v9_set_arbor", "router_gid", "appid", "http_status_codes", "sip_invites", "retransmit_percent", "out_of_order_percent", "rtt", "window_size", "missing_percent", "delay_variation", "dns_request_flags", "matched_blobs_0_gid", "matched_blobs_0_flags", "num_matched_blobs", "input_iface_gid", "output_iface_gid", "tms_port_gid", "tms_vlan_gid", "pkts_sent", "bytes_sent", "unscaled_pkts_sent", "unscaled_bytes_sent", "avg_pkt_len"],
          "dimensionsSpec" : {
            "dimensions": ["src_addr", "src_v6_addr", "src_port", "src_vlan", "src_match_flags", "src_arbor_locid", "src_origin_asn", "src_peer_asn", "src_aspath_gid", "src_communities", "src_num_communities", "src_nexthop_addr", "src_asadjacency_ases", "src_pfroute", "src_external_ases", "src_num_external_ases", "src_external_as_offset", "dst_addr", "dst_v6_addr", "dst_port", "dst_vlan", "dst_match_flags", "dst_arbor_locid", "dst_origin_asn", "dst_peer_asn", "dst_aspath_gid", "dst_communities", "dst_num_communities", "dst_nexthop_addr", "dst_asadjacency_ases", "dst_pfroute", "dst_external_ases", "dst_num_external_ases", "dst_external_as_offset", "device_type", "router", "start_tv_sec", "start_tv_usec", "duration_tv_sec", "duration_tv_usec", "class_flags", "sampling_rate", "scale", "missing", "next_hop", "input_iface", "output_iface", "bin_port_flag", "ip_ver", "tcp_flags", "ip_proto", "tos", "ttl", "mpls_label", "num_mpls_label", "mpls_pe_addr", "v9_set", "v9_set_arbor", "router_gid", "appid", "http_status_codes", "sip_invites", "retransmit_percent", "out_of_order_percent", "rtt", "window_size", "missing_percent", "delay_variation", "dns_request_flags", "matched_blobs_0_gid", "matched_blobs_0_flags", "num_matched_blobs", "input_iface_gid", "output_iface_gid", "tms_port_gid", "tms_vlan_gid"],
            "dimensionExclusions" : ["pkts_sent", "bytes_sent", "unscaled_pkts_sent", "unscaled_bytes_sent", "avg_pkt_len"],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "longSum",
        "name" : "pkts_sent",
        "fieldName" : "pkts_sent"
      }, {
        "type" : "longSum",
        "name" : "bytes_sent",
        "fieldName" : "bytes_sent"
      }, {
        "type" : "longSum",
        "name" : "unscaled_pkts_sent",
        "fieldName" : "unscaled_pkts_sent"
      }, {
        "type" : "longSum",
        "name" : "unscaled_bytes_sent",
        "fieldName" : "unscaled_bytes_sent"
      }, {
        "type" : "longSum",
        "name" : "avg_pkt_len",
        "fieldName" : "avg_pkt_len"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "MINUTE",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "10.8.30.98:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "consumer.id": "druid05",
          "client.id": "druid05",
          "group.id": "druid-example",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "annotated_flow"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 200000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT10m",
      "basePersistDirectory": "/tmp/realtime/basePersist",
      "rejectionPolicy": {
        "type": "messageTime"
      },
      "shardSpec": {
        "type": "linear",
        "partitionNum": 0
      }
    }
  }
]

====== realtime config ==========

Druid - a distributed column store.

Copyright 2012 - 2015 Metamarkets Group Inc.

Hi Ron, are there any interesting logs in your coordinator node? One common reason why handoff stops is that the historicals are full and cannot load any more segments.

Ah, there it is. I didn’t have historical nodes running (doing a weird experiment), so there are lots of these in the log:

2015-07-27T06:04:57,639 WARN [Coordinator-Exec--0] io.druid.server.coordinator.rules.LoadRule - Not enough [_default_tier] servers or node capacity to assign segment[annotated_flow_2015-07-25T21:38:00.000Z_2015-07-25T21:39:00.000Z_2015-07-25T21:38:00.000Z_1]! Expected Replicants[2]

I’m assuming that’s it at least - do you agree?

It’s like you guys know druid well or something :wink:

Thanks!

Ron
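For anyone hitting the same warning: the "Expected Replicants[2]" part comes from the load rule in effect, which is asking for two copies of each segment in _default_tier. If you only want to run a single historical for an experiment like this, one option is to drop the replica count to one. A hedged sketch of such a rule (assuming the tier name from the log and the coordinator rules endpoint at /druid/coordinator/v1/rules/annotated_flow; illustrative, not a production recommendation):

[
  {
    "type": "loadForever",
    "tieredReplicants": {
      "_default_tier": 1
    }
  }
]

Handoff still needs at least one historical with free capacity, so a rule change alone would not have fixed the original problem.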

Followup question: Why do some of my queries time out after 5 minutes?

{
  "error" : "Query timeout"
}

I don’t have a timeout set…

Ron, the default query timeout is 5 minutes, which also matches the idle timeout on http connections.
Druid’s main use case is to support interactive queries, where anything that takes longer than a few minutes rarely makes sense.

If you need to adjust the default timeout, you can do so by changing the following property, e.g. to use 20min:

druid.server.http.maxIdleTime=PT20M
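If you’d rather not raise the node-wide idle timeout, a timeout can also be set per query through the query context, in milliseconds. A minimal sketch, assuming a simple timeseries query against the annotated_flow datasource (the interval and aggregator here are just illustrative) with a 20-minute limit:

{
  "queryType": "timeseries",
  "dataSource": "annotated_flow",
  "granularity": "all",
  "intervals": ["2015-07-25/2015-07-26"],
  "aggregations": [
    { "type": "longSum", "name": "bytes_sent", "fieldName": "bytes_sent" }
  ],
  "context": { "timeout": 1200000 }
}

Here 1200000 ms = 20 minutes, and the context timeout applies only to that query rather than to every connection on the node.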

Hmmm. You say Druid’s main use case is interactive queries - do you think it is a poor choice for longer queries? While we want interactive queries, we also have usage cases that could easily require longer ones (we’re looking at ingesting 200k+ events per second and holding on to the data on the order of months). Our customers would be running monthly reports, and don’t mind waiting many minutes for responses. Of course, we’re on the early side of development/experimentation and so don’t exactly know the end product yet.

Thanks for the response!

Ron

Hi Ron,

Druid’s main value-add is currently fast, interactive queries. Result sets for this use case tend to be aggregated and much smaller than the input set. If your result set is close to your input set in size (billions of rows), you can consider using Hadoop, as no system will be able to do that query interactively. That said, Druid is often used for reporting use cases (with pagination, where each result is a few hundred thousand rows at most), and we will add more features in the future for better support of traditional reporting.

The result set will rarely be on the order of the input set (there is one potential side case where it might be, but we can break that into sub-queries if needed).

Mostly it will be TopN queries and aggregation. However, like I said, querying for TopN over dozens of TBs of (uncompressed) data is a main usage case ("reports").

I was just making sure Druid doesn’t break down if you are doing huge queries. Typically there is only one query running at any time (one user).

Ron

It’s definitely possible for Druid to do interactive queries on months of 200k/sec data, with the right configuration and hardware. If you want to shoot for that, then minutes-long queries may not be something you need to worry about at all.

In general the guidance I would follow for “big” queries is:

  1. Don’t worry about how much data is being scanned; it’s ok to scan a lot of data for a single query.

  2. Don’t worry too much about how long the query takes. You may have to increase the query timeout, and if you do a lot of concurrent slow queries you may need to use a dedicated broker and leverage query priorities, but long-running queries are OK with appropriate configuration.

  3. DO worry about the size of the result set. Druid is not currently a good solution for single queries with very large result sets (more than a few hundred thousand rows per query). One common workaround for that is to split your single large query into many smaller queries. This could be done through filters, or using different intervals, or using topNs with the "Lexicographic TopNMetricSpec" (see http://druid.io/docs/latest/querying/topnmetricspec.html and the sketch below).
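To make point 3 concrete, here is a hedged sketch of what one "page" of a paginated topN might look like; the dimension, interval, and aggregator are illustrative, and previousStop would be the last dimension value returned by the previous page (omit it for the first page):

{
  "queryType": "topN",
  "dataSource": "annotated_flow",
  "dimension": "src_addr",
  "threshold": 1000,
  "metric": {
    "type": "lexicographic",
    "previousStop": "<last src_addr from the previous page>"
  },
  "granularity": "all",
  "intervals": ["2015-07-01/2015-08-01"],
  "aggregations": [
    { "type": "longSum", "name": "bytes_sent", "fieldName": "bytes_sent" }
  ]
}

Each page returns up to the threshold number of rows, walking the dimension values in lexicographic order.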

Ok, that sounds great. Well within our usage case. Right now I’m bumping up against the timeout because I’m doing a proof of concept on limited hardware, just to show the decision makers that Druid can do what we need it to do.

Thanks again!

Ron

Great to hear. Good luck with your POC!