Tuesday, June 16, 2015

Building a Wired Tiger FIFO Queue in MongoDB.

Summary – Wired Tiger is much faster than MMAP, but it does need you to start thinking about some new things. Don't just accept 5 times faster when it can be 10 or 20 times faster with a little thought. Here's how to improve the speed of queueing code.

Wired Tiger is new, and whilst it is nearly always faster than MMAP as a storage engine, it does have a few things you need to take into account when writing high-performance code for it. Here is one example: building a queue. This is a real example from a customer – I've not included any of their code as that's not mine to share, but I have sanitised the results to present here.

I started with a piece of code to build a first-in, first-out (FIFO) queue where some processes (producers) add data items to the queue and others (consumers) process and then remove items from it. A consumer first uniquely 'claims' an item on the queue and then, after processing it, removes it; each item goes to only one consumer. There is also code so that if a consumer fails to remove something from the queue – signifying that the consumer perhaps crashed during processing – the item is passed to another consumer after a certain time. You might well build this in MongoDB because you are already using MongoDB for other things and don't want to add a dedicated queueing technology to the stack if you don't need to.

We start by testing the MongoDB portion with producers and consumers running as fast as possible, to determine the maximum throughput this code can handle. In reality the speed of your consumers and producers will be limited by the work they actually do, but it's right to ensure queue speed is not the limiting factor. The key metric here is not the sum of operations but the lesser of the produce and consume rates in transactions per second: not consuming fast enough makes the queue grow, while consuming too fast means only the producer limits throughput. The consumer does, though, need to be able to run faster than the producer to clear any backlog.

With a limited set of hardware running flat out, producers and consumers obviously compete for resources, and too many producers could starve the consumers, so the number of each needs to be balanced sensibly.

Test Summary

I ran my tests on two servers on AWS. These were C4.2XL instances (8 CPUs, 15 GB RAM) with 2,000 PIOPS storage.

I used one server for the client and one for the server, both to correctly reflect the cost of reaching the database over the network rather than over the local (in-kernel) loopback, and to limit – or at least isolate – any effects of the client code.

MongoDB was set up as a replica set to accurately reflect the cost of the oplog (the replication transaction log) and its implications for client code. Running with replication is not the same as running without, and if sharding were added that would also have performance implications, so it's important to test with a realistic setup.

Running the initial code gave the following results.


Benchmark                                       Mode  Cnt     Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  8131.002 ± 580.113  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10  5079.490 ± 139.047  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10  3051.512 ± 473.569  ops/s

And the following hardware usage (from iostat):

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          24.60    0.00   18.68    0.09    0.36   56.27

Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda              0.20         0.00         2.40          0         24
xvdb             39.00         0.00      9637.60          0      96376

This is failing to use all the CPU on the server, and the effective queue throughput – the consume rate – is only around 3,000 ops/s. That may well be all you need for production, but I looked into what could be done to improve it. It's also using only about 2% of the I/O capacity (roughly 39 IOPS against the 2,000 provisioned)!

Just to verify, I also ran the same code against a C4.4XL server (16 CPUs, 30 GB RAM), with the following results.

Benchmark                                       Mode  Cnt     Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  8382.719 ± 240.753  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10  5022.430 ± 140.730  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10  3360.289 ± 150.755  ops/s

This is going no faster on double the hardware! So it is clearly not scaling, although the results are more consistent across the 10 tests. How can something not scale on a single server? The truth is that scaling can also be about your workload.

Code Review

At this point I reviewed the code again. It was running with one producer thread and 4 consumer threads.

The limiting factor for the producer is that it runs a single thread and does no bulk inserts, so it spends much of its time on network round trips. That is not really the issue in this scenario, though, as it's the consumer that is slow.

The producer code adds a 'lastchanged' timestamp and a status of 0 (unprocessed) to each record as it inserts it.

The consumer code uses the following logic:

   Find any record where state = 0 OR
       (state = 1 AND lastchanged < currentTime() - 300s)
   Then set its state to 1 ('processing') and its lastchanged to currentTime()
   Then take its identifier (_id), do your work, and finally delete it by that identifier.

In this design a producer does one database operation per item (an insert), while a consumer does two: a claim (findAndModify) and then a remove.
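To make that concrete, here is a minimal sketch of the claim-then-delete consumer using the MongoDB Java 3.x driver. The field names (ps for state, lastProcessDate for the timestamp) and the collection name messages1 are taken from the sanitised log excerpts later in this post; the connection host and the doWork() call are placeholders rather than the customer's actual code.

// Claim-then-delete consumer (sketch). ps: 0 = unprocessed, 1 = being processed.
MongoClient client = new MongoClient("mongo-primary");               // placeholder host
MongoCollection<Document> queue =
        client.getDatabase("perftest").getCollection("messages1");

Date cutoff = new Date(System.currentTimeMillis() - 300_000);        // abandoned = claimed over 300s ago
Document claimed = queue.findOneAndUpdate(
        Filters.or(
                Filters.eq("ps", 0),                                          // a new, unclaimed item
                Filters.and(Filters.eq("ps", 1),
                            Filters.lte("lastProcessDate", cutoff))),         // or an abandoned one
        Updates.combine(Updates.set("ps", 1),
                        Updates.set("lastProcessDate", new Date())));         // claim it

if (claimed != null) {
    doWork(claimed);                                                  // application-specific processing (placeholder)
    queue.deleteOne(Filters.eq("_id", claimed.get("_id")));          // remove once processed
}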

In the original code there were three indexes: the mandatory one on _id, one on state, and one on lastchanged.

My first observation is that the index on lastchanged will never be used – in a simple case like this MongoDB will use only a single index, and index intersection is only used where its benefit outweighs its overhead. If you wanted to index optimally for that query you would use a compound index on state and lastchanged. Every additional index also slows the whole system down by the time and resources required to keep it updated.
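For reference, if you did keep the indexed approach, that compound index is a one-liner (field names as in the logs) – although the next step removes the secondary indexes entirely.

// Hypothetical compound index matching the claim query's shape: equality on state, then range on timestamp.
queue.createIndex(new Document("ps", 1).append("lastProcessDate", 1));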

However, before changing that index, I realised there was a logic bug in the code – this is NOT a FIFO queue as expected but a priority queue. Because of the way the query is arranged and the numeric state values used in it, MongoDB will use the index on state and will always find unprocessed items first. This means that as long as there are any unprocessed items on the queue, abandoned items will never be reprocessed. That behaviour actually led to a nice solution for the next part of the problem.

In this sort of queue you really just want the oldest item that is not currently being processed, and as only a relatively small number of items are being processed at any moment, the answer is to look at the front of the queue. The correct approach here is therefore to remove ALL indexes apart from _id and simply read the queue collection from the start.
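In driver terms that is simply (continuing the sketch above):

// Drop every secondary index; the mandatory _id index always remains. The claim query is
// unchanged – with no usable index it scans from the head of the collection and stops at
// the first eligible document.
queue.dropIndexes();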

Doing this – removing both secondary indexes and reading from the front of the collection – gave the following result.

Benchmark                                       Mode  Cnt      Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  12681.337 ± 681.165  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10   4606.581 ± 205.275  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10   8074.756 ± 773.367  ops/s

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          15.75    0.00   16.12    0.85    0.42   66.87

Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda              0.20         0.00         2.40          0         24
xvdb             77.02         0.00     18452.75          0     184712


This seems better: although the limiting factor is now the insert speed (bounded by its single thread and the network round trip), it is considerably better at consuming AND it now handles both types of item, new and abandoned.

At this point I increased the thread count – we were still only using 40% CPU, so perhaps we just need the client to push harder; clients not pushing the server hard enough is the most common performance issue we see at MongoDB.

With 8 consumer and 4 producer threads (versus 4 and 1) we see the following:

Benchmark                                       Mode  Cnt      Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  14040.196 ± 494.540  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10  11020.286 ± 256.231  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10   3019.910 ± 413.103  ops/s

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          34.25    0.00   23.87    0.46    0.40   41.02

Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda              0.60         0.00         5.60          0         56
xvdb            103.30         0.00     26104.80          0     261048

We have gone to 60% CPU, but now the writes, whilst much improved, are significantly impacting read performance. So I tried again with 8 consumer and 2 producer threads:


Benchmark                                       Mode  Cnt      Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  10499.855 ± 609.064  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10   6880.779 ± 142.662  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10   3619.076 ± 539.684  ops/s

Now production is down but consumption is not up significantly.

Checking logs

My next port of call was the MongoDB log file – the server logs any operation taking more than 100ms (or less, if configured) as a problematic 'slow' operation.
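If the default threshold hides too much, it can be lowered without turning full profiling on; a sketch using the profile command (50ms is just an example value):

// Keep the profiler off (level 0) but lower the slow-operation threshold used by the log to 50ms.
client.getDatabase("perftest").runCommand(new Document("profile", 0).append("slowms", 50));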

It had many entries like this, both with the original code and the index-free version:


2015-06-13T11:43:01.800+0000 I COMMAND  [conn8] command perftest.$cmd command: findAndModify { findandmodify: "messages1", query: { $or: [ { ps: 0 }, { ps: 1, lastProcessDate: { $lte: new Date(1434195481678) } } ] }, update: { $set: { ps: 1, lastProcessDate: new Date(1434195781678) } } } update: { $set: { ps: 1, lastProcessDate: new Date(1434195781678) } } nscanned:1 nscannedObjects:1 nMatched:1 nModified:1 keyUpdates:0 writeConflicts:28 numYields:0 reslen:479 locks:{ Global: { acquireCount: { w: 30 } }, Database: { acquireCount: { w: 30 } }, Collection: { acquireCount: { w: 29 } }, oplog: { acquireCount: { w: 1 } } } 106ms

The notable item here is writeConflicts. This is new with Wired Tiger, which, rather than pessimistically locking the entire collection and limiting updates to one at a time, optimistically updates a record: it writes a new version, then verifies nothing else has modified the document in the meantime before making that new version the definitive one. In both cases, indexed and unindexed, the problem was that many consumers were attempting to claim the same record (the first one) at the same time, writing a changed version, then finding they had lost the race and trying again – which is really wasteful.

The solution, therefore, is to have each consumer grab a different item – near the front of the queue, but not always the very first one. How deep into the queue each consumer should jump depends to some extent on the number of consumers, but we can work around that.

I therefore tagged each record, as it is added, with a random value between 1 and N called subq. I also changed the consumer to choose a random value of subq, so that each consumer walks down the queue until it finds a suitable item rather than all of them claiming the first item on the queue. I chose an initial value of 16 for N.
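A sketch of both sides of that change, continuing from the earlier fragment; the subq field name is the real one, while SUBQ_COUNT and the payload field are illustrative.

// Producer: tag each new message with a random sub-queue number between 1 and N.
final int SUBQ_COUNT = 16;                                            // initial choice of N
Random rnd = new Random();

queue.insertOne(new Document("payload", "...")                        // whatever the real message carries
        .append("ps", 0)                                              // unprocessed
        .append("lastProcessDate", new Date())
        .append("subq", rnd.nextInt(SUBQ_COUNT) + 1));                // 1..16

// Consumer: pick a random sub-queue and only claim items tagged with that value.
int mySubq = rnd.nextInt(SUBQ_COUNT) + 1;
Document claimed = queue.findOneAndUpdate(
        Filters.and(
                Filters.eq("subq", mySubq),
                Filters.or(
                        Filters.eq("ps", 0),
                        Filters.and(Filters.eq("ps", 1),
                                    Filters.lte("lastProcessDate", cutoff)))),
        Updates.combine(Updates.set("ps", 1),
                        Updates.set("lastProcessDate", new Date())));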

This gave me the following result:


Benchmark                                       Mode  Cnt     Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  4540.941 ± 195.867  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10  2880.929 ±  65.881  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10  1660.012 ± 159.391  ops/s


Which is clearly worse! Checking the logs again showed:

2015-06-13T11:51:53.832+0000 I COMMAND  [conn7] command perftest.$cmd command: findAndModify { findandmodify: "messages1", query: { subq: 8, $or: [ { ps: 0 }, { ps: 1, lastProcessDate: { $lte: new Date(1434196013796) } } ] }, update: { $set: { ps: 1, lastProcessDate: new Date(1434196313796) } } } update: { $set: { ps: 1, lastProcessDate: new Date(1434196313796) } } nscanned:1 nscannedObjects:1 nMatched:1 nModified:1 keyUpdates:0 writeConflicts:6 numYields:112 reslen:479 locks:{ Global: { acquireCount: { w: 120 } }, Database: { acquireCount: { w: 120 } }, Collection: { acquireCount: { w: 119 } }, oplog: { acquireCount: { w: 1 } } } 23ms

We have reduced the conflicts, but the high yield count means that during the search phase – scanning the queue – we are being held up by other items being modified underneath us; checking each item in the queue for eligibility pauses if something else is in the process of claiming it. Row-level locking is biting us almost as hard as collection-level locking did!

The answer to this is to index subq so we can jump straight to the first item in the queue with the chosen subq value. Adding an index on subq gives the following.
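The index itself is another one-liner against the same collection:

// Index the sub-queue tag so each consumer can seek straight to the head of its own sub-queue
// instead of scanning – and yielding – past documents that other consumers are busy claiming.
queue.createIndex(new Document("subq", 1));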


Benchmark                                       Mode  Cnt      Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  21706.080 ± 736.475  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10   5709.433 ± 124.869  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10  15996.647 ± 739.221  ops/s

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          29.01    0.00   28.76    0.64    0.58   41.01

Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda              0.00         0.00         0.00          0          0
xvdb             98.60         0.00     24568.00          0     122840


This is much more impressive, except that it's really limited by the producer: many of those 16,000 consume attempts per second are not finding anything with their subq value. I first tried increasing the writer threads, which got me to the following:


Benchmark                                       Mode  Cnt      Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  15097.634 ± 287.554  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10   8409.507 ± 481.190  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10   6688.128 ± 391.623  ops/s

Nice, but can we improve on that? Having a consumer find nothing when there may be items waiting under other subq values is not a great use of resources.

In fact we can, by changing our query from subq = X to subq >= X – i.e. if we don't find anything tagged for us, look at the next value up – so each consumer should now always find something. (There is no really efficient way to do a true circular buffer here; you would need an $or of subq >= X with a wrap-around clause for the lower values.)
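A sketch of the relaxed filter, with the rest of the claim unchanged from the earlier fragments:

// Consumer filter with the >= relaxation: start at my sub-queue and fall through to
// higher-numbered ones if mine happens to be empty at the moment.
Document claimed = queue.findOneAndUpdate(
        Filters.and(
                Filters.gte("subq", mySubq),
                Filters.or(
                        Filters.eq("ps", 0),
                        Filters.and(Filters.eq("ps", 1),
                                    Filters.lte("lastProcessDate", cutoff)))),
        Updates.combine(Updates.set("ps", 1),
                        Updates.set("lastProcessDate", new Date())));

With that change in place the results were: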

Benchmark                                       Mode  Cnt      Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  13286.569 ± 180.155  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10   5688.509 ± 227.395  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10   7598.060 ± 358.331  ops/s

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          52.53    0.00   31.41    0.06    0.22   15.79

Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda              0.40         0.00         9.60          0         48
xvdb             52.20         0.00     12704.00          0      63520

The consumer is now outpacing the producer again – which is preferable – but what else can we do? Given we are now using >= for the queue, we can also increase the number of subq values to further reduce collisions. I upped it from 8 to 256 and got the following results.

Benchmark                                       Mode  Cnt      Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  13805.202 ± 504.619  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10   5741.364 ± 316.827  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10   8063.838 ± 258.587  ops/s

Still not great, but let's turn up the producers and see how well the consumer rate survives now. Moving to 8 producers and 16 consumers:

Benchmark                                       Mode  Cnt      Score      Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  17814.696 ± 1583.704  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10   9889.345 ±  604.698  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10   7925.351 ± 1002.229  ops/s

Right – that's a better result: roughly 8,000 consumes/s while surviving a heavier write load.

At this point I decided to do a longer run to verify that this would still work nicely once the queue had filled up.

# Run complete. Total time: 00:42:22

Benchmark                                       Mode  Cnt      Score     Error  Units
FindAndModifyThenDeleteTest.test               thrpt  500  16056.475 ± 322.935  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt  500   8862.228 ± 182.037  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt  500   7194.247 ± 142.258  ops/s

That's not too bad: a small slowdown once the queue is really quite full. The producer ran roughly 1,700 ops/s ahead of the consumer for 2,542 seconds, which is about 4.3 million items waiting in the queue by the end.

Finally, I wanted to be sure this scaled appropriately, so I changed the server instance type to a C4.4XL, doubling the CPUs.

Benchmark                                       Mode  Cnt      Score      Error  Units
FindAndModifyThenDeleteTest.test               thrpt   10  30020.700 ± 2108.152  ops/s
FindAndModifyThenDeleteTest.test:offerMessage  thrpt   10  16044.580 ±  974.911  ops/s
FindAndModifyThenDeleteTest.test:takeMessage   thrpt   10  13976.120 ± 1235.469  ops/s

That's close enough to linear scaling, at least on the consumer side, that it's not worth worrying about.

Conclusion

I can't share the code right now, but in summary, to make a queue in MongoDB with producer and consumer threads:


  • Remove any state or time indexes.
  • Tag each record with a random subq value on insert.
  • Query with subq greater than or equal to a random value on retrieval.
  • Index subq.

That gave me a 2.5-times speed increase, makes the code correctly handle abandoned records, and lets it scale with the hardware.

I hope you made it this far reading this – it's been an interesting problem to solve, and with Wired Tiger being new there will be a few more of these coming along.