Sunday, October 13, 2013

Different strategies to monitor Storm Processor

This topic has come up multiple times on the Storm mailing list, so I figured it would be good to put some ideas together and share some hands-on experience on how to monitor Storm topologies.
We can group the monitoring metrics into these categories.

  • Independent metrics from each particular topology
  • JVM metrics
  • Storm metrics (e.g., tuples per minute)
  • Miscellaneous (when using Kafka Spouts)

Independent metrics from each particular topology

Each topology is different and is going to have different metrics to publish. This is no different from the way you would monitor your web application. The key here is how to ship application-specific metrics into a data collector for plotting and alerting. Just for completeness, it's worth mentioning some options.

  • Statsd directly (useful when you want to see aggregated data). Here you would just instrument your code to output specific metrics. Statsd will later on (normally every minute) aggregate the received data and write it to a backend such as Ganglia or Graphite.
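
To make "instrument your code" concrete, here is a minimal sketch of emitting a statsd metric from Java over UDP. It is not taken from any of our topologies: the class name, the metric name, and the host are assumptions for illustration, and 8125 is only the conventional statsd port.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Minimal statsd client sketch. Host, port and metric names are assumptions.
class StatsdSketch {

    // Formats a counter increment in the statsd line protocol: <name>:<value>|c
    static String counter(String name, long delta) {
        return name + ":" + delta + "|c";
    }

    // Formats a timing sample in milliseconds: <name>:<value>|ms
    static String timing(String name, long millis) {
        return name + ":" + millis + "|ms";
    }

    // Sends one metric line as a single UDP datagram (fire and forget).
    static void send(String host, int port, String line) throws Exception {
        byte[] payload = line.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length,
                    InetAddress.getByName(host), port));
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical metric name; call something like this from a bolt's execute().
        send("127.0.0.1", 8125, counter("mytopology.mybolt.processed", 1));
    }
}
```

Because UDP is fire and forget, a missing statsd daemon does not slow down or break the topology; the metric is simply dropped.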


JVM metrics

This category groups information such as heap size, heap used, and GC time, among others.
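
As a rough illustration of where these numbers come from, the sketch below reads heap usage and accumulated GC time from inside a JVM using only the JDK's `java.lang.management` API. The class and the metric key names are made up for this example; they are not what JMXTrans or Yammer Metrics emit.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: take a snapshot of basic JVM metrics from the platform MBeans.
class JvmMetricsSketch {

    static Map<String, Long> snapshot() {
        Map<String, Long> metrics = new LinkedHashMap<>();

        // Same data the "java.lang:type=Memory" MBean exposes as HeapMemoryUsage.
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        metrics.put("heap.used", heap.getUsed());
        metrics.put("heap.committed", heap.getCommitted());
        metrics.put("heap.max", heap.getMax()); // -1 when the maximum is undefined

        // Sum the accumulated collection time over all collectors.
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 when the collector does not report it
            if (t > 0) {
                gcMillis += t;
            }
        }
        metrics.put("gc.time.ms", gcMillis);
        return metrics;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Long> e : snapshot().entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```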

Something I have tried in the past was to co-locate JMXTrans on each Storm box and, via Chef, come up with the right ports to connect to. For example, if on a particular box I set up two Storm workers (6700 and 6701) and enable JMX for each of them as follows (via the storm.yaml file):

-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.port=1%ID%

Note that "1%ID%" will be replaced with 16700 and 16701 respectively. This predictability lets me search via Chef for how many workers there are on each box (the workers are Chef attributes), co-locate the JMXTrans service on the same box, and dynamically set up the JMXTrans config file for each worker.
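
The port mapping is just string concatenation: a literal "1" in front of the worker port. A small sketch of that rule, with a hypothetical helper class that is not part of Storm, Chef, or JMXTrans:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "1%ID%" convention: JMX port = "1" followed by the worker port.
class JmxPortSketch {

    static int jmxPortFor(int workerPort) {
        // Note: this only yields a valid TCP port (at most 65535) for
        // four-digit worker ports such as 6700.
        return Integer.parseInt("1" + workerPort);
    }

    static List<Integer> jmxPortsFor(List<Integer> workerPorts) {
        List<Integer> jmxPorts = new ArrayList<>();
        for (int workerPort : workerPorts) {
            jmxPorts.add(jmxPortFor(workerPort));
        }
        return jmxPorts;
    }
}
```

Anything that knows the list of worker ports can derive the JMX ports the same way, which is what makes it possible to generate one JMXTrans config per worker.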

Below is how I tell Chef how many workers I have in each machine.

default[:storm][:workers] = [6700, 6701]

This is an example of a JMXTrans config file for extracting data from the JVM that writes to a Ganglia collector.

{
  "servers" : [ {
    "port" : "<%= @service_port%>",
    "host" : "127.0.0.1",
    "alias" : "<%= node.name%>:<%= node.name%>",
    "queries" : [ {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "jvmheapmemory",
          "port" : <%= node[:ganglia][:port]%>,
          "host" : "<%= node[:ganglia][:collector_host]%>"
        }
      } ],
      "obj" : "java.lang:type=Memory",
      "resultAlias": "<%= @service_name%>-heap",
      "attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ]
    }, {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "jvmcmsoldgen",
          "port" : <%= node[:ganglia][:port]%>,
          "host" : "<%= node[:ganglia][:collector_host]%>"
        }
      } ],
      "obj" : "java.lang:name=CMS Old Gen,type=MemoryPool",
      "resultAlias": "<%= @service_name%>-cmsoldgen",
      "attr" : [ "Usage" ]
    } ],
    "numQueryThreads" : 2
  } ]
}

Some notes about this approach:
  • When a worker is down, JMXTrans errors out so I had to set up a cron job that restarts JMXTrans service every 30 minutes to make sure I don't lose metrics.
  • High maintenance since it requires keeping both JMXTrans config and storm config in sync.


A second approach is to use Yammer Metrics JVM instrumentation. For it to work you have to use a reporter to write the metrics to a backend collector. I use Ganglia and Statsd (which does not appear in the list). There is an adapted version of a Statsd reporter in which we suppress specific metrics (histograms), since statsd already constructs histograms for us. Yammer Metrics only uses gauges to publish the data, so suppressing histograms doesn't seem to be a big deal.
The beauty of this is that you no longer have to keep two services' configs in sync. As soon as Storm launches a topology it will start emitting JVM metrics.

This is a screenshot plotted with Graphite which is one of the backends statsd writes data to.

Storm Metrics

By "Storm metrics" I mean metrics from the Storm framework itself, such as tuples per minute, latency, capacity, etc.
At the time of writing this post we (RelateIQ) are using Storm 0.8.3, which has some support for attaching hooks to components (spouts and bolts); Storm calls those hooks with information about the tuples that are being emitted, acked and failed. This is an example of a hook that writes those metrics to Yammer Metrics meters, which are sent out to Statsd (in our case) every minute.



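import java.util.concurrent.TimeUnit;

import backtype.storm.hooks.info.EmitInfo;
import backtype.storm.hooks.info.SpoutAckInfo;
import backtype.storm.hooks.info.SpoutFailInfo;
import backtype.storm.task.TopologyContext;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;

// Note: BaseMeteredHook is not part of Storm and is not shown in this post.
public class MeteredSpoutHook extends BaseMeteredHook {

 // One meter per tuple event; all three are created in init().
 private Meter emittedTuples;
 private Meter ackedTuples;
 private Meter failedTuples;

 public MeteredSpoutHook() {}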

 @Override
 public void emit(EmitInfo emitInfo) {
  emittedTuples.mark();
 }

 @Override
 public void spoutAck(SpoutAckInfo spoutAckInfo) {
  ackedTuples.mark();
  // TODO Do something with: spoutAckInfo.completeLatencyMs
 }

 @Override
 public void spoutFail(SpoutFailInfo spoutFailInfo) {
  failedTuples.mark();
  // TODO Do something with:  spoutFailInfo.failLatencyMs
 }

 @Override
 public void init(TopologyContext context, String topologyName, String label) {

  MetricName emittedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "EmittedTuples");
  emittedTuples = Metrics.newMeter(emittedTuplesMetricName, "tuples", TimeUnit.MINUTES);

  MetricName ackedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "AckedTuples");
  ackedTuples = Metrics.newMeter(ackedTuplesMetricName, "tuples", TimeUnit.MINUTES);

  MetricName failedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "FailedTuples");
  failedTuples = Metrics.newMeter(failedTuplesMetricName, "tuples", TimeUnit.MINUTES);
 }

}

Miscellaneous

If you happen to be using Kafka spouts as one of your spout implementations, a project I found very useful is stormkafkamon (original, and a fork that fixes some bugs). It is intended to work in conjunction with the Kafka spout from storm-contrib. This spout stores a watermark in ZooKeeper; stormkafkamon reads the latest offset from the Kafka broker and the spout's offset from ZooKeeper and shows the delta in a nicely formatted way.

+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+
|    Broker    |  Topic   | Partition |   Earliest   |    Latest    |    Depth    |   Spout  |   Current    | Delta |
+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+
| kafka-broker | TOPIC_1  |     0     | 17314225754  | 18492471767  |  1178246013 | TOPIC_1  | 18492470390  |  1377 |
| kafka-broker | TOPIC_2  |     0     | 85228601970  | 89208988484  |  3980386514 | TOPIC_2  | 89208987752  |  732  |
| kafka-broker | TOPIC_3  |     0     | 457686650224 | 484187159862 | 26500509638 | TOPIC_3  | 484187157164 |  2698 |
+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+

Number of brokers:       1
Number of partitions:    3
Total broker depth:      31659142165
Total delta:             4807
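
Reading the numbers in this output, Depth and Delta look like plain differences between offsets. The sketch below spells out that arithmetic; it is inferred from the table above rather than taken from the stormkafkamon source, and the class and method names are made up.

```java
// Sketch of how the Depth and Delta columns relate to the offset columns.
class KafkaLagSketch {

    // Depth: how much data the broker currently holds for the partition.
    static long depth(long earliest, long latest) {
        return latest - earliest;
    }

    // Delta: how far the spout's stored offset is behind the broker's latest offset.
    static long delta(long latest, long spoutCurrent) {
        return latest - spoutCurrent;
    }

    public static void main(String[] args) {
        // Values from the TOPIC_1 row.
        System.out.println(depth(17314225754L, 18492471767L)); // 1178246013
        System.out.println(delta(18492471767L, 18492470390L)); // 1377
    }
}
```

A delta that keeps growing means the topology is consuming more slowly than producers are writing, which makes it a good candidate for alerting.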


And with some bash script-fu you could add a cron job that runs it once a minute, parses the data and sends it to statsd (cheers to my coworker Jón Grétarsson for writing this script).
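
The original script is bash and is not reproduced here. As a rough equivalent, this Java sketch parses the table format shown above and turns each topic's delta into a statsd gauge line. The class name and the "kafka.spout." metric prefix are assumptions, and the parser relies on the column layout shown in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: extract per-topic deltas from stormkafkamon's table output.
class KafkaMonParseSketch {

    // Expected row layout:
    // | broker | topic | partition | earliest | latest | depth | spout | current | delta |
    static Map<String, Long> deltasByTopic(String tableText) {
        Map<String, Long> deltas = new LinkedHashMap<>();
        for (String raw : tableText.split("\n")) {
            String line = raw.trim();
            if (!line.startsWith("|")) {
                continue; // skip border lines and the summary section
            }
            String[] cells = line.substring(1).split("\\|");
            if (cells.length < 9) {
                continue;
            }
            String topic = cells[1].trim();
            String delta = cells[8].trim();
            if (!delta.matches("\\d+")) {
                continue; // skip the header row
            }
            // Sum deltas when a topic has more than one partition.
            deltas.merge(topic, Long.parseLong(delta), Long::sum);
        }
        return deltas;
    }

    // Formats a statsd gauge line: <name>:<value>|g
    static String gauge(String topic, long delta) {
        return "kafka.spout." + topic + ".delta:" + delta + "|g";
    }
}
```

Each resulting line can then be sent to statsd as a UDP datagram, once per cron run.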




If you feel the explanation is not accurate, have specific questions, or have any other feedback, please shoot me an email at patricioe at gmail dot com.


Saturday, June 15, 2013

Notes From The Cassandra Summit 2013

It was nice to see my former DataStax coworkers during the summit, which took place at Fort Mason in San Francisco. As expected, each Cassandra Summit is more crowded than the last, which suggests that the technology is getting more popular in the industry. I was impressed, and happy at the same time, to see that all sorts of companies are moving towards using Cassandra for different areas of their stack. A few examples: Instagram, replacing their sharded Redis cluster with a Cassandra one running on SSDs and serving peaks of 20k writes and 15k reads per second; and Spotify, serving their 24+ million users from about 300 Cassandra nodes spread across the 24 different services that use the database to store 50TB of data.
Solid-state drives (SSDs) give really high throughput, especially if your access pattern is mostly random. If you manage your own hardware you should definitely consider SSDs for the data partition. If AWS is your call, then hi1.4xlarge is your friend.

Saturday, June 01, 2013

Distributed Cassandra-based Locks in Hector Client

After almost 3 years of not updating this blog I decided that writing about the latest interesting feature in Hector was a good excuse to break the ice. I wrote the first implementation of distributed lock support for Hector on July 15th 2012 and Todd Nine took it to the next step.
The feature is an implementation of Dominic Williams's Wait Chain with minor adjustments, backed 100% by Cassandra, which means that it is horizontally scalable.

The framework is composed of three main entities:
  • HLock: Self-explanatory. It is the lock we are trying to acquire.
  • HLockManager: The entity responsible for acquiring and releasing the lock.
  • HLockManagerConfigurator: Responsible for configuring the lock system. HLMC from now on.

HLMC defines important properties needed for the normal operation of the lock system. Hector implements this feature by storing information in the HLocks column family under a specific keyspace, HLockingManager, with a default replication factor of 3. Additionally, row cache is enabled by default, and locks expire after 5 seconds.

All of the above-mentioned properties can be changed via HLMC.

How to initialize the locking system


The following snippet shows how to initialize the framework; it can be placed next to where you set up Hector's CassandraHostConfigurator.
   
// Initialize Locking Framework
cluster = getOrCreateCluster("MyCluster", getCHCForTest());
HLockManagerConfigurator hlc = new HLockManagerConfigurator();
hlc.setReplicationFactor(1);
lm = new HLockManagerImpl(cluster, hlc);
lm.init();

Acquiring and Releasing Locks


This snippet shows how to use the locks. It assumes you hold an instance of HLockManager somewhere. Guice and Spring are good frameworks to solve this problem.
HLock lock = lm.createLock("/Users/patricioe");
try {
    lm.acquire(lock);

    // Do something ...
} finally {
    lm.release(lock);
}
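
If you use this acquire/release pattern in many places, it may be worth wrapping it in a small helper. The sketch below shows the idea with a hypothetical `LockManager` interface that only stands in for a lock manager; it is not Hector's API, so adapt the method names to the real HLockManager.

```java
import java.util.function.Supplier;

// Sketch of an "execute around" helper for lock handling.
class LockHelperSketch {

    // Hypothetical stand-in for a lock manager; NOT Hector's HLockManager.
    interface LockManager<L> {
        L createLock(String path);
        void acquire(L lock);
        void release(L lock);
    }

    // Creates and acquires a lock, runs the body, and always releases the lock.
    static <L, T> T withLock(LockManager<L> lm, String path, Supplier<T> body) {
        L lock = lm.createLock(path);
        lm.acquire(lock); // if this throws, there is nothing to release yet
        try {
            return body.get();
        } finally {
            lm.release(lock); // runs even when the body throws
        }
    }
}
```

Since a new lock is created on every call and never leaves the helper, lock instances stay short-lived and are not shared between callers.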

Thread safety


The implementation of HLockManager (HLockManagerImpl) is thread safe and thus can be shared across different threads. Instances of HLock (HLockImpl), on the other hand, are stateful and should not be shared across threads. They are meant to be created and released within a short period of time (5 seconds by default).

Miscellaneous 


Some people are already using this feature, and I recommend you give it a try and send us feedback or questions at hector-users@googlegroups.com or here on this blog. Hope you enjoyed the read.

You should follow me on Twitter @patricioe