RealtimeMetrics on indexing task

Hi all,

I’m trying to emit metrics from the indexing tasks. I’m using tranquility to send the tasks.

I tried setting this config in the middleManager properties file:

druid.indexer.fork.property.druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor", "io.druid.segment.realtime.RealtimeMetricsMonitor"]

But I got this error message when the task tried to start:

2015-05-04T10:02:24,183 INFO [main] io.druid.guice.JsonConfigurator - Loaded class[class io.druid.server.DruidNode] from props[druid.] as [DruidNode{serviceName='middleManager', host='rb8ccqeoh69v', port=7081}]
2015-05-04T10:02:24,188 INFO [main] io.druid.server.metrics.MetricsModule - Adding monitor[com.metamx.metrics.JvmMonitor@49bcb0d6]
Exception in thread "main" com.google.inject.CreationException: Guice creation errors:

  1. No implementation for java.util.List<io.druid.segment.realtime.FireDepartment> was bound.
    while locating java.util.List<io.druid.segment.realtime.FireDepartment>
    for parameter 0 at io.druid.segment.realtime.RealtimeMetricsMonitor.<init>(RealtimeMetricsMonitor.java:41)
    while locating io.druid.segment.realtime.RealtimeMetricsMonitor
    at io.druid.server.metrics.MetricsModule.getMonitorScheduler(MetricsModule.java:78)
    at io.druid.server.metrics.MetricsModule.getMonitorScheduler(MetricsModule.java:78)
    while locating com.metamx.metrics.MonitorScheduler
    at io.druid.server.metrics.MetricsModule.configure(MetricsModule.java:63)
    while locating com.metamx.metrics.MonitorScheduler annotated with @com.google.inject.name.Named(value=ForTheEagerness)
    1 error
    at com.google.inject.internal.Errors.throwCreationExceptionIfErrorsExist(Errors.java:448)
    at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:184)
    at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:110)
    at com.google.inject.Guice.createInjector(Guice.java:96)
    at com.google.inject.Guice.createInjector(Guice.java:73)
    at com.google.inject.Guice.createInjector(Guice.java:62)
    at io.druid.initialization.Initialization.makeInjectorWithModules(Initialization.java:369)
    at io.druid.cli.GuiceRunnable.makeInjector(GuiceRunnable.java:55)
    at io.druid.cli.CliPeon.run(CliPeon.java:207)
    at io.druid.cli.Main.main(Main.java:88)

Any ideas about how I could enable those metrics?

Thanks

Hi Carlos,
Can you share your task spec file for more info?

Hi Nishant,

I can’t give you the task spec file or the task JSON from the task log, since the task crashes before writing that data.

Below is the beam code that we use with tranquility. I hope it will be useful.

@Override
public Beam<Object> makeBeam(SystemStream stream, Config config)
{
    final int maxRows = Integer.parseInt(config.get("redborder.beam.flow.silver.maxrows", "200000"));
    final int partitions = Integer.parseInt(config.get("redborder.beam.flow.silver.partitions", "2"));
    final int replicas = Integer.parseInt(config.get("redborder.beam.flow.silver.replicas", "1"));
    final String intermediatePersist = config.get("redborder.beam.flow.silver.intermediatePersist", "PT20m");
    final String zkConnect = config.get("systems.kafka.consumer.zookeeper.connect");
    final String dataSource = stream.getStream();

    final List<String> dimensions = ImmutableList.of(
            APPLICATION_ID_NAME, BITFLOW_DIRECTION, CONVERSATION, DIRECTION,
            ENGINE_ID_NAME, HTTP_USER_AGENT_OS, HTTP_HOST, HTTP_SOCIAL_MEDIA,
            HTTP_SOCIAL_USER, HTTP_REFER_L1, L4_PROTO, IP_PROTOCOL_VERSION,
            SENSOR_NAME, SENSOR_IP, SCATTERPLOT, SRC_IP, SRC_COUNTRY_CODE, SRC_NET_NAME,
            SRC_PORT, SRC_AS_NAME, CLIENT_MAC, CLIENT_ID, CLIENT_MAC_VENDOR,
            DOT11STATUS, SRC_VLAN, SRC_MAP, SRV_PORT, DST_IP,
            DST_COUNTRY_CODE, DST_NET_NAME, DST_AS_NAME, DST_PORT,
            DST_VLAN, DST_MAP, INPUT_SNMP, OUTPUT_SNMP, TOS,
            CLIENT_LATLNG, COORDINATES_MAP, CLIENT_CAMPUS,
            CLIENT_BUILDING, CLIENT_FLOOR, WIRELESS_ID, CLIENT_RSSI, CLIENT_RSSI_NUM,
            CLIENT_SNR, CLIENT_SNR_NUM, WIRELESS_STATION, HNBLOCATION, HNBGEOLOCATION, RAT,
            DARKLIST_SCORE_NAME, DARKLIST_CATEGORY, DARKLIST_PROTOCOL,
            DARKLIST_DIRECTION, DARKLIST_SCORE);

    final List<AggregatorFactory> aggregators = ImmutableList.of(
            new CountAggregatorFactory(EVENTS_AGGREGATOR),
            new LongSumAggregatorFactory(SUM_BYTES_AGGREGATOR, BYTES),
            new LongSumAggregatorFactory(SUM_PKTS_AGGREGATOR, PKTS),
            new LongSumAggregatorFactory(SUM_RSSI_AGGREGATOR, CLIENT_RSSI_NUM),
            new LongSumAggregatorFactory(SUM_DL_SCORE_AGGREGATOR, DARKLIST_SCORE),
            new HyperUniquesAggregatorFactory(CLIENTS_AGGREGATOR, CLIENT_MAC),
            new HyperUniquesAggregatorFactory(WIRELESS_STATIONS_AGGREGATOR, WIRELESS_STATION)
    );

    // The Timestamper should return the timestamp of the class your Samza task produces. Samza envelopes contain
    // Objects, so you'll generally have to cast them here.
    final Timestamper<Object> timestamper = new Timestamper<Object>()
    {
        @Override
        public DateTime timestamp(Object obj)
        {
            final Map<String, Object> theMap = (Map<String, Object>) obj;
            // TIMESTAMP holds POSIX seconds; DateTime expects epoch milliseconds.
            final long seconds = Long.parseLong(theMap.get(TIMESTAMP).toString());
            return new DateTime(seconds * 1000L);
        }
    };

    final CuratorFramework curator = CuratorFrameworkFactory.builder()
            .connectString(zkConnect)
            .retryPolicy(new ExponentialBackoffRetry(500, 15, 10000))
            .build();

    curator.start();

    return DruidBeams
            .builder(timestamper)
            .curator(curator)
            .discoveryPath("/druid/discoveryPath")
            .location(DruidLocation.create("overlord", "druid:local:firehose:%s", dataSource))
            .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularity.MINUTE))
            .druidTuning(DruidTuning.create(maxRows, new Period(intermediatePersist), 0))
            .tuning(ClusteredBeamTuning.builder()
                    .partitions(partitions)
                    .replicants(replicas)
                    .segmentGranularity(Granularity.HOUR)
                    .warmingPeriod(new Period("PT5M"))
                    .windowPeriod(new Period("PT15M"))
                    .build())
            .timestampSpec(new TimestampSpec(TIMESTAMP, "posix"))
            .buildBeam();
}

Thanks

You don’t need to include the RealtimeMetricsMonitor in your “monitors” when you’re using the indexing service. It’s included automatically for realtime tasks. So, if you take it out of that list, things should work OK.
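
For reference, the adjusted line in the middleManager properties file would look roughly like this, assuming JvmMonitor is the only other monitor you want on the peons (it is the same property you posted, with the RealtimeMetricsMonitor entry dropped):

```
druid.indexer.fork.property.druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
```

The realtime metrics should still show up for the task, since that monitor is added automatically for realtime tasks as described above.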

You’re right, I took it out of the list and everything worked OK.
It seems like I didn’t wait long enough to see the metrics so I thought it was a configuration issue.

My fault then. Anyway, thanks for the help!