Sunday, October 13, 2013

Different strategies to monitor Storm Processor

This topic has come up multiple times in storm mailing list so I figure it would good to put some ideas together as well as sharing some hands on experience on how to monitor storm topologies.
We can group the monitoring metrics into these categories.

  • Independent metrics from each particular topology
  • JVM metrics
  • Storm metrics (i.e: tuples per minutes)
  • Miscellaneous (when using Kafka Spouts)

Independent metrics from each particular topologies

Each topology is different and is going to have different metrics to publish. This is not different than any other way you can monitor you web application. The key here is how to ship application specific metrics into a data collector for plotting and alerting. Just for completeness it's worth mentioning some.

  • Statsd directly (useful when wanting to see aggregated data). Here you would just instrument your code to output specific metrics. Statsd will later on (normally every minute) aggregate the received data and write to a backend such as Ganglia or graphite.


JVM metrics

This category is to group information such as Heap size, Heap used, GC time among others.

Something I have tried in the past was to co-locate JMXTrans in each Storm box and via Chef come up with the right ports to connect to. For example if in a particular box I set up two Storm workers  -6700 and 6701- and enable JMX for each of them as follows (via storm.yaml file):

-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.port=1%ID%

Note that "1%ID%" will be replaced with 16700 and 16701 respectively. This predictability lets me search via chef how many workers are there in each box (the workers are Chef attributes) , co-colocate JMXtrans service in the same box and dynamically set up JMXTrans config file for each worker.

Below is how I tell Chef how many workers I have in each machine.

default[:storm][:workers] = [6700, 6701]

This is an example of a JMXTrans config file for extracting data from the JVM that writes to a Ganglia collector.

{
  "servers" : [ {
    "port" : "<%= @service_port%>",
    "host" : "127.0.0.1",
    "alias" : "<%= node.name%>:<%= node.name%>",
    "queries" : [ {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "jvmheapmemory",
          "port" : <%= node[:ganglia][:port]%>,
          "host" : "<%= node[:ganglia][:collector_host]%>"
        }
      } ],
      "obj" : "java.lang:type=Memory",
      "resultAlias": "<%= @service_name%>-heap",
      "attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ]
    }, {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "jvmcmsoldgen",
          "port" : <%= node[:ganglia][:port]%>,
          "host" : "<%= node[:ganglia][:collector_host]%>"
        }
      } ],
      "obj" : "java.lang:name=CMS Old Gen,type=MemoryPool",
      "resultAlias": "<%= @service_name%>-cmsoldgen",
      "attr" : [ "Usage" ]
    } ],
    "numQueryThreads" : 2
  } ]
}

Some notes about this approach:
  • When a worker is down, JMXTrans errors out so I had to set up a cron job that restarts JMXTrans service every 30 minutes to make sure I don't lose metrics.
  • High maintenance since it requires keeping both JMXTrans config and storm config in sync.


A second approach is to use Yammer Metrics JVM Instrumentation. For it to work you have to use a Reporting Writer to write the metrics to a backend collector. I use Ganglia and Statsd -which does not appear in the list-.  Here there is an adapted version of a Statsd reporter where we suppress specific metrics (Histograms) since statsd already constructs histograms for us. Yammer Metrics only uses gauges to publish the data so suppressig histograms doesn't seem to be a big deal.
The beauty of this is that you no longer have to keep two services config in sync. As soon as storm launches a topology it will start emitting JVM metrics.

This is a screenshot plotted with Graphite which is one of the backends statsd writes data to.

Storm Metrics

By "Storm Metrics" I meant metrics from Storm framework itself, such as Tuples Per minute, Latency, Capacity, etc.
At the time of writing this post we(RelateIQ) are using Storm 0.8.3 which has some support to attach hooks to components (Spouts and Bolts) and storm calls those hooks with information about the tuples that are being emitted, acked and failed.  This is an example of a Hook that writes those metrics to Yammer Metrics meters which are send out to Statds (in our case) every minute.



public class MeteredSpoutHook extends BaseMeteredHook {

 private Meter emittedTuples;
 private Meter ackedTuples;
 private Meter failedTuples;

 public MeteredSpoutHook() {}

 @Override
 public void emit(EmitInfo emitInfo) {
  emittedTuples.mark();
 }

 @Override
 public void spoutAck(SpoutAckInfo spoutAckInfo) {
  ackedTuples.mark();
  // TODO Do something with: spoutAckInfo.completeLatencyMs
 }

 @Override
 public void spoutFail(SpoutFailInfo spoutFailInfo) {
  failedTuples.mark();
  // TODO Do something with:  spoutFailInfo.failLatencyMs
 }

 @Override
 public void init(TopologyContext context, String topologyName, String label) {

  MetricName emittedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "EmittedTuples");
  emittedTuples = Metrics.newMeter(emittedTuplesMetricName, "tuples", TimeUnit.MINUTES);

  MetricName ackedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "AckedTuples");
  ackedTuples = Metrics.newMeter(ackedTuplesMetricName, "tuples", TimeUnit.MINUTES);

  MetricName failedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "FailedTuples");
  failedTuples = Metrics.newMeter(failedTuplesMetricName, "tuples", TimeUnit.MINUTES);
 }

}

Miscellaneous

If you happen to be using Kafka Spouts as one of your spouts implementation, a project I found very useful was stormkafkamon (original and fork which fixes some bugs). It is intended to work in conjunction with Kafka spout from storm-contrib. This Spout stores a watermark in Zookeeper and stormkafkamon reads the latest offset from Kafka broker and from Zookeeper and shows the delta in a nice formatted way. 

+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+
|    Broker    |  Topic   | Partition |   Earliest   |    Latest    |    Depth    |   Spout  |   Current    | Delta |
+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+
| kafka-broker | TOPIC_1  |     0     | 17314225754  | 18492471767  |  1178246013 | TOPIC_1  | 18492470390  |  1377 |
| kafka-broker | TOPIC_2  |     0     | 85228601970  | 89208988484  |  3980386514 | TOPIC_2  | 89208987752  |  732  |
| kafka-broker | TOPIC_3  |     0     | 457686650224 | 484187159862 | 26500509638 | TOPIC_3  | 484187157164 |  2698 |
+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+

Number of brokers:       1
Number of partitions:    3
Total broker depth:      31659142165
Total delta:             4807


And with some bash script foo you could add a cron job that runs it once a minute, parse the data and send it to statsd (Cheers to my coworker Jón Grétarsson for writing this script).




If you feel like the explanation is not accurate, have specific questions or have any other feedback please shoot me an email to patricioe at gmail dot com.


Saturday, June 15, 2013

Notes From The Cassandra Summit 2013

It was nice to see my former Datastax coworkers during the summit that took place at fort Mason in San Francisco. As expected, each Cassandra summit is becoming more and more crowded which proves that the technology is getting more popular in the industry. I was very impressed and happy at the same time to see that all sort of companies are moving towards using Cassandra for different areas of their stack. Few of them include Instagram replacing their sharded Redis cluster with a Cassandra one running on SSDs serving peaks of 20k write operations 15k reads per second, and Spotify serving their 24+ million users out of Cassandra cluster with about 300 Cassandra nodes between the 24 different services that use the database to store 50TB of data.
Solid state Drives (SSDs) give really high throughput specially if your access pattern is mostly random. If you manage your own hardware you should definitely consider SSDs for the data partition. If AWS is your call then hi1.4xlarge is your friend.

Saturday, June 01, 2013

Distributed Cassandra-based Locks in Hector Client

After almost 3 years of not updating this blog I decided that writing about the latest interesting feature in Hector was a good excuse to break the ice. I wrote the first implementation of distributed lock support for Hector on July 15th 2012 and Todd Nine took it to the next step.
The feature is an implementation of Dominic Williams's Wait Chain with minor adjustments, backed 100% by Cassandra, which means that it is horizontally scalable.

The framework is composed by three main entities:
  • HLock : Self explanatory. It is the lock we are trying to acquire.
  • HLockManager : The entity responsible to acquiring and releasing the lock, and
  • HLockManagerConfigurator : Responsible to configuring the lock system. HLMC from now on.

HLMC defines important properties needed for the normal operation of the lock system. Hector implements this feature by storing information in HLocks column family under a specific keyspace  HLockingManager with a default replication factor of 3. Additionally row cache is enabled by default and the locks last 5 seconds after which the lock will expire.

All the above mentioned properties can be change via HLMC.

How to initialize the locking system


The following snippet of code shows how to initialize the framework and can be place along to where you set up Hector's CassandraHostConfigurator
   
// Initialize Locking Framework
cluster = getOrCreateCluster("MyCluster", getCHCForTest());
HLockManagerConfigurator hlc = new HLockManagerConfigurator();
hlc.setReplicationFactor(1);
lm = new HLockManagerImpl(cluster, hlc);
lm.init();

Acquiring and Releasing Locks


This snippet shows to to use the locks. It assumes you hold an instance of LockManager somewhere. Guice and Spring are good frameworks to solve this problem.
HLock lock = lm.createLock("/Users/patricioe");
try {
    lm.acquire(lock);

    // Do something ...
} finally {
    lm.release(lock)
}

Thread safety


The implementation of HLockManager (HLockManagerImp) is thread safe and thus can be share across different threads. Instances of HLock (HLockImp) on the other hand are state-full  and should not be share across threads. They are meant to be created and release within a short period of time (5 Seconds by default).

Miscellaneous 


Besides the fact that some people are using this feature I recommend to you to give it a try and send us feedback or questions to hector-users@googlegroups.com or here in this blog. Hope you enjoyed the reading.

You should follow me on Twitter @patricioe



Thursday, December 16, 2010

How to create a custom ClockResolution in Hector (0.7.0-x)


Create your own ClockResolution in Hector (as of 0.7.0-22) is easy. We have modified the code a bit to allow Hector clients to define their own implementation if needed. Let's assume we want to create a clock resolution class that generates negative numbers (this is not a useful example. It is only to show how it is possible).
The way to do it is by creating a class that implements me.prettyprint.hector.api.ClockResolution



    // Define my own clock resolution.
    class SequentialClockResolution implements ClockResolution {
        @Override
        public long createClock() {
            return System.currentTimeMillis() * -1;
        }
    }

And now we have to tell Hector to use it.

    CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost");

    cassandraHostConfigurator.setClockResolution(new SequentialClockResolution());

As an additional information, this is the class diagram involved in ClockResolution.

Monday, October 18, 2010

Evaluating timestamp resolution algorithms in Hector Framework (Cassandra DB)

Motivation:
As many of you might know, Java does not provide microseconds granularity. When using Cassandra DB you need to make sure that your clients have their clocks in sync. That is the requirement. Other non-relational databases like Voldemort, this is not necessary (they use Vector Clocks).
If we decided to use milliseconds resolution (what Java provides), sometimes if there are two inserts (or one insert and one delete) within the same millisecond (typical example in Lucandra when updating a row),
the second operation gets discarded as it happens at the same time than the previous request. This is why we had to come up with a way to provide every thread that requests a time a unique timestamp.

Goal:
 Evaluate the performance of the three Hector time resolution algorithms with Microseconds accuracy.
-          The fully synchronized algorithm guarantees unique timestamp in microseconds per threads and across threads.[1]
-          The second one uses Atomic Longs to accomplish the same goal than the first one, but presents some race condition and the results might not be guaranteed across threads. I contributed this code and it is in Hector branch 6 but not main branch. [2]
-          The third one does not guaranteed that a thread in two consecutives calls obtain the same timestamp. (risk of missing operations as Cassandra discard them if they contain the  same timestamp). Currently in hector main branch. [3]

Conclusion:
The fully synchronized timestamp resolution algorithm present a decent TPS, almost imperceptible comparing with the non-synchronized one and providing a “bullet proof” mechanisms to guarantee  unique time resolution within the same thread and across threads as well.

Environment:
-          Machine: rwc-devlinux5
-          CPUs:  2 X QC x 3.0gh
-          RAM: 32 GB
-          SO:  Red Hat (RHEL 5.3)

About the test:
I included the result of 3 benchmarks. With 8 threads, with 50 and 100(with no warm-up).
Per benchmark, there are 3 sub-benchmark where I compare the behavior of 3 algorithms to calculate the timestamp:
-          Fully synchronized
-          Partially synchronized, and  (use AtomicLong)
-          Not synchronized at all
Each thread execute a couple of thousand operations.
Each operation does 3 things:

  double res = Math.random() * 13;
  String s = "something" + res;
  TimestampResolutionNotSync.MICROSECONDS.createTimestamp();

Before the benchmark, I let the test run 5k operation with no time calculation to warm-up. Without it, the first set of test to be executed always shows a slightly less performance. So I consider it was fair to include the warm-up first. The 100 threads test has not warm-up.

The overall operation is 5,000,000. Regardless the number of threads. This way I can see how fast or slow  it works.

How to interpret the results:
-          Look at the attached files. There 2 files per benchmark. (one shows the TPS and media, and the other one shows the CPU usage)
-          For example: 100Threads-15secs-50kops means:
o   100 threads
o   Graphic has timeframes of 15 secs
o   Each thread perform 50 K operations
-          There are three colors indicating the performance of the three algorithms (Blue, Red and Green)
        

BLUE: the createTimestamp method is fully synchronized
GREEN: Partially synchronized using atomic long with possible race conditions.
RED: Not synchronized at all with high risk of getting the same timestamp within and across threads.




Case 1: 8 Threads - 1000 K operations.
Note: Lower values are better. It shows the media of each algorithm.




Case 1: 8 Threads - 1000 K operations.
Note: TPS (transactions per second). Higher values are better here.



Case 1: CPU and memory. Notice how the cpu usage stays constant during the execution of the three test cases (a spike down indicates the end and beginning of the next test).



Case 1b: 8 Threads - 1000 K operations with warmup (5K operation per thread and non-synchronized is first)
Note: Including a warmup and switching the non-synchronized operation in the first place seems to make a different. The lowest media is of course for the non-synchronized algorithm, the highest value is for the fully synchronized algorithm and as expected, the algorithm that uses AtomicLong is slightly in between (lower values are better).



Case 1b: 8 Threads - 1000 K operations with warmup (5K operation per thread and non-synchronized is first)
Note:  In this second graph second case (TPS), the three algorithms seem to perform just as good. (highest values are better)




Case 2: 50 Threads - 100 K operations with warmup (lower values are better).



Case 2: 50 Threads - 100 K operations with warmup(higher values are better).


Case 2: CPU and memory. Notice how the cpu usage stays constant during the execution of the three test cases (a spike down indicates the end and beginning of the next test).





Case 2: 100 Threads - 50 K operations.



Case 3: 100 Threads - 50 K operations.


Case 3: CPU and memory. Notice how the cpu usage stays constant during the execution of the three test cases (a spike down indicates the end and beginning of the next test).


Source code
[1]


public enum TimestampResolutionFullySynchronized {
  SECONDS, MILLISECONDS, MICROSECONDS;

  /**
   * The last time value issued. Used to try to prevent duplicates.
   */
  private static long lastTime = -1;

  private static final long ONE_THOUSAND = 1000L;

  public synchronized  long createTimestamp() {
    long current = System.currentTimeMillis();
    switch(this) {
    case MICROSECONDS:
      // The following simulates a microseconds resolution 
      // by advancing a static counter every time
      // a client calls the createClock method, simulating a tick.
      long us = current * ONE_THOUSAND;
      if (us > lastTime) {
        lastTime = us;
      } else {  
        // the time i got from the system is equals or less 
        // (hope not - clock going backwards)
        // One more "microsecond"
        us = ++lastTime;
      }
      return us;
    case MILLISECONDS:
      return current;
    case SECONDS:
      return current / 1000;
    };
    return current;
  }
}



[2]

public enum TimestampResolutionPartiallySynchronized {
      SECONDS, MILLISECONDS, MICROSECONDS;

      /**
       * The last time value issued. Used to try to prevent duplicates.
       */
      private static final AtomicLong lastTime = new AtomicLong(Long.MIN_VALUE);

      public long createTimestamp() {
        long current = System.currentTimeMillis();
        switch(this) {
        case MICROSECONDS:
          // The following simmulates a microsec resolution 
          // by advancing a static counter every time
          // a client calls the createClock method, simulating a tick.
          long us = current * 1000;
          if (us > lastTime.longValue()) {
            lastTime.set(us);
          } else
            // the time i got from the system is equals or less 
            // (hope not - clock going backwards)
            // One more "microsecond"
            us = lastTime.incrementAndGet();
          }
          return us;
        case MILLISECONDS:
          return current;
        case SECONDS:
          return current / 1000;
        };
        return current;
      }
}



[3]


public enum TimestampResolutionSimpleMicro {
      SECONDS, MILLISECONDS, MICROSECONDS;

      private static final long ONE_THOUSAND = 1000L;

      public long createTimestamp() {
        long current = System.currentTimeMillis();
        switch(this) {
        case MICROSECONDS:
          return current * ONE_THOUSAND;
        case MILLISECONDS:
          return current;
        case SECONDS:
          return current / 1000;
        };
        return current;
      }
}