Sunday, October 13, 2013

Different strategies to monitor Storm Processor

This topic has come up multiple times on the Storm mailing list, so I figured it would be good to put some ideas together and share some hands-on experience on how to monitor Storm topologies.
We can group the monitoring metrics into these categories:

  • Independent metrics from each particular topology
  • JVM metrics
  • Storm metrics (e.g. tuples per minute)
  • Miscellaneous (when using Kafka Spouts)

Independent metrics from each particular topology

Each topology is different and is going to have different metrics to publish. This is no different from the way you would monitor your web application. The key here is how to ship application-specific metrics into a data collector for plotting and alerting. Just for completeness, it's worth mentioning one option.

  • Statsd directly (useful when you want to see aggregated data). Here you would just instrument your code to output specific metrics. Statsd will later on (normally every minute) aggregate the received data and write it to a backend such as Ganglia or Graphite.
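
To make the "instrument your code" step concrete, here is a minimal sketch of what sending metrics to Statsd can look like using only the JDK. Statsd accepts plain-text lines over UDP (`name:value|c` for counters, `name:value|ms` for timers). The class name, metric names, host and port below are placeholders for illustration, not something taken from our topologies.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Minimal Statsd sender sketch; host, port and metric names are placeholders.
class StatsdSketch {

    // Statsd counter line: "<name>:<delta>|c"
    static String counter(String name, long delta) {
        return name + ":" + delta + "|c";
    }

    // Statsd timer line: "<name>:<millis>|ms"
    static String timing(String name, long millis) {
        return name + ":" + millis + "|ms";
    }

    // Fire-and-forget UDP send; a lost packet simply means a lost sample.
    static void send(String host, int port, String line) throws IOException {
        byte[] payload = line.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length,
                    InetAddress.getByName(host), port));
        }
    }

    public static void main(String[] args) throws IOException {
        // 8125 is the conventional Statsd port; adjust for your setup.
        send("127.0.0.1", 8125, counter("mytopology.records.processed", 1));
        send("127.0.0.1", 8125, timing("mytopology.enrich.latency", 42));
    }
}
```

In a real topology you would probably reuse one socket per worker instead of opening one per metric, or use an existing Statsd client library.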


JVM metrics

This category groups information such as heap size, heap used and GC time, among others.

Something I have tried in the past was to co-locate JMXTrans on each Storm box and, via Chef, come up with the right ports to connect to. For example, suppose that on a particular box I set up two Storm workers (6700 and 6701) and enable JMX for each of them as follows (via the storm.yaml file):

-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.port=1%ID%

Note that "1%ID%" will be replaced with 16700 and 16701 respectively. This predictability lets me look up via Chef how many workers there are on each box (the workers are Chef attributes), co-locate the JMXTrans service on the same box and dynamically generate the JMXTrans config file for each worker.

Below is how I tell Chef how many workers I have in each machine.

default[:storm][:workers] = [6700, 6701]
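
The port mapping is simple enough to sketch in a few lines. This is only an illustration of the "1%ID%" substitution described above, written in Java to match the rest of the code in this post; in my setup the equivalent logic lives in the Chef recipe. The class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative helper: derives the JMX port of each worker from its worker port.
class JmxPorts {

    // Replaces %ID% in the template with the worker port, e.g. "1%ID%" + 6700 -> 16700.
    static int jmxPortFor(String template, int workerPort) {
        return Integer.parseInt(template.replace("%ID%", Integer.toString(workerPort)));
    }

    // Applies the template to every worker port configured on a box.
    static List<Integer> jmxPortsFor(String template, List<Integer> workerPorts) {
        List<Integer> jmxPorts = new ArrayList<Integer>();
        for (int workerPort : workerPorts) {
            jmxPorts.add(jmxPortFor(template, workerPort));
        }
        return jmxPorts;
    }

    public static void main(String[] args) {
        // Same worker list as the Chef attribute above.
        System.out.println(jmxPortsFor("1%ID%", Arrays.asList(6700, 6701)));
    }
}
```

Keep in mind that the resulting number still has to be a valid TCP port (at most 65535), which holds for the default 67xx worker ports.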

This is an example of a JMXTrans config file for extracting data from the JVM that writes to a Ganglia collector.

{
  "servers" : [ {
    "port" : "<%= @service_port%>",
    "host" : "127.0.0.1",
    "alias" : "<%= node.name%>:<%= node.name%>",
    "queries" : [ {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "jvmheapmemory",
          "port" : <%= node[:ganglia][:port]%>,
          "host" : "<%= node[:ganglia][:collector_host]%>"
        }
      } ],
      "obj" : "java.lang:type=Memory",
      "resultAlias": "<%= @service_name%>-heap",
      "attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ]
    }, {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "jvmcmsoldgen",
          "port" : <%= node[:ganglia][:port]%>,
          "host" : "<%= node[:ganglia][:collector_host]%>"
        }
      } ],
      "obj" : "java.lang:name=CMS Old Gen,type=MemoryPool",
      "resultAlias": "<%= @service_name%>-cmsoldgen",
      "attr" : [ "Usage" ]
    } ],
    "numQueryThreads" : 2
  } ]
}
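
If you want to check what the first query in that config actually reads, the same MBean and attribute can be inspected from inside any JVM with the standard JMX API. The sketch below is not how JMXTrans works internally (it connects remotely through the JMX port); it just reads `java.lang:type=Memory` / `HeapMemoryUsage` from the local platform MBean server. The class name is hypothetical.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

// Reads the same MBean attribute that the first JMXTrans query targets,
// but from the local JVM instead of over a remote JMX connection.
class HeapProbe {

    // key is one of "init", "used", "committed", "max" (fields of MemoryUsage).
    static long heapAttribute(String key) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName memory = new ObjectName("java.lang:type=Memory");
            CompositeData usage = (CompositeData) server.getAttribute(memory, "HeapMemoryUsage");
            return (Long) usage.get(key);
        } catch (Exception e) {
            // Wrap the checked JMX exceptions to keep the sketch short.
            throw new IllegalStateException("Could not read HeapMemoryUsage." + key, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("heap used      = " + heapAttribute("used"));
        System.out.println("heap committed = " + heapAttribute("committed"));
        System.out.println("heap max       = " + heapAttribute("max"));
    }
}
```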

Some notes about this approach:
  • When a worker is down, JMXTrans errors out, so I had to set up a cron job that restarts the JMXTrans service every 30 minutes to make sure I don't lose metrics.
  • High maintenance, since it requires keeping both the JMXTrans config and the Storm config in sync.


A second approach is to use Yammer Metrics JVM Instrumentation. For it to work you have to use a Reporting Writer to write the metrics to a backend collector. I use Ganglia and Statsd (the latter does not appear in the list). Here is an adapted version of a Statsd reporter where we suppress specific metrics (histograms), since Statsd already constructs histograms for us. Yammer Metrics only uses gauges to publish the data, so suppressing histograms doesn't seem to be a big deal.
The beauty of this is that you no longer have to keep the configs of two services in sync. As soon as Storm launches a topology it will start emitting JVM metrics.

This is a screenshot plotted with Graphite which is one of the backends statsd writes data to.

Storm Metrics

By "Storm Metrics" I mean metrics from the Storm framework itself, such as tuples per minute, latency, capacity, etc.
At the time of writing this post we (RelateIQ) are using Storm 0.8.3, which has some support for attaching hooks to components (spouts and bolts). Storm calls those hooks with information about the tuples that are being emitted, acked and failed. This is an example of a hook that writes those metrics to Yammer Metrics meters, which are sent out to Statsd (in our case) every minute.



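import java.util.concurrent.TimeUnit;

import backtype.storm.hooks.info.EmitInfo;
import backtype.storm.hooks.info.SpoutAckInfo;
import backtype.storm.hooks.info.SpoutFailInfo;
import backtype.storm.task.TopologyContext;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;

// Note: BaseMeteredHook is not shown in this post. It is assumed to be a
// Storm task hook base class that calls init(...) when the hook is prepared.
public class MeteredSpoutHook extends BaseMeteredHook {

    // One meter per event type; each reports a per-minute rate.
    private Meter emittedTuples;
    private Meter ackedTuples;
    private Meter failedTuples;

    public MeteredSpoutHook() {}

    // Called by Storm every time the spout emits a tuple.
    @Override
    public void emit(EmitInfo emitInfo) {
        emittedTuples.mark();
    }

    // Called by Storm when a tuple tree is fully acked.
    @Override
    public void spoutAck(SpoutAckInfo spoutAckInfo) {
        ackedTuples.mark();
        // TODO Do something with: spoutAckInfo.completeLatencyMs
    }

    // Called by Storm when a tuple fails or times out.
    @Override
    public void spoutFail(SpoutFailInfo spoutFailInfo) {
        failedTuples.mark();
        // TODO Do something with: spoutFailInfo.failLatencyMs
    }

    // Registers the meters under Topology-<name> / Spout_<label> / <metric>.
    @Override
    public void init(TopologyContext context, String topologyName, String label) {
        String group = "Topology-" + topologyName;
        String type = "Spout_" + label;

        MetricName emittedTuplesMetricName = new MetricName(group, type, "EmittedTuples");
        emittedTuples = Metrics.newMeter(emittedTuplesMetricName, "tuples", TimeUnit.MINUTES);

        MetricName ackedTuplesMetricName = new MetricName(group, type, "AckedTuples");
        ackedTuples = Metrics.newMeter(ackedTuplesMetricName, "tuples", TimeUnit.MINUTES);

        MetricName failedTuplesMetricName = new MetricName(group, type, "FailedTuples");
        failedTuples = Metrics.newMeter(failedTuplesMetricName, "tuples", TimeUnit.MINUTES);
    }

}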
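
The two TODOs in the hook leave the latency values unused. One possible way to use them, sketched here with JDK classes only, is to accumulate them in a small window object and drain it each time metrics are reported. `LatencyWindow` is a hypothetical helper, not part of Storm or Yammer Metrics, and it is not what we run in production; you could just as well send each latency to Statsd as a timer and let it do the aggregation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: collects latencies between two reporting ticks.
class LatencyWindow {

    private final AtomicLong count = new AtomicLong();
    private final AtomicLong totalMs = new AtomicLong();
    private final AtomicLong maxMs = new AtomicLong();

    // Would be called from spoutAck/spoutFail with the latency in milliseconds.
    void record(long latencyMs) {
        count.incrementAndGet();
        totalMs.addAndGet(latencyMs);
        // Compare-and-set loop so concurrent writers never lower the maximum.
        long seen = maxMs.get();
        while (latencyMs > seen && !maxMs.compareAndSet(seen, latencyMs)) {
            seen = maxMs.get();
        }
    }

    // Returns {count, mean in ms, max in ms} and resets the window.
    // The three counters are reset one by one, so under concurrent writes
    // the snapshot is approximate, which is usually fine for monitoring.
    long[] drain() {
        long n = count.getAndSet(0);
        long total = totalMs.getAndSet(0);
        long max = maxMs.getAndSet(0);
        long mean = (n == 0) ? 0 : total / n;
        return new long[] { n, mean, max };
    }

    public static void main(String[] args) {
        LatencyWindow window = new LatencyWindow();
        window.record(10);
        window.record(30);
        window.record(20);
        long[] snapshot = window.drain();
        System.out.println("count=" + snapshot[0] + " meanMs=" + snapshot[1] + " maxMs=" + snapshot[2]);
    }
}
```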

Miscellaneous

If you happen to be using Kafka spouts as one of your spout implementations, a project I found very useful is stormkafkamon (original and fork which fixes some bugs). It is intended to work in conjunction with the Kafka spout from storm-contrib. This spout stores a watermark in Zookeeper; stormkafkamon reads the latest offset from the Kafka broker and the spout's offset from Zookeeper, and shows the delta in a nicely formatted way.

+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+
|    Broker    |  Topic   | Partition |   Earliest   |    Latest    |    Depth    |   Spout  |   Current    | Delta |
+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+
| kafka-broker | TOPIC_1  |     0     | 17314225754  | 18492471767  |  1178246013 | TOPIC_1  | 18492470390  |  1377 |
| kafka-broker | TOPIC_2  |     0     | 85228601970  | 89208988484  |  3980386514 | TOPIC_2  | 89208987752  |  732  |
| kafka-broker | TOPIC_3  |     0     | 457686650224 | 484187159862 | 26500509638 | TOPIC_3  | 484187157164 |  2698 |
+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+

Number of brokers:       1
Number of partitions:    3
Total broker depth:      31659142165
Total delta:             4807


And with some bash script-fu you could add a cron job that runs it once a minute, parses the data and sends it to Statsd (cheers to my coworker Jón Grétarsson for writing this script).
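
The parsing step is small. The sketch below is not the bash script mentioned above; it is a hypothetical Java equivalent that assumes the table layout shown in this post (nine columns, with Topic second and Delta last) and turns each topic's delta into a Statsd gauge line. The class name and the "kafka.lag" metric prefix are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the stormkafkamon table shown above.
class KafkaLagParser {

    // Sample rows copied from the output in this post.
    static final String SAMPLE =
          "+--------------+----------+-----------+\n"
        + "|    Broker    |  Topic   | Partition |   Earliest   |    Latest    |    Depth    |   Spout  |   Current    | Delta |\n"
        + "| kafka-broker | TOPIC_1  |     0     | 17314225754  | 18492471767  |  1178246013 | TOPIC_1  | 18492470390  |  1377 |\n"
        + "| kafka-broker | TOPIC_2  |     0     | 85228601970  | 89208988484  |  3980386514 | TOPIC_2  | 89208987752  |  732  |\n"
        + "| kafka-broker | TOPIC_3  |     0     | 457686650224 | 484187159862 | 26500509638 | TOPIC_3  | 484187157164 |  2698 |\n"
        + "Total delta:             4807\n";

    // Sums the Delta column per topic; border, header and summary lines are skipped.
    static Map<String, Long> deltasByTopic(String report) {
        Map<String, Long> deltas = new LinkedHashMap<String, Long>();
        for (String line : report.split("\n")) {
            if (!line.startsWith("|")) {
                continue; // "+---+" borders and summary lines
            }
            String[] cells = line.split("\\|");
            if (cells.length < 10) {
                continue; // cells[0] is empty, cells[1..9] hold the nine columns
            }
            String topic = cells[2].trim();
            String delta = cells[9].trim();
            if (!delta.matches("\\d+")) {
                continue; // header row: the Delta cell is not a number
            }
            Long previous = deltas.get(topic);
            deltas.put(topic, (previous == null ? 0L : previous) + Long.parseLong(delta));
        }
        return deltas;
    }

    // Statsd gauge line: "<prefix>.<topic>:<value>|g"
    static String gaugeLine(String prefix, String topic, long delta) {
        return prefix + "." + topic + ":" + delta + "|g";
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Long> entry : deltasByTopic(SAMPLE).entrySet()) {
            System.out.println(gaugeLine("kafka.lag", entry.getKey(), entry.getValue()));
        }
    }
}
```

The printed lines can then be shipped to Statsd over UDP once a minute, which is all the cron job needs to do.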




If you feel like the explanation is not accurate, have specific questions or have any other feedback please shoot me an email to patricioe at gmail dot com.