Introducing tobox
29 Apr 2023

tobox is a framework-as-a-gem I’ve been developing over the last year, to solve a particular requirement: guarantee that callback/post-action tasks and emission of events resulting from a business transaction stored in the database happen 100% of the time.
In order to talk about its value, and defend some of the choices made, some background is required.
Context
For the problem of offloading processing resulting from a given business transaction, the ruby community defaults to using background jobs. Most of us have used sidekiq at one point or another in the last 10 years, while the elders among us may also be familiar with resque or delayed_job, and here’s an honourable mention to shoryuken, as integration with SQS is something that every other framework lacks.
These frameworks have mostly commoditized the “how do I defer this business flow until after another one completes, while not making the client wait for it to finish” problem for us all. They achieve this by providing some sort of simple DSL to delegate the execution of a routine, serializing and writing the required state into some broker, only to have another process read and execute it:
class Foo
def activate
# heavy duty
end
end
# then
foo.async.activate
# service object style, most common nowadays
class ActivateJob < SpecialFrameworkSubclass
def perform(user)
user.activate
end
end
ActivateJob.perform_async(user)
The solution is fairly similar for all of them (they mostly “stole” features from each other), so they differentiate themselves on other aspects, such as performance of the execution model (process/thread based), choice of broker (database, redis, SQS, rabbitMQ…), or advanced features (plugins, retry configuration, on complete callbacks, etc…).
One problem that is common to all of them, is how one needs to be aware of the storage and execution characteristics of the deferred routines, in order not to be surprised by some unexpected behavior. Argument serialization is one: while rails provides a solution for serializing model instances for activejob, most complex objects can’t be serialized, so documentation and FAQ sections will contain caveat warnings and recommendations about which types of objects can be used. Primitive types tend to be supported, however simple objects such as symbols aren’t supported everywhere (as an example, sidekiq only accepts primitives which can be serialized into json).
But the main problem, which gets everyone at some point in their careers, is when the state stored in the database before deferring a function is not available once the function gets executed. In fact, one of the oldest entries in the sidekiq wiki FAQ reads:
Why am I seeing a lot of “Can’t find ModelName with ID=12345” errors with Sidekiq?
Your client is creating the Model instance within a transaction and pushing a job to Sidekiq. Sidekiq is trying to execute your job before the transaction has actually committed. Use Rails’s after_commit :on => :create hook or move the job creation outside of the transaction block.
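Following the FAQ’s advice, the model from such a scenario would move the enqueueing into an `after_commit` callback (a sketch, assuming Rails; `ActivateJob` stands in for your job class):

```ruby
class User < ApplicationRecord
  # after_commit only fires once the surrounding transaction has committed,
  # so by the time the worker runs, the row is visible to it
  after_commit :activate, on: :create

  def activate
    # pass the id instead of the record, and reload it inside the job
    ActivateJob.perform_async(id)
  end
end
```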
Database transactions
Most rubyists building web services are using an ACID-compliant database, usually through their favorite ORM; mine is sequel, but the majority probably knows activerecord best. For the context of this post, the most important property of the ACID family is Atomicity, which ensures that either all operations in a group complete, or none do. This covers errors in the operations themselves, but also “out of our control” events such as power outages or computer crashes. It is achieved by wrapping the group of operations (or SQL statements) in a database transaction:
BEGIN; -- transaction starts here
-- UPDATE / INSERT / DELETE statements here
COMMIT; -- or ROLLBACK; transaction ends here
A transaction is a first-class citizen of your business logic, as it has to be explicitly started and finished. Ruby ORMs usually expose block-based DSLs to manage transactions:
# using sequel
DB.transaction do # BEGIN
DB[:foo].insert(bar: 1) # INSERT
end # COMMIT; ROLLBACK if an error is raised inside the block
# using activerecord
ActiveRecord::Base.transaction do
Foo.create(bar: 1)
end
Transactions are also managed via other features, such as model callbacks, and one has to be aware of it when using deferred routines:
class User < ApplicationRecord
after_save :activate
def activate
ActivateJob.perform_async(self)
# TRANSACTION DID NOT COMMIT YET HERE!
end
end
(The above is fine if you’re using delayed_job, as the broker is your database; not as fine if you’re using sidekiq or shoryuken though.)
And then there are other 3rd party gems which hide these calls under layers of DSL (looking at you, state machine gems). Given all the options available, and how convenient these deferred DSLs seem, it’s no wonder that, when using them, one is either oblivious or lost on whether a transaction is open. Especially if this feature needs to be shipped by next Friday.
And if you defer a function before a transaction is committed, and you need the state you’re writing into the database, and that transaction either fails or takes too long to commit, you’ll find yourself staring at a FAQ entry like the one I shared above.
# service object style, most common nowadays
class ActivateJob < SpecialFrameworkSubclass
def perform(user)
user.activate #=> Exception raised, RecordNotFound
end
end
But let’s say you lived to fight another day, you learned your lesson, untangled that 3rd party code you don’t own, and now you’re sure that the deferred function call happens after the transaction successfully commits. Problem solved, right?
Storage/Broker consistency
So you’re committing a database transaction to fully store the state of your business transaction, and then you’re invoking the “defer function” routine, which will push the serialized state into your broker:
foo = ActiveRecord::Base.transaction do
Foo.create(bar: 1)
end
ActivateJob.perform_async(foo)
What if there’s an outage between the transaction committing and the job being enqueued? It’s terrible, given that your “jobs to be done” will probably be silently lost.
Such a conundrum is only possible to avoid if the database and the broker are protected by the same transaction guarantees, i.e. if the broker is the same database where your business resources are stored. From the background job alternatives mentioned above, only delayed_job fits the bill, given that the queue is a database table. Everything else (yes, including sidekiq) is vulnerable to this problem.
This has been discussed at length in this 2017 blog post.
Transactional outbox pattern
While the description of the problem above mostly focuses on the ruby background job frameworks use-case, the same type of problem happens if your business transaction requires performing an RPC call (HTTP, gRPC) to a separate system, which happens a lot if you’re using microservices.
A solution for this general problem was formalized as the transactional outbox pattern. The gist of it is: business transactions store their “events to be emitted” in a separate database table (typically called “outbox”) within the same database transaction. This in itself ensures that the events associated with the business resources will always be stored if the resources are stored successfully. A separate worker (a thread in the same process, a separate process…) then reads entries from the “outbox” table and does the actual publishing of the event (or enqueuing of the job) before deleting the entry.
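What the “outbox” table looks like depends on the implementation; a minimal version could be created with a sequel migration along these lines (a sketch with illustrative column names; check the tobox README for the schema it actually expects):

```ruby
Sequel.migration do
  change do
    create_table(:outbox) do
      primary_key :id
      String :type, null: false   # event name, e.g. "order_created"
      column :data_after, :jsonb  # row state after the change
      Time :created_at, null: false, default: Sequel::CURRENT_TIMESTAMP
    end
  end
end
```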
tobox
So what is tobox again? In a nutshell, it’s a “transactional outbox” framework.
I built it because I needed its properties, and I couldn’t find a transactional outbox implementation for any programming language, just blog posts on how to hypothetically do your own.
The DSL is declarative and “event-based”, which means that one can register handlers bound to specific events:
# this is the config DSL
# tobox.rb
on("order_processed") do |event|
Payment::Start.call(event)
end
on("order_cancelled") do |event|
CustomerSupport::Notify.call(event)
end
# if handling multiple events
on("order_processed", "order_cancelled") do |event|
Logs::Order.call(event)
end
### app/handlers/payment_start.rb
module Payment::Start
module_function
def call(event)
data = event[:after]
# do something with the event data hash, perhaps enqueue it as a background job?
end
end
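Because handlers are plain callables receiving an event hash, a module like `Payment::Start` above can be exercised in isolation, without any of the tobox machinery (a self-contained sketch; the payload keys are made up for illustration):

```ruby
module Payment
  module Start
    module_function

    # event is a plain hash; :after carries the row state after the change
    def call(event)
      data = event[:after]
      # hypothetical: build the payload a payment service would consume
      { order_id: data["id"], amount_cents: data["total_cents"] }
    end
  end
end

event = { type: "order_processed", after: { "id" => 1, "total_cents" => 4899 } }
payload = Payment::Start.call(event)
puts payload.inspect
```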
An entry point script is also provided, to start a separate process which consumes events from the outbox table:
> tobox -r ./app.rb --config tobox.rb
# if you’re using rails
> tobox -r ./config/environment.rb --config tobox-dsl.rb
In the process, it handles the complexity of the “plumbing” involved in building a transactional outbox consumer, using a set of conventions and tricks:
Thread and Fiber-based worker pools
tobox, by default, uses threads to handle many events at the same time in the same process, just like sidekiq. You can tweak the number of threads in the config:
# tobox.rb
concurrency 25
You can, however, switch to using fibers instead of threads, if your event handling is very IO-bound (if you’re just relaying the events to SNS, it is):
# tobox.rb
worker :fiber # :thread by default
concurrency 100 # max 100 fibers running at the same time
(This requires using the fiber_scheduler gem).
SKIP LOCKED
When enabling multiple consumers for a given queue, one has to guarantee that a given event won’t be processed more than once by separate workers at the same time. One way to achieve that using the database is by locking the row where the event is stored, and deleting it after it has been handled. However, if two workers try to lock the same row, one of them will remain idle instead of picking up the next available event.
While the database row-level locking model wasn’t built to support the queue use-case, some recent features were added to the most popular database engines to accommodate it. One of these features is SKIP LOCKED, a non-standard SQL clause which can be used with SELECT … FOR UPDATE, and results in already locked rows being ignored (“skipped”) by the SELECT statement.
This feature is core to how tobox works, which is why it only supports databases implementing the SKIP LOCKED feature.
(Supporting this many databases is only possible thanks to the sequel gem, by the way).
Plugin DSL
tobox ships with a simple plugin system which supports intercepting events before and after they’re handled (or when handling errors out). It’s the foundation of a few plugins which already ship with the gem:
# tobox.rb
plugin(:zeitwerk)
plugin(:datadog)
plugin(:sentry)
multilang support
Until now, I didn’t show how to insert events into the queue. That’s because, for now, tobox does not provide any DSL for it. The reason is, working with database objects is probably already such a big part of your day-to-day work that moving that concern into a 3rd party gem may end up having more drawbacks than benefits. Moreover, perhaps this way it’s clearer that you can use a transactional outbox even if your application is not written in ruby.
For instance, here are several examples of how to write an event into the outbox:
ruby
# using sequel dataset API
DB[:outbox].insert(type: "order_created", data_after: to_json(order))
# or an ActiveRecord model
OutboxEvent.create(type: "order_created", data_after: to_json(order))
python
# SQLAlchemy
event = OutboxEvent(type="order_created", data_after=to_json(order))
db.session.add(event)
elixir
OutboxRepo.insert %OutboxEvent{
type: "order_created",
data_after: to_json(order)
}
database triggers
There are also other ways to “go implicit”, if that fits your use-case. One way you can do it is by using database triggers, such as this postgresql example:
CREATE OR REPLACE FUNCTION order_created_outbox_event()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
BEGIN
INSERT INTO outbox(event_type, data_after)
VALUES('order_created', row_to_json(NEW.*));
RETURN NEW;
END;
$$;
CREATE TRIGGER order_created_outbox_event
AFTER INSERT
ON orders
FOR EACH ROW
EXECUTE PROCEDURE order_created_outbox_event();
Conclusion
tobox is a lightweight tool that you can use to ensure robustness and at-least-once semantics in your business workflows, with little performance impact. It’s not a silver bullet, though: it trades off some E2E latency (the extra step of putting the event into, and taking it from, the database) to achieve that robustness.
While it may “quack” like a background job framework, it is not designed to be one. Its features (do check the README) are more focused on the transactional outbox use-case, so if you require background job features, you should use tobox alongside such a framework.
The declarative DSL is a departure from the current “standard” for background jobs, IMO leaner, and it eliminates the antipattern of creating a job class only to call some other service object in its #perform method.
Some edges are still rough, and some features are still missing (no web dashboard yet, for example). But it already does “one thing well”, so that’s the 80% right there.