Introducing tobox

tobox is a framework-as-a-gem I’ve been developing over the last year, to solve a particular requirement: guaranteeing that the callback/post-action tasks and event emissions resulting from a business transaction stored in the database happen 100% of the time.

In order to talk about its value, and defend some of the choices made, some background is required.

Context

For the problem of offloading processing resulting from a given business transaction, the ruby community defaults to using background jobs. Most of us have used sidekiq at one point or another in the last 10 years, while the elders among us may also be familiar with resque or delayed_job; an honourable mention goes to shoryuken, as SQS integration is something every other framework lacks.

These frameworks have mostly commoditized the “how do I defer this business flow until after another one completes, while not making the client wait for it to finish” problem for us all. They achieve this by providing some sort of simple DSL to delegate the execution of a routine: the required state is serialized and written into some broker, only to have another process read and execute it:

class Foo
  def activate
    # heavy duty
  end
end

# then

foo.async.activate

# service object style, most common nowadays
class ActivateJob < SpecialFrameworkSubclass
  def perform(user)
    user.activate
  end
end

ActivateJob.perform_async(user)

The solution is fairly similar across all of them (they mostly “stole” features from each other), so they differentiate themselves on other aspects, such as the performance of the execution model (process/thread based), the choice of broker (database, redis, SQS, RabbitMQ…), or advanced features (plugins, retry configuration, on-complete callbacks, etc…).

One problem common to all of them is that one needs to be aware of the storage and execution characteristics of the deferred routines, in order not to be surprised by unexpected behaviour. Argument serialization is one such characteristic: while rails provides a solution for serializing model instances for activejob, most complex objects can’t be serialized, so documentation and FAQ sections contain caveat warnings and recommendations about which types of objects can be used. Primitive types tend to be supported, but even simple objects such as symbols aren’t supported everywhere (as an example, sidekiq only accepts arguments which can be serialized into JSON).
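As a quick stdlib-only illustration of that caveat, here’s what happens to symbols when job arguments round-trip through JSON, as they do with sidekiq:

```ruby
require "json"

# arguments as you would pass them to perform_async
args = { user_id: 1, role: :admin }

# what the broker stores, and what the worker deserializes
payload = JSON.generate(args)
deserialized = JSON.parse(payload)

deserialized #=> {"user_id"=>1, "role"=>"admin"}
# symbol keys and values come back as plain strings
```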

But the main problem, the one that gets everyone at some point in their careers, is when the state stored in the database before deferring a function is not yet available once the function gets executed. In fact, one of the oldest entries in the sidekiq wiki FAQ reads:

Why am I seeing a lot of “Can’t find ModelName with ID=12345” errors with Sidekiq?

Your client is creating the Model instance within a transaction and pushing a job to Sidekiq. Sidekiq is trying to execute your job before the transaction has actually committed. Use Rails’s after_commit :on => :create hook or move the job creation outside of the transaction block.

Database transactions

Most rubyists building web services are using an ACID-compliant database, usually via their favorite ORM; mine is sequel, but the majority probably knows activerecord best. For the context of this post, the most important property of the ACID family is Atomicity, which ensures that either all operations in a group complete, or none do. This covers errors in the operations themselves, but also “out of our control” events such as power outages or computer crashes. It is achieved by wrapping the group of operations (or SQL statements) in a database transaction:

BEGIN; -- transaction starts here
-- UPDATE / INSERT / DELETE statements here
COMMIT; -- or ROLLBACK; transaction ends here

A transaction is a first-class citizen of your business logic, as it has to be explicitly started and finished. Ruby ORMs usually expose block-based DSLs to manage transactions:

# using sequel
DB.transaction do # BEGIN
  DB[:foo].insert(bar: 1) # INSERT
end # COMMIT; or ROLLBACK, if an error is raised inside the block

# using activerecord
ActiveRecord::Base.transaction do
  Foo.create(bar: 1)
end

Transactions are also managed by other features, such as model callbacks, and one has to be aware of that when using deferred routines:

class User < ActiveRecord::Base
  after_save :activate

  def activate
    ActivateJob.perform_async(self)
    # TRANSACTION DID NOT COMMIT YET HERE!
  end
end

(The above is fine if you’re using delayed_job, as the broker is your database; not so fine if you’re using sidekiq or shoryuken, though.)
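For completeness, the fix suggested in the sidekiq FAQ looks something like this (a sketch under the same assumptions as the snippet above, not a runnable standalone example; note the id is passed instead of the record, per the serialization caveats discussed earlier):

```ruby
class User < ActiveRecord::Base
  # after_commit only fires once the enclosing transaction has
  # committed, so the worker is guaranteed to find the row
  after_commit :activate, on: :create

  def activate
    ActivateJob.perform_async(id)
  end
end
```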

And then there are other 3rd party gems which hide these calls under layers of DSL (looking at you, state machine gems). Given all the options available, and how convenient these deferred DSLs seem, it’s no wonder that, when using them, one is either oblivious, or lost, as to whether a transaction is open. Especially if the feature needs to be shipped by next Friday.

And if you defer a function before a transaction commits, and you need the state being written to the database, and that transaction either fails or takes too long to commit, you’ll find yourself staring at an FAQ entry like the one shared above:

# the same job as before
class ActivateJob < SpecialFrameworkSubclass
  def perform(user)
    user.activate #=> Exception raised, RecordNotFound
  end
end

But let’s say you lived to fight another day, you learned your lesson, untangled that 3rd party code you don’t own, and now you’re sure that the deferred function call happens after the transaction successfully commits. Problem solved, right?

Storage/Broker consistency

So you’re committing a database transaction to fully store the state of your business transaction, and only then invoking the “defer function” routine, which pushes the serialized state into your broker:

foo = ActiveRecord::Base.transaction do
  Foo.create(bar: 1)
end
ActivateJob.perform_async(foo)

What if there’s an outage between the transaction committing and the job being enqueued? It’s terrible, because your “jobs to be done” will probably be silently lost.

Such a conundrum can only be avoided if the database and the broker are protected by the same transactional guarantees, i.e. if the broker is the same database where your business resources are stored. Of the background job alternatives mentioned above, only delayed_job fits the bill, given that its queue is a database table. Everything else (yes, including sidekiq) is vulnerable to this problem.

This has been discussed at length in this 2017 blog post.

Transactional outbox pattern

While the description of the problem above mostly focuses on the ruby background job frameworks use-case, the same type of problem happens if your business transaction requires performing an RPC call (HTTP, gRPC) to a separate system, which happens a lot if you’re using microservices.

A solution for this general problem was formalized as the transactional outbox pattern. The gist of it: business transactions store their “events to be emitted” in a separate database table (typically called “outbox”), within the same database transaction. This in itself ensures that the events associated with the business resources are always stored if the resources are stored successfully. A separate worker (a thread in the same process, a separate process…) then reads entries from the “outbox” table and does the actual publishing of the event (or enqueuing of the job), before deleting the entry.
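The moving parts can be sketched with an in-memory stand-in (a real implementation uses database tables and transactions; all names below are hypothetical):

```ruby
require "json"

# the "database": two tables behind one lock, so that writes to both
# are atomic, like statements inside one BEGIN/COMMIT
LOCK = Mutex.new
ORDERS = []
OUTBOX = []

# business transaction: the resource and its event are stored together
def create_order(attrs)
  LOCK.synchronize do
    ORDERS << attrs
    OUTBOX << { type: "order_created", data_after: JSON.generate(attrs) }
  end
end

# the outbox worker: fetch an event, publish it, then delete it
def publish_next
  event = LOCK.synchronize { OUTBOX.shift }
  return if event.nil?
  # here: publish the event to the broker / enqueue the job
  event
end

create_order(id: 1, bar: 1)
publish_next # returns the stored "order_created" event hash
```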

tobox

So what is tobox again? In a nutshell, it’s a “transactional outbox” framework.

I built it because I needed its properties, and I couldn’t find a transactional outbox implementation in any programming language, only blog posts on how to hypothetically roll your own.

The DSL is declarative and “event-based”, which means that one can register handlers bound to specific events:

# this is the config DSL
# tobox.rb
on("order_processed") do |event|
  Payment::Start.call(event)
end
on("order_cancelled") do |event|
  CustomerSupport::Notify.call(event)
end

# if handling multiple events
on("order_processed", "order_cancelled") do |event|
  Logs::Order.call(event)
end

# app/handlers/payment_start.rb
module Payment::Start
  module_function

  def call(event)
    data = event[:after]
    # do something with the event data hash, perhaps enqueue it as a background job?
  end
end

An entry point script is also provided, to start a separate process to consume events from the outbox table:

> tobox -r ./app.rb --config tobox.rb

# if you’re using rails
> tobox -r ./config/environment.rb --config tobox-dsl.rb

Under the hood, it handles the complexity of the “plumbing” involved in building a transactional outbox consumer, using a set of conventions and tricks:

Thread and Fiber-based worker pools

tobox, by default, uses threads to handle many events at the same time in the same process, just like sidekiq. You can tweak the number of threads in the config:

# tobox.rb
concurrency 25

You can, however, switch to using fibers instead of threads, if your event handling is mostly IO-bound (and if you’re just relaying the events to SNS, it is):

# tobox.rb
worker :fiber # :thread by default
concurrency 100 # max 100 fibers running at the same time

(This requires using the fiber_scheduler gem).

SKIP LOCKED

When enabling multiple consumers for a given queue, one has to guarantee that a given event won’t be processed more than once by separate workers at the same time. One way to achieve that using the database is to lock the row where the event is stored, and delete it after it has been handled. However, if two workers try to lock the same row, one of them will sit idle instead of picking up the next available event.

While the database row-level locking model wasn’t built to support the queue use-case, some recent features have been added to the most popular database engines to accommodate it. One of these is the SKIP LOCKED clause, a non-standard SQL clause which can be used with SELECT … FOR UPDATE, and which causes rows that are already locked to be ignored (“skipped”) by the SELECT statement.

This feature is core to how tobox works, which is why it only supports databases including the SKIP LOCKED feature.
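As an illustration (a hypothetical query, not necessarily the exact SQL tobox generates), fetching the next available event looks something like:

```sql
BEGIN;
-- grab the next event nobody else is working on;
-- rows locked by other workers are skipped, not waited on
SELECT * FROM outbox
 ORDER BY id
 LIMIT 1
   FOR UPDATE SKIP LOCKED;
-- handle the event, then delete its row, then:
COMMIT;
```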

(Supporting this many databases is only possible thanks to the sequel gem, by the way).

Plugin DSL

tobox ships with a simple plugin system which supports intercepting events before and after they’re handled (or when handling errors out). It’s the foundation of a few plugins which already ship with the gem:

# tobox.rb
plugin(:zeitwerk)
plugin(:datadog)
plugin(:sentry)

multilang support

Until now, I haven’t shown how to insert events into the queue. That’s because, for now, tobox does not provide any DSL for it. The reason: working with database objects is probably already such a big part of your day-to-day work that moving that concern into a 3rd party gem may end up having more drawbacks than benefits. Moreover, perhaps this way it’s clearer that you can use a transactional outbox even if your application is not written in ruby.

For instance, here are several examples of how to write an event into the outbox:

ruby

# using sequel dataset API
DB[:outbox].insert(type: "order_created", data_after: to_json(order))
# or an ActiveRecord model
OutboxEvent.create(type: "order_created", data_after: to_json(order))

python

# SQLAlchemy
event = OutboxEvent(type="order_created", data_after=to_json(order))
db.session.add(event)

elixir

OutboxRepo.insert(%OutboxEvent{
  type: "order_created",
  data_after: to_json(order)
})

database triggers

There are also other ways to “go implicit”, if that fits your use-case. One way is to use database triggers, as in this postgresql example:

CREATE OR REPLACE FUNCTION order_created_outbox_event()
  RETURNS TRIGGER
  LANGUAGE PLPGSQL
  AS
$$
BEGIN
  INSERT INTO outbox(event_type, data_after)
       VALUES('order_created', row_to_json(NEW.*));
  RETURN NEW;
END;
$$;

CREATE TRIGGER order_created_outbox_event
  AFTER INSERT
  ON orders
  FOR EACH ROW
  EXECUTE PROCEDURE order_created_outbox_event();

Conclusion

tobox is a lightweight tool that you can use to ensure robustness and guarantee at-least-once semantics in your business workflows, with little to no performance impact. It’s not a silver bullet, though: it trades off some E2E latency (the extra step of writing the event to, and reading it from, the database) to achieve that robustness.

While it may “quack” like a background job framework, it is not designed to be one. Its features (do check the README) are more focused on the transactional outbox use-case, so if you require background job features, you should use tobox alongside such a framework.

The declarative DSL is a departure from the current “standard” for background jobs; IMO it’s leaner, and it eliminates the antipattern of creating a job class only to call some other service object in its #perform method.

Some edges are still rough, and some features are still missing (no web dashboard yet, for example). But it already does “one thing well”, so that’s the 80% right there.