Monday, October 18, 2010

Evaluating timestamp resolution algorithms in Hector Framework (Cassandra DB)

Motivation:
As many of you might know, Java does not provide microsecond granularity. When using Cassandra you need to make sure that your clients have their clocks in sync; that is a requirement. In other non-relational databases like Voldemort this is not necessary (they use vector clocks).
If we use the millisecond resolution that Java provides, then whenever two operations (two inserts, or an insert and a delete) land within the same millisecond (a typical case in Lucandra when updating a row),
the second operation gets discarded because it carries the same timestamp as the previous request. This is why we had to come up with a way to hand every thread that requests a time a unique timestamp.
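The collision is easy to reproduce. The sketch below (illustrative, not part of the benchmark) counts how many consecutive reads of the millisecond clock return the same value; almost every back-to-back pair does, so two fast operations stamped this way would collide:

```java
public class MillisCollision {
  // Counts how many of n consecutive currentTimeMillis() reads repeat
  // the previous value, i.e. would produce a duplicate timestamp.
  static int countCollisions(int n) {
    int collisions = 0;
    long prev = System.currentTimeMillis();
    for (int i = 0; i < n; i++) {
      long now = System.currentTimeMillis();
      if (now == prev) collisions++;
      prev = now;
    }
    return collisions;
  }

  public static void main(String[] args) {
    System.out.println("duplicates in 1,000,000 reads: " + countCollisions(1_000_000));
  }
}
```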

Goal:
 Evaluate the performance of the three Hector time resolution algorithms with microsecond accuracy.
-          The fully synchronized algorithm guarantees unique timestamps in microseconds, both within a thread and across threads. [1]
-          The second one uses an AtomicLong to accomplish the same goal as the first, but has a race condition, so uniqueness is not guaranteed across threads. I contributed this code; it is in Hector branch 6 but not in the main branch. [2]
-          The third one does not guarantee that two consecutive calls from the same thread obtain different timestamps (risk of missing operations, as Cassandra discards them if they carry the same timestamp). Currently in the Hector main branch. [3]

Conclusion:
The fully synchronized timestamp resolution algorithm delivers decent TPS, almost indistinguishable from the non-synchronized one, while providing a “bullet proof” mechanism that guarantees unique timestamps within a thread and across threads as well.

Environment:
-          Machine: rwc-devlinux5
-          CPUs: 2 x QC @ 3.0 GHz
-          RAM: 32 GB
-          OS: Red Hat (RHEL 5.3)

About the test:
I included the results of 3 benchmarks: with 8 threads, with 50 threads, and with 100 threads (the last one with no warm-up).
Per benchmark, there are 3 sub-benchmarks where I compare the behavior of the 3 algorithms that calculate the timestamp:
-          Fully synchronized
-          Partially synchronized (uses an AtomicLong)
-          Not synchronized at all
Each thread executes a couple of thousand operations.
Each operation does 3 things:

  double res = Math.random() * 13;
  String s = "something" + res;
  TimestampResolutionNotSync.MICROSECONDS.createTimestamp();

Before the benchmark, I let the test run 5k operations with no time calculation as a warm-up. Without it, the first set of tests to be executed always shows slightly lower performance, so I considered it fair to include the warm-up first. The 100-thread test has no warm-up.

The overall number of operations is 5,000,000, regardless of the number of threads. This way I can see how fast or slow each algorithm works.
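The driver can be sketched roughly as follows. This is my own illustrative harness, not the actual test code: the `run` method and the `LongSupplier` parameter are assumptions for the sake of a self-contained example, standing in for whichever of the three `createTimestamp` implementations is under test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongSupplier;

public class TimestampBenchmark {
  // Hypothetical driver: `threads` workers each perform `opsPerThread`
  // operations against the supplied clock; returns elapsed wall time in ns.
  static long run(int threads, int opsPerThread, LongSupplier clock)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch done = new CountDownLatch(threads);
    long start = System.nanoTime();
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < opsPerThread; i++) {
          double res = Math.random() * 13;   // a little CPU work
          String s = "something" + res;      // a little allocation
          clock.getAsLong();                 // the timestamp call under test
        }
        done.countDown();
      });
    }
    done.await();
    pool.shutdown();
    return System.nanoTime() - start;
  }

  public static void main(String[] args) throws InterruptedException {
    long ns = run(8, 10_000, System::currentTimeMillis);
    System.out.println("8 threads x 10k ops took " + ns / 1_000_000 + " ms");
  }
}
```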

How to interpret the results:
-          Look at the attached files. There are 2 files per benchmark (one shows the TPS and the mean, the other shows the CPU usage).
-          For example, 100Threads-15secs-50kops means:
o   100 threads
o   The graphic has timeframes of 15 secs
o   Each thread performs 50 K operations
-          There are three colors indicating the performance of the three algorithms (Blue, Red and Green):
        

BLUE: the createTimestamp method is fully synchronized
GREEN: Partially synchronized using an AtomicLong, with possible race conditions.
RED: Not synchronized at all, with a high risk of getting the same timestamp within and across threads.




Case 1: 8 Threads - 1000 K operations.
Note: Lower values are better. It shows the mean of each algorithm.




Case 1: 8 Threads - 1000 K operations.
Note: TPS (transactions per second). Higher values are better here.



Case 1: CPU and memory. Notice how the cpu usage stays constant during the execution of the three test cases (a spike down indicates the end and beginning of the next test).



Case 1b: 8 Threads - 1000 K operations with warm-up (5K operations per thread, and the non-synchronized algorithm runs first)
Note: Including a warm-up and running the non-synchronized algorithm first seems to make a difference. The lowest mean is of course for the non-synchronized algorithm, the highest value is for the fully synchronized algorithm and, as expected, the AtomicLong algorithm sits in between (lower values are better).



Case 1b: 8 Threads - 1000 K operations with warm-up (5K operations per thread, and the non-synchronized algorithm runs first)
Note: In this second graph (TPS), the three algorithms seem to perform equally well (higher values are better).




Case 2: 50 Threads - 100 K operations with warmup (lower values are better).



Case 2: 50 Threads - 100 K operations with warmup (higher values are better).


Case 2: CPU and memory. Notice how the cpu usage stays constant during the execution of the three test cases (a spike down indicates the end and beginning of the next test).





Case 3: 100 Threads - 50 K operations.



Case 3: 100 Threads - 50 K operations.


Case 3: CPU and memory. Notice how the cpu usage stays constant during the execution of the three test cases (a spike down indicates the end and beginning of the next test).


Source code
[1]


public enum TimestampResolutionFullySynchronized {
  SECONDS, MILLISECONDS, MICROSECONDS;

  /**
   * The last time value issued. Used to try to prevent duplicates.
   */
  private static long lastTime = -1;

  private static final long ONE_THOUSAND = 1000L;

  public synchronized long createTimestamp() {
    long current = System.currentTimeMillis();
    switch(this) {
    case MICROSECONDS:
      // The following simulates microsecond resolution 
      // by advancing a static counter every time
      // a client calls the createTimestamp method, simulating a tick.
      long us = current * ONE_THOUSAND;
      if (us > lastTime) {
        lastTime = us;
      } else {  
        // the time I got from the system is equal or less 
        // (hope not - clock going backwards)
        // One more "microsecond"
        us = ++lastTime;
      }
      return us;
    case MILLISECONDS:
      return current;
    case SECONDS:
      return current / 1000;
    }
    return current;
  }
}
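The cross-thread uniqueness claim for [1] is easy to check: collect timestamps from many threads into a concurrent set and count duplicates. The harness below is my own sketch (class and method names are illustrative), with the MICROSECONDS branch inlined so it compiles on its own:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class UniquenessCheck {
  private static long lastTime = -1;

  // Inlined copy of the fully synchronized MICROSECONDS branch above.
  static synchronized long createTimestamp() {
    long us = System.currentTimeMillis() * 1000L;
    if (us > lastTime) {
      lastTime = us;
    } else {
      us = ++lastTime; // same or earlier millisecond: advance one "microsecond"
    }
    return us;
  }

  // Returns true if threads x opsPerThread calls produced no duplicate.
  static boolean allUnique(int threads, int opsPerThread) throws InterruptedException {
    Set<Long> seen = ConcurrentHashMap.newKeySet();
    AtomicInteger duplicates = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch done = new CountDownLatch(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < opsPerThread; i++) {
          if (!seen.add(createTimestamp())) {
            duplicates.incrementAndGet();
          }
        }
        done.countDown();
      });
    }
    done.await();
    pool.shutdown();
    return duplicates.get() == 0;
  }
}
```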



[2]

public enum TimestampResolutionPartiallySynchronized {
      SECONDS, MILLISECONDS, MICROSECONDS;

      /**
       * The last time value issued. Used to try to prevent duplicates.
       */
      private static final AtomicLong lastTime = new AtomicLong(Long.MIN_VALUE);

      public long createTimestamp() {
        long current = System.currentTimeMillis();
        switch(this) {
        case MICROSECONDS:
          // The following simulates microsecond resolution 
          // by advancing a static counter every time
          // a client calls the createTimestamp method, simulating a tick.
          long us = current * 1000;
          if (us > lastTime.longValue()) {
            lastTime.set(us);
          } else {
            // the time I got from the system is equal or less 
            // (hope not - clock going backwards)
            // One more "microsecond"
            us = lastTime.incrementAndGet();
          }
          return us;
        case MILLISECONDS:
          return current;
        case SECONDS:
          return current / 1000;
        }
        return current;
      }
}
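The race in [2] is the classic check-then-act: two threads can both observe `us > lastTime.longValue()`, both call `set(us)`, and hand out the same value. A lock-free way to close that window (my own sketch, not Hector code) is to retry with compareAndSet until the update wins, so every successful call publishes and returns a strictly larger value:

```java
import java.util.concurrent.atomic.AtomicLong;

public class MicrosecondsClockCas {
  private static final AtomicLong lastTime = new AtomicLong(Long.MIN_VALUE);

  // Lock-free microsecond clock: loop until we either publish a fresh
  // value or atomically bump past the last issued one. Each successful
  // CAS strictly increases lastTime, so every caller gets a distinct result.
  public static long createTimestamp() {
    long us = System.currentTimeMillis() * 1000L;
    while (true) {
      long last = lastTime.get();
      if (us > last) {
        if (lastTime.compareAndSet(last, us)) {
          return us;
        }
      } else {
        if (lastTime.compareAndSet(last, last + 1)) {
          return last + 1;
        }
      }
      // CAS lost: another thread advanced the clock; retry.
    }
  }
}
```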



[3]


public enum TimestampResolutionSimpleMicro {
      SECONDS, MILLISECONDS, MICROSECONDS;

      private static final long ONE_THOUSAND = 1000L;

      public long createTimestamp() {
        long current = System.currentTimeMillis();
        switch(this) {
        case MICROSECONDS:
          return current * ONE_THOUSAND;
        case MILLISECONDS:
          return current;
        case SECONDS:
          return current / 1000;
        }
        return current;
      }
}

9 comments:

Matías S. Sulik said...

Awesome... Nice Blog!

jtruelove said...

If you have multiple writers in different processes though you could still run the risk of getting the same time for the same key right? This would probably be rare though.

Patricio Echagüe said...

Hi, yes that is correct. Do you have that scenario?

In other words, this algorithm fails when two processes access the same row concurrently and your application is timestamp dependent.

jtruelove said...

No I don't have it in my use case currently. If say you load balance flow across a VIP and you have someone hitting you frequently from an outside service it's possible, which sounds rare but bots can cause this pretty easily. I like the solution though for a single process or one where your writers are logically partitioned.

Anonymous said...

A better algorithm might be,

static AtomicInteger microSecondParts = new AtomicLong();


return System.currentTimeMillis() + microSecondPart.incrementAndGet();


No chance of collision unless you are doing more than 1000 ops per ms, simpler, and no locking.

Ray said...

You seem to making the assumption that all writes will be going through a single JVM. That doesn't seem like a restriction that will work in practice at internet scales. Am I missing something?

I'm new to Cassandra, so forgive me on this, but two writes can't occur at the same microsecond? Is that two writes to the same key+column, or two writes, period?

Patricio Echagüe said...

@Ray it is only valid per process, of course.
These algorithms guarantee uniqueness only per process/JVM.

If you expect concurrent access to the same columns across machines and you care about it, you have to either tolerate those inconsistencies or re-design the schema.

These examples I posted help you deal with, for instance, the Lucandra operation where you delete + insert (update) columns. Normally that happens so fast that you might miss deletes, as the delete and the insert happen at the same relative time.

See the introduction.

Thank you for reading and leaving your feedback!

T.U.C. said...
This comment has been removed by the author.
maasg said...

>static AtomicInteger microSecondParts = new AtomicLong();

+1

But you need to limit it to contribute to the microsecond part with a mod op:

return System.currentTimeMillis()*ONE_THOUSAND+microSecondPart.incrementAndGet()%ONE_THOUSAND;