Using the MongoDB Oplog to trigger asynchronous work

Background

Over the past few years at Yesware, we’ve settled into a common pattern for handling asynchronous chunks of work across the apps in our microservices architecture. In our Ruby applications so far, the pattern has typically gone something like this:

  1. A piece of data arrives through an HTTP endpoint, a database write, or a read from a message queue. We have time for no more than one or two quick things with that data before responding or exiting, so that we can keep up with the onslaught of incoming data. Usually this means writing the piece of data to a database, then enqueueing it to a message queue for further asynchronous processing.

  2. Once the data, or a pointer to it, has been written to a message queue, a worker reads that job, processes the data, and then either publishes to some other queue or takes some kind of direct action.

This pattern is not a bad one, and it has served us well in many cases. However, it adds overhead in the form of extra services to stand up (a message queue), boilerplate code to maintain for things like intermediate data structures, and undesirable complexity. For a recent project we decided to try a new approach. This project involves reading from a high-volume exchange on RabbitMQ (our latest message queueing system of choice), writing much (but not all) of that data to what we expect will soon be a very large MongoDB cluster, then updating a smaller Postgres database with aggregated stats about the data we’ve just written.

Filtering the raw data and storing it in MongoDB is fast, but aggregating it into Postgres takes far too long for the consumer to keep up with the RabbitMQ queue. Now, you haven’t truly lived until you’ve let an overflowing queue send the RabbitMQ cluster into flow control, throttling your publishers such that some of your precious data is dropped instead of published. But we’ve noticed an interesting trend: our customers tend to like it better when things work and data doesn’t get lost, so we do our best to ensure flow control remains a distant threat. Asynchronous aggregation it is. Since we’re already writing the data to a MongoDB replica set, why not use Mongo’s inherent replication functionality to trigger the downstream processing?

MongoDB Replication

Replication in a MongoDB cluster is handled by secondary nodes requesting records from the primary node’s oplog, then applying those changes as they come in. Since the primary stores its oplog in an ordinary (capped) Mongo collection, local.oplog.rs, any other process can read that collection and do whatever it likes with the changes as they occur. There is some prior art for this on the web, but not much in the Ruby world. However, Stripe developed a nifty gem a while back called mongoriver that does just that. It reads from the oplog, maintains its position with a timestamp stored back into Mongo, and uses an evented model to issue callbacks when various types of operations occur. Sounds great, right? It kind of is, but we encountered a few bumps during implementation.
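
To make that concrete, here is roughly what tailing the oplog by hand looks like with the 1.x Ruby driver. This is just a sketch for illustration, not code from our app; mongoriver wraps this sort of tailable cursor for you.

require 'mongo' # the 1.x driver, the same one MongoMapper and mongoriver use

# The oplog is a capped collection in the "local" database on each member
oplog = Mongo::Connection.new('localhost', 27017).db('local').collection('oplog.rs')

# A tailable cursor parks itself at the end of a capped collection and yields
# new entries as they are appended
cursor = Mongo::Cursor.new(oplog, :tailable => true)
loop do
  if entry = cursor.next_document
    # Each entry carries a timestamp (ts), an operation type (op), a namespace
    # (ns, "db.collection"), and the inserted document or update modifier (o)
    puts "#{entry['ts'].inspect} #{entry['op']} #{entry['ns']}"
  else
    sleep 1 # nothing new yet; wait and poll again
  end
end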

Setup

To use mongoriver, you need a MongoDB replica set. Without a replica set, there is no replication (duh!), which means no oplog (doh!). We usually run a single-node Mongo in development, so to get this working locally we had to stand up a couple more nodes and convert them into a replica set. This is as simple as creating data directories for the extra nodes, then using the mongo console to initiate the replica set. MongoDB has a great summary of that here.
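
Once the set is initiated, a quick sanity check from the Ruby side (again just a sketch, nothing mongoriver itself requires) confirms that there is an oplog to read:

require 'mongo' # 1.x driver

conn = Mongo::Connection.new('localhost', 27017) # no auth to worry about in development

# replSetGetStatus only succeeds once the replica set has been initiated
status = conn.db('admin').command('replSetGetStatus' => 1)
puts "replica set #{status['set']} is up with #{status['members'].size} members"

# The oplog itself now lives in the local database
puts conn.db('local').collection_names.grep(/oplog/).inspect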

Once that’s going, you’ll need a couple of classes in your Ruby app to handle the oplog: an outlet that is triggered when new operations occur, and a worker to set up the tailer and stream the oplog. The outlet is the easy part, so let’s start with that.

class FilteredThingOutlet < Mongoriver::AbstractOutlet
 
  # This method will be called on every insert in the oplog, and will be given 3
  # params: the DB name, the collection name, and a hash that is the document being
  # inserted.
  def insert(db_name, collection_name, document)
 
    # We only want to publish documents of the right type that have a user_id
    if collection_name == "filtered_things" && document.keys.include?('user_id')
 
      # Publish the full document (in our case this also wraps the document in a Thrift
      # struct) for downstream processing
      RabbitMQPublisher.publish(document)
    end
  end
end

Easy. The worker is a bit more complex, as it needs to first set up a tailer that can read from the oplog, then stream output from that tailer to the outlet. Also, in order to maintain its position in the oplog, we use a PersistentTailer that knows how to save that position back to Mongo over a live connection. In a simple development environment, this can be the same connection the tailer reads the oplog from.

class FilteredThingOplogWorker
  def self.run
    # Get the MongoDB connection from the MongoMapper model (more about MongoMapper in a bit...)
    mongo_connection = FilteredThing.collection.db.connection
 
    # This will persist the oplog position to the DB every 60s into a collection called
    # 'oplog-tailers' by default
    tailer = Mongoriver::PersistentTailer.
      new([mongo_connection],
          :existing, # Use an existing MongoDB connection (instead of creating a new one)
          'filtered_things' # A name for the position persistence to use. Using something
                            # similar to the data in the collection being tailed makes
                            # sense here.
      )
 
    # Hook up the oplog stream to our handler class, FilteredThingOutlet
    stream = Mongoriver::Stream.new(tailer, FilteredThingOutlet.new)
 
    # Stream 4ever!
    stream.run_forever
  end
end

We then wrap this in a simple rake task that calls FilteredThingOplogWorker.run and watch the streaming happen. In a development environment, this works swimmingly. But in a production environment, there are typically separate users for each database, even if those users share the same name and password. In our case, the databases are filtered_data, where the data lives; admin, which we authenticate against in order to read the oplog (the oplog itself lives in the local database); and _mongoriver, the default name of the DB to which Mongoriver will persist its position. Unfortunately, this means authenticating separately against each database, but we can at least authenticate multiple times on the same connection. In addition, the default 60-second persistence interval is a little conservative for our tastes, but that’s also easy to change by passing an option to the tailer. The worker then becomes a little more complicated.

class FilteredThingOplogWorker
  def self.run
    # This will persist the oplog position to the DB every 10s with the
    # :save_frequency option
    tailer = Mongoriver::PersistentTailer.
      new([mongo_connection], # Now defined by the method below
          :existing,
          'filtered_things',
          {
            save_frequency: 10, # Persist position every 10s (overriding the 60s default)
            db: '_mongoriver' # Store the position in this DB
          })
 
    stream = Mongoriver::Stream.new(tailer, FilteredThingOutlet.new)
 
    stream.run_forever
  end
 
  # Get a Mongo connection that has permissions to tail the oplog, and to store
  # the state in the _mongoriver DB. This means authenticating with 2 additional
  # DBs. The user/pass combos are the same on all 3 DBs (admin, _mongoriver, and
  # filtered_data), so no extra config is necessary.
  def self.mongo_connection
    # Only need the extra authentication in production
    if Rails.env.production?
      # 'hosts', 'user', and 'password' should be pulled in from the environment, or
      # from a Mongo configuration file (e.g., mongo.yml)
      Mongo::ReplSetConnection.new(hosts).tap do |conn|
        conn.db('admin').authenticate(user, password)
        conn.db('_mongoriver').authenticate(user, password)
      end
    else
      # Everywhere except prod, just reuse the FilteredThing Mongo connection
      FilteredThing.collection.db.connection
    end
  end
end
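
The rake task that kicks this off stays trivial; roughly (the task name here is just an example):

task :filtered_thing_oplog_worker => :environment do
  # Runs until the process is stopped (stream.run_forever never returns)
  FilteredThingOplogWorker.run
end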

There are a couple of things worth pointing out. First, we’ve specified the database _mongoriver for storing the position. Technically it can be called whatever you like, and it could even be the same database from which you’re reading the oplog. However, if it is, then you have to deal with the fact that the outlet callbacks will fire when the position is written, since it’s just another insert. I think it’s cleaner to have a separate database for the position, even if it only holds one collection, oplog-tailers, with a single document. Incidentally, the oplog-tailers collection name can also be overridden via the :collection option.

In addition, because there is a set frequency at which the tailer will save its position, the outlet callbacks may fire on duplicate oplog entries in the case where the worker restarts or crashes in the middle of the window. We’ve designed our downstream aggregation processing to gracefully handle duplicate publishes, so that isn’t a problem. In other use cases, it might require extra work to ensure the downstream processing is idempotent, since there will certainly be duplicates at some point.
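
For consumers that aren’t naturally idempotent, one simple approach is to key everything on the source document’s _id and let a unique constraint reject replays. A rough sketch (the model, column, and helper below are hypothetical, not our actual aggregation code):

class FilteredThingAggregator
  def self.process(document)
    # ProcessedOplogEntry is a hypothetical ActiveRecord model with a unique
    # index on mongo_id; a duplicate publish violates that index and is skipped
    ProcessedOplogEntry.create!(mongo_id: document['_id'].to_s)

    # Only reached the first time we see this document
    update_aggregates_for(document['user_id'])
  rescue ActiveRecord::RecordNotUnique
    # We have already processed this oplog entry; nothing to do
  end
end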

Results

As you can see, this new pattern requires very little code, which of course means much less maintenance overhead and general complexity. In addition, since it relies on MongoDB’s existing replication framework, which has to be fast for replication to function properly, the latency from document write to RabbitMQ publish is nearly zero, and the path doesn’t depend on any additional infrastructure.

However, there is a substantial, though not insurmountable, downside to this new style. We’ve historically used MongoMapper instead of Mongoid as our MongoDB ORM of choice at Yesware. We’ve written many plugins for it and lean on it pretty heavily across many of our microservices. But version 0.13.1 came out in 2014, and while there does seem to be some recent activity on master, it doesn’t appear to have an active maintainer anymore. In addition, now that Mongoid no longer requires its own driver (Moped) but instead uses version 2 of the standard MongoDB Ruby driver, the choice of MongoMapper as our ORM has come into question. Some of our recent microservices have used Mongoid with success, although in a pretty basic capacity: they aren’t attempting to use more advanced features, like covered queries, that Mongoid doesn’t support. Among other things, the connection objects were completely rewritten in version 2 of the Ruby driver, and Mongoriver doesn’t work with them, which means Mongoriver and Mongoid are not compatible (possibly older, Moped-based versions of Mongoid work, but that doesn’t interest us). In fact, like MongoMapper, Mongoriver looks like it may have been abandoned of late; its most recent commit is also from 2014.

This means that we’re using a seemingly unmaintained gem (mongoriver) which relies on another seemingly unmaintained gem (mongo_mapper), and neither can be updated to use the most recent Ruby driver. This is fine for now, but it will eventually be a problem, since in the future we’ll probably need to upgrade to a MongoDB version too new for the older driver to support. We’ve considered starting our own forks of MongoMapper and Mongoriver to get around this problem, and we may well do that, but the potential burden of that extra work is definitely a downside of this strategy. It’s close to a turnkey solution for now, but it may not be for long. For anyone considering adopting Mongoriver, this is a meaningful consideration.

Despite the potential maintenance downside, the addition of Mongoriver to our workflow seems like a smashing success so far, and I expect we’ll be looking to it to make data passing easier in other places where we can piggyback on the replication infrastructure that MongoDB already provides.

Extra Credit

At Yesware, we love to be woken up at 3 am because something went terribly wrong. Wait, that’s not right. We love it when something goes wrong at 3 am and pages us. No, that doesn’t sound right either. We hate it when things go wrong, but on the rare occasion when they do, we want to know about it ASAP. (Preferably not at 3 am. Yeah, that’s it.) This means we love monitoring, and nearly all of our features involve some degree of monitoring so that we know their health at all times. This feature was no different. Since we’re persisting the tailer’s oplog position every 10 seconds anyway, why not record a metric from it so we can alert on any potential lag? Let’s take a look at the record that gets written to oplog-tailers with the tailer’s position.

{
    "_id": {
        "$oid": "55cd8d4ef4da36b0fe2aad19"
    },
    "service": "filtered_things",
    "state": {
        "time": {
            "$date": "2016-03-10T02:36:29.000Z"
        },
        "position": {
            "$ts": 1457577389,
            "$inc": 1
        }
    },
    "v": 1
}

It’s a pretty simple document, basically containing an _id like every MongoDB doc, the service name we told the tailer to use, a timestamp, and a position. We created a rake task that we call every 10 minutes to fetch this document, compute how far behind real time the timestamp position is, and record that to our statsd server.

task :filtered_things_oplog_worker_delay => :environment do
  include YetiLogger # https://github.com/Yesware/yeti_logger
  mongo = FilteredThingOplogWorker.mongo_connection

  # Fetch the tailer's current position record
  record = mongo.db("_mongoriver").collection("oplog-tailers").
             find(service: "filtered_things").first

  # Determine the last time the tailer saved its state, and where it was in the oplog
  last_run_at = record["state"]["time"]
  oplog_at = Time.at(record["state"]["position"].seconds)
  seconds_behind = last_run_at - oplog_at

  log_info(worker: FilteredThingOplogWorker, msg: "oplog tailing state", seconds: seconds_behind)
  Metrics.gauge("work.FilteredThingOplogWorker.oplog_behind", seconds_behind)
end

Then we set an alert for when the lag rises above a threshold that concerns us, and our confidence goes up. And hopefully we’re all sleeping soundly at 3 am.