Aggregating data for analytics using postgresql, ruby and sequel

At my dayjob, I’ve been working, for most of this year of our lord 2022, on a team taking this new flagship product from “alpha” to “general availability”. With products in such an early stage, you don’t know a lot of things: what your users want, how they will use (or how you want them to use) the platform, whether the thing you’re building is as valuable as you think it is. At that stage, the most important skill you should have, as a team building and maintaining a product, is being able to ship features quickly; the sooner you know what “sticks” with your userbase, the sooner you’ll know how worthwhile it will be to improve it, whether to “pivot” to something else, or whether you’re better off throwing it all away.

How you get to “quickly” is usually a combination of a few factors: team skill, project scope, and a healthy dose of pragmatism. Shipping quickly means refusing perfect solutions; it means focusing on “0 to 1” before you consider “1 to 100”; it means “good enough” first. And you have to be comfortable absorbing the right amount of “tech debt”. Why the right amount? Because there’s a very big chance that the thing you’re building quickly now is going to be the thing you’ll be maintaining in 1 year (no matter how often product calls it a “throw-away POC”, or your engineering manager tells you you’ll be able to rewrite it in Kubernetes when it “wins”). Given enough experience, you learn how to compartmentalize debt in a way that it doesn’t “leak” too much into other sections of the codebase. You learn how to leverage these limitations, and reuse them to solve other problems. You learn how to do some forecasting, and ask yourself questions such as “can the way this feature was architected survive the next 3 months, or the next 3 years? How long can this reasonably hold in production until something entirely different needs to be considered?”.

After the initial core features, the next thing we knew was going to be very valuable for our customer base was providing customer-facing analytics dashboards. The product team wasn’t sure exactly what that would look like, and constantly debated how to “learn from our users quickly”. A team of engineers was assigned the task of scoping the technical aspects of the project, and they went about designing a full-fledged “analytics pipeline”, with some of the most “state-of-the-art” technologies, such as Spark, Flink or OpenWhisk, to solve not only the immediate product’s analytics needs, but also to aggregate analytics data for all the other products and services from the company. With that, scope only grew, and needless to say, none of this was going to be ready in 2 weeks. Or 2 months. No “quickly”.

The plan of building an analytics pipeline could have other theoretical compound benefits for the company, but it’d take more than 6 months to ship. That’s quite risky, considering company priorities change all the time, and in times of economic uncertainty (hello 2023!), long-running costly projects are quickly thrown into the doghouse when they do not generate immediate revenue, and kept there until the good times roll again. So it could take years, in reality.

I proposed another approach for a shortcut: we could provide a couple of API endpoints for querying data aggregations, around which some dashboards could be built; the analytics team could then focus on building those dashboards immediately, while designing the long term analytics pipeline for when this proposed solution would not scale anymore. When I mentioned this could be shipped in about 2 weeks, everyone was sold on the idea.

So the question was, how could we deliver something in 2 weeks, that would not simply fall off the rails in 2 months, and could potentially still be operational 2 years from now, at a reasonable scale, if need be?

Data

Without going too much into detail, the product revolves around allowing customers to define user journeys, and customer users to run them. This is what an over-simplification of the database would look like:

CREATE TABLE definitions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    client_id UUID,
    -- ...
);

CREATE TABLE journeys (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    client_id UUID,
    client_user_id UUID,
    definition_id UUID,
    status varchar(255), -- "pass", "fail", "review"
    error_code varchar(255), -- "network_error", "file_error" ...
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_definition FOREIGN KEY(definition_id) REFERENCES definitions(id)
);

The initial requirements revolved around fetching, for example, how many journeys were in progress, the percentage of completed/cancelled ones, and the average, max or median duration, all of this over time intervals. The data could be fully aggregated or split as a time series (initially, by day).

Planning

Querying product data tables directly was not an option; the required queries would be very complex, and would require all kinds of indexes, which would affect write performance; and even if that were done, the volume of certain client accounts could render all those optimizations useless, even more so if the requested time intervals stretched further into the past.
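
To give an idea of the shape of such a query (a hypothetical sketch against the simplified schema above, not the actual product query), serving a single dashboard widget straight from the journeys table would mean re-scanning every matching row on each refresh:

-- hypothetical direct aggregation over product data, for one client and time range
SELECT
  count(*) FILTER (WHERE error_code IS NULL) AS completed_count,
  count(*) FILTER (WHERE error_code IS NOT NULL) AS cancelled_count,
  avg(extract(epoch FROM (updated_at - created_at))) AS avg_duration,
  percentile_cont(0.5) WITHIN GROUP (
    ORDER BY extract(epoch FROM (updated_at - created_at))
  ) AS median_duration
FROM journeys
WHERE client_id = '4cffca63-7f1f-48b6-a8bc-5b39b515d854'
  AND created_at >= now() - interval '15 days';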

We decided it’d be better to pre-aggregate data in a separate table. It’d be aggregated by day, as this would be the minimal unit of the requested time ranges (“today”, “last 15 days”…). And since we are using PostgreSQL, options like table partitioning as time passes and data grows gave us enough confidence that this solution could scale well, in case something better never came along.

So there were two things to be done: aggregate data, and serve it via the API.

Aggregating

Aggregation was to be done on the fly. While bulk-aggregating in cron jobs was certainly possible, we wanted to serve data as fresh as possible, as the default time interval would be “current day”. UTC was to be used everywhere.

This was to be done using two pieces: one of them would be tobox, a transactional outbox framework I was developing at the time, and which I was already considering integrating to solve other issues in our architecture (which will deserve its own post), and sequel, the best ORM/database toolkit you can find in any stack. Period.
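
For context, the gist of the transactional outbox pattern: events are inserted into an “outbox” table in the same database transaction that changes product data, and a separate worker later picks them up and hands them to the registered handlers. A rough illustration (column names are hypothetical, not tobox’s actual schema):

-- illustrative outbox table; check tobox's documentation for the real schema
CREATE TABLE outbox (
    id BIGSERIAL PRIMARY KEY,
    type VARCHAR(255) NOT NULL, -- e.g. "journey_started"
    data JSONB NOT NULL,        -- the event payload
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW()
);

-- events get written alongside the product data they describe:
-- BEGIN; UPDATE journeys SET ...; INSERT INTO outbox (type, data) VALUES (...); COMMIT;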

The analytics table was created as per the following sequel migration:

Sequel.migration do
  up do
    create_table?(:journeys_analytics_daily) do
      column :client_id, :uuid, null: false
      column :journey_id, :uuid, null: false
      column :definition_id, :uuid, null: false
      column :day, :date, null: false
      column :started_count, :integer, null: false, default: 0
      column :completed_count, :integer, null: false, default: 0
      column :cancelled_count, :integer, null: false, default: 0
      column :status_count, :jsonb, null: false, default: Sequel.pg_jsonb_wrap({})
      column :error_code_count, :jsonb, null: false, default: Sequel.pg_jsonb_wrap({})
      column :min_duration, :integer, null: true, default: 0
      column :max_duration, :integer, null: true, default: 0
      column :avg_duration, :integer, null: true, default: 0
      column :median_duration, :integer, null: true, default: 0
      primary_key %i[client_id journey_id definition_id day]
    end
    # ...

Why were status and error_code data aggregated into JSONB columns, and not have each possible value be mapped into its own column? The reason was that new statuses and error codes would eventually be defined, which would then cause new columns to be added/removed, therefore requiring schema changes. JSONB could satisfy the same requirements without the need for it, with a bit of postgres “JSON function-fu”.

In tobox, two handlers were subscribed to the “journey started” and “journey finished” outbox events:

# tobox config file
on("journey_started") do |event|
  Aggregation::JourneyStartedEvent.call(event)
end
on("journey_finished") do |event|
  Aggregation::JourneyFinishedEvent.call(event)
end

# Event structure:
# {
#   "event" => "journey_finished", # or "journey_started"
#   "event_id" => "e30aedaa-8eba-462c-b2c8-086b5c6ee824",
#   "emitted_at" => "2022-12-24T00:00:00Z",
#   "client_id" => "4cffca63-7f1f-48b6-a8bc-5b39b515d854",
#   "journey_id" => "87536aa8-de39-4428-9567-5824287111ff",
#   "definition_id" => "9ee3d3b9-2930-4212-bbdb-ef4e5852bde4",
#   "status" => "pass", # or "fail", "review", "drop", "delay"...
#   "error_code" => nil, # or "network_error", "file_error", etc...
#   "created_at" => "2022-12-23T00:00:00Z",
#   "updated_at" => "2022-12-24T00:00:00Z",
# }

The handlers would then use the data sent in the event to craft an SQL query which would atomically increment counter columns and duration calculations.

Events could be processed “out of order”, and the “finished” event for a journey could arrive a day after the respective “started” one. UPSERTs could help manage that.

Atomic counter increments for integer columns were a no-brainer. But how could this work for JSONB columns? The solution combines PostgreSQL JSONB functions with a few casts. Let’s look at the sequel code first:

TABLE_NAME = :journeys_analytics_daily
# Aggregation::JourneyStartedEvent
def call(event)
  event_time = Time.parse(event["emitted_at"])

  DB[TABLE_NAME].insert_conflict(
    constraint: :journeys_analytics_daily_pkey,
    update: { started_count: Sequel[TABLE_NAME][:started_count] + 1}
  ).insert(
    client_id: event["client_id"],
    journey_id: event["journey_id"],
    definition_id: event["definition_id"],
    day: event_time.strftime("%Y-%m-%d"),
    started_count: 1,
  )
end


# Aggregation::JourneyFinishedEvent
def call(event)
  event_time = Time.parse(event["emitted_at"])

  insert_args = {
    client_id: event["client_id"],
    journey_id: event["journey_id"],
    definition_id: event["definition_id"],
    day: event_time.strftime("%Y-%m-%d"),
  }

  update_args = {}

  if error_code = event["error_code"]
    # journeys with errors aren't accounted for in duration metrics
    insert_args[:cancelled_count] = 1
    update_args[:cancelled_count] = Sequel[TABLE_NAME][:cancelled_count] + 1

    error_code_column = Sequel[:error_code_count].pg_jsonb
    insert_args[:error_code_count] = Sequel.pg_json_wrap({error_code => 1})
    update_args[:error_code_count] = error_code_column.set(
      "{#{error_code}})",
      (Sequel.function(:coalesce, error_code_column[error_code], "0").cast_numeric + 1).cast(:text).cast(:jsonb),
      true
    )
  else
    insert_args[:completed_count] = 1
    update_args[:completed_count] = Sequel[TABLE_NAME][:completed_count] + 1

    status = event["status"]
    status_column = Sequel[:status_count].pg_jsonb
    insert_args[:status_count] = Sequel.pg_json_wrap({status => 1})
    update_args[:status_count] = status_column.set(
      "{#{status}})",
      (Sequel.function(:coalesce, status_column[status], "0").cast_numeric + 1).cast(:text).cast(:jsonb),
      true
    )

    # duration
    duration = (Time.parse(event["updated_at"]) - Time.parse(event["created_at"])).to_i
    insert_args[:min_duration] = insert_args[:max_duration] = insert_args[:avg_duration] = insert_args[:median_duration] = duration

    update_args[:min_duration] = Sequel.function(:least, Sequel[TABLE_NAME][:min_duration],  Sequel[:excluded][:min_duration])
    update_args[:max_duration] = Sequel.function(:greatest, Sequel[TABLE_NAME][:max_duration],  Sequel[:excluded][:max_duration])
    update_args[:avg_duration] = (
      ( (Sequel[TABLE_NAME][:avg_duration] * Sequel[TABLE_NAME][:completed_count]) + Sequel[:excluded][:avg_duration])
      / (Sequel[TABLE_NAME][:completed_count] + 1)
    )

    # median calculation is a bit more involved and requires a query to product data
    update_args[:median_duration] = DB[:journeys].where(
      :client_id => event["client_id"],
      :journey_id => event["journey_id"],
      :definition_id => event["definition_id"],
      :error_code => nil
    ).where(Sequel.cast(Sequel[:journeys][:updated_at], :date) => event_time.strftime("%Y-%m-%d"))
      .select(
        Sequel.function(:coalesce,
          Sequel.function(:percentile_cont, 0.5)
            .within_group(Sequel.extract(:epoch, Sequel[:journeys][:updated_at] - Sequel[:journeys][:created_at])),
          0)
      )
  end

  DB[TABLE_NAME].insert_conflict(
    constraint: :journeys_analytics_daily_pkey,
    update: update_args
  ).insert(insert_args)
end

These generate the following SQL:

-- for journey started
INSERT INTO "journeys_analytics_daily"
("client_id", "journey_id", "definition_id", "day", "started_count")
VALUES (
  '4cffca63-7f1f-48b6-a8bc-5b39b515d854',
  'e30aedaa-8eba-462c-b2c8-086b5c6ee824',
  '9ee3d3b9-2930-4212-bbdb-ef4e5852bde4',
  '2022-12-23',
  1
)
ON CONFLICT ON CONSTRAINT "journeys_analytics_daily_pkey"
DO UPDATE SET "started_count" = ("journeys_analytics_daily"."started_count" + 1) RETURNING "client_id"
-- for journey completed with errors
INSERT INTO "journeys_analytics_daily"
("client_id", "journey_id", "definition_id", "day", "cancelled_count", "cancelled_count")
 VALUES (
  '4cffca63-7f1f-48b6-a8bc-5b39b515d854',
  'e30aedaa-8eba-462c-b2c8-086b5c6ee824',
  '9ee3d3b9-2930-4212-bbdb-ef4e5852bde4',
  '2022-12-23',
  1,
  '{"network_error":1}'::json
)
 ON CONFLICT ON CONSTRAINT "journeys_analytics_daily_pkey"
 DO UPDATE SET
  "cancelled_count" = ("journeys_analytics_daily"."cancelled_count" + 1),
  "error_code_count" = jsonb_set(
    "journeys_analytics_daily"."error_code_count",
    '{network_error}',
    CAST(CAST((CAST(coalesce(("journeys_analytics_daily"."error_code_count" -> 'network_error'), '0') AS integer) + 1) AS text) AS jsonb),
    true
  ) RETURNING "client_id"
-- for journey completed successfully
INSERT INTO "journeys_analytics_daily"
("client_id", "journey_id", "definition_id", "day", "completed_count", "state_count", "min_duration", "max_duration", "avg_duration", "median_duration")
VALUES (
  '4cffca63-7f1f-48b6-a8bc-5b39b515d854',
  'e30aedaa-8eba-462c-b2c8-086b5c6ee824',
  '9ee3d3b9-2930-4212-bbdb-ef4e5852bde4',
  '2022-12-24',
  1,
  '{"pass":1}'::json,
  3600,
  3600,
  3600,
  3600
)
ON CONFLICT ON CONSTRAINT "journeys_analytics_daily_pkey"
DO UPDATE SET
  "completed_count" = ("journeys_analytics_daily"."completed_count" + 1),
  "state_count" = jsonb_set(
    "journeys_analytics_daily"."state_count",
    '{pass}',
    CAST(CAST((CAST(coalesce(("journeys_analytics_daily"."state_count" -> 'pass'), '0') AS integer) + 1) AS text) AS jsonb),
    true
  ),
  "min_duration" = least("journeys_analytics_daily"."min_duration", "excluded"."min_duration"),
  "max_duration" = greatest("journeys_analytics_daily"."max_duration", "excluded"."max_duration"),
  "avg_duration" = ((("journeys_analytics_daily"."avg_duration" * "journeys_analytics_daily"."completed_count") + "excluded"."avg_duration") / ("journeys_analytics_daily"."completed_count" + 1)),
  "median_duration" = (
    SELECT percentile_cont(0.5) WITHIN GROUP (
      ORDER BY extract(
        epoch FROM ("journeys"."updated_at" - "journeys"."created_at"))
      ) FROM "journeys"
        WHERE (
          ("client_id" = '4cffca63-7f1f-48b6-a8bc-5b39b515d854') AND
          ("journey_id" = 'e30aedaa-8eba-462c-b2c8-086b5c6ee824') AND
          ("definition_id" = '9ee3d3b9-2930-4212-bbdb-ef4e5852bde4') AND
          ("error_code" IS NULL) AND
          (CAST("journeys"."updated_at" AS date) = '2022-12-24'))) RETURNING "client_id"

That’s quite a lot of complex sequel and SQL. Let’s digest the hardest parts:

  • data is aggregated by day, which is achieved by including the date in the primary key; this will become the constraint on which ON CONFLICT DO UPDATE works.
  • “counter columns”, which are initiated “on insert” and incremented “on update”, use SQL atomic increments, in the form of queries such as "completed_count" = ("journeys_analytics_daily"."completed_count" + 1); in this way, there is no need to manage exclusive access for updating rows via techniques such as SELECT FOR UPDATE.
  • the jsonb “counter columns” use a variation of the same technique, however they require some specialization, via the jsonb_set postgresql function; given that the initial value for a given status/error code may not be present, the coalesce function is used to establish a default; what happens afterwards is the operation sequence “convert to integer -> increment -> convert to text -> convert to jsonb”, which incurs more overhead than the traditional integer column increments, but still works without explicit locks or multiple SQL statements.
  • calculating the average on the fly can be described as “multiply the current average duration by the total number of evaluated journeys, add the ingested duration, divide by the total + 1” (see the worked example after this list).
  • median duration is calculated using the technique described here.
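
As a worked example of the running average (with hypothetical numbers): if a row currently holds avg_duration = 100 over completed_count = 4 journeys, and a new journey finishes with a duration of 200 seconds:

SELECT (100 * 4 + 200) / (4 + 1) AS new_avg_duration; -- => 120

Since the duration columns are integers, this is integer division, so the running average gets truncated to whole seconds.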

And with that, we can start ingesting analytics data.

Querying

Using your framework of choice, it’s only a matter of what to query. The request interface could be handled by something like roda, which takes care of parsing request parameters, and JSON-encoding the analytics data in the response:

class App < Roda
  plugin :json_parser
  plugin :json

  route do |r|
    client_id = read_client_id_from_session
    r.is "journey-analytics" do
      # GET /journey-analytics request
      r.get do
        # data fetching delegated to separate module
        query = JourneyAnalyticsQuery.call(
          client_id,
          request.params["journey_id"],
          request.params["definition_id"],
          request.params["after"],
          request.params["before"],
        )

        data = apply_pagination(query)
        json_serialize(data)
      end
    end
  end
end

The actual querying can be handled by a separate module, which takes care of picking up the table we’ve been ingesting data into, and applies the filters as per the parameters the client sent in the request.

# lib/journey_analytics_query.rb

module JourneyAnalyticsQuery
  module_function

  def call(
    client_id,
    journey_id = nil,
    definition_id = nil,
    after = nil,
    before = nil
  )
    query = DB[:journeys_analytics_daily].where(client_id: client_id)

    query = query.where(journey_id: journey_id) if journey_id
    query = query.where(definition_id: definition_id) if definition_id
    query = query.where(Sequel[:journeys_analytics_daily][:day] >= Time.parse(after)) if after
    query = query.where(Sequel[:journeys_analytics_daily][:day] <= Time.parse(before)) if before

    # COUNTERS
    #
    # aggregate sum of normalized columns
    selectors = %w[started completed cancelled].map do |key|
      Sequel.function(:sum, Sequel[:journeys_analytics_daily][:"#{key}_count"]).as(:"#{key}_count")
    end

    # aggregate sum of denormalized values
    #
    # this expects the full set of possible values to be stored in constants (STATUSES, ERROR_CODES)
    status_column = Sequel[:status_count].pg_jsonb
    selectors += STATUSES.map do |status|
      Sequel.function(:sum, Sequel.function(:coalesce, status_column[status].cast(:integer), 0)).as(:"status_#{status}_count")
    end
    error_code_column = Sequel[:error_code_count].pg_jsonb
    selectors += ERROR_CODES.map do |error_code|
      Sequel.function(:sum, Sequel.function(:coalesce, error_code_column[error_code].cast(:integer), 0)).as(:"error_code_#{error_code}_count")
    end

    # DURATION
    #
    selectors += %i[min max avg].map do |agg|
      Sequel.function(agg, :"#{agg}_duration").as(:"#{agg}_duration")
    end

    selectors << Sequel.function(:percentile_cont, 0.5).within_group(:median_duration).as(:median_duration)

    query.select(*selectors).reverse(:day)
  end
end

This generates queries such as:

SELECT
  SUM(journeys_analytics_daily.started_count) AS started_count,
  SUM(journeys_analytics_daily.completed_count) AS completed_count,
  SUM(journeys_analytics_daily.cancelled_count) AS cancelled_count,
  SUM(COALESCE(CAST(journeys_analytics_daily.status_count -> 'pass' AS INTEGER), 0)) AS status_pass_count,
  SUM(COALESCE(CAST(journeys_analytics_daily.status_count -> 'fail' AS INTEGER), 0)) AS status_fail_count,
  SUM(COALESCE(CAST(journeys_analytics_daily.status_count -> 'review' AS INTEGER), 0)) AS status_review_count,
  SUM(COALESCE(CAST(journeys_analytics_daily.status_count -> 'drop' AS INTEGER), 0)) AS status_drop_count,
  SUM(COALESCE(CAST(journeys_analytics_daily.error_code_count -> 'network_error' AS INTEGER), 0)) AS error_code_network_error_count,
  SUM(COALESCE(CAST(journeys_analytics_daily.error_code_count -> 'file_error' AS INTEGER), 0)) AS error_code_file_error_count,
  SUM(COALESCE(CAST(journeys_analytics_daily.error_code_count -> 'mailroom_error' AS INTEGER), 0)) AS error_code_mailroom_error_count,
  MIN(journeys_analytics_daily.min_duration) AS min_duration,
  MAX(journeys_analytics_daily.max_duration) AS max_duration,
  AVG(journeys_analytics_daily.avg_duration) AS avg_duration,
  PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY journeys_analytics_daily.median_duration) AS median_duration
FROM journeys_analytics_daily
WHERE
  journeys_analytics_daily.client_id = '4cffca63-7f1f-48b6-a8bc-5b39b515d854' AND
  journeys_analytics_daily.journey_id = '87536aa8-de39-4428-9567-5824287111ff' AND
  -- and so on

Time-series

One thing you may want to show your customers is progress over time. If your metric is “per-day”, the data’s already aggregated by day! One easy way to accomplish this is to accept a "by" parameter, and allow "day", or even "definition_id" (if you’d rather show statistics by definition), as possible values:

# api
route do |r|
  client_id = read_client_id_from_session
  r.is "journey-analytics" do
    # GET /journey-analytics request
    r.get do
      # data fetching delegated to separate module
      JourneyAnalyticsQuery.call(
        client_id,
        request.params["journey_id"],
        request.params["definition_id"],
        request.params["after"],
        request.params["before"],
        request.params["by"],
      )
    end
  end
end

# module
module JourneyAnalyticsQuery
  module_function

  def call(
    client_id,
    journey_id = nil,
    definition_id = nil,
    after = nil,
    before = nil,
    by = nil # ["day"] and/or ["definition_id"]
  )
    query = DB[:journeys_analytics_daily].where(client_id: client_id)

    # ...

    query = query.group_by(*by.map(&:to_sym)) if by

    # ...
  end
end

This will apply a GROUP BY clause to the query above, generating one row per distinct combination of the grouping keys.
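
Assuming the group keys are also added to the SELECT list (elided above), grouping by "day" would generate something roughly like the following (trimmed down to a couple of the aggregate selectors), where each row becomes one data point of the time series:

SELECT
  journeys_analytics_daily.day,
  SUM(journeys_analytics_daily.started_count) AS started_count,
  SUM(journeys_analytics_daily.completed_count) AS completed_count
FROM journeys_analytics_daily
WHERE journeys_analytics_daily.client_id = '4cffca63-7f1f-48b6-a8bc-5b39b515d854'
GROUP BY journeys_analytics_daily.day
ORDER BY journeys_analytics_daily.day DESC;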

With such an endpoint, you can start creating a few useful dashboards and features!

Going forward

If you manage to get here, congratulations! Now go do that MVP!

I hope this post shows how powerful the ruby/sequel/postgresql combo is, and how much adaptability it provides as your requirements change. This is, after all, the foundation on top of which you’ll build everything else.

And now it’s up to you to decide what to do next: is “by day” too big of an aggregation interval? You can adjust it; you can, for example, choose to aggregate per hour, or use the same strategy across separate tables and aggregate hourly, daily and/or weekly, thereby keeping queries performant for the desired range. You can ingest into one table, and ingest “indirectly” into the others by using database triggers; or you can aggregate periodically using cron jobs, if you don’t need “soft real time” (a sketch of that approach follows).
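
As an example of the periodic approach (a hypothetical sketch, assuming a journeys_analytics_weekly table with an analogous composite primary key), a cron job could roll the daily table up into a weekly one:

-- hypothetical weekly rollup; the upsert makes re-runs idempotent
INSERT INTO journeys_analytics_weekly
  (client_id, journey_id, definition_id, week, started_count, completed_count, cancelled_count)
SELECT client_id, journey_id, definition_id,
       date_trunc('week', day)::date AS week,
       SUM(started_count), SUM(completed_count), SUM(cancelled_count)
FROM journeys_analytics_daily
WHERE day >= date_trunc('week', CURRENT_DATE)
GROUP BY client_id, journey_id, definition_id, week
ON CONFLICT ON CONSTRAINT journeys_analytics_weekly_pkey
DO UPDATE SET started_count = EXCLUDED.started_count,
              completed_count = EXCLUDED.completed_count,
              cancelled_count = EXCLUDED.cancelled_count;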

In time, Postgres range partitioning can further help you keep your queries responsive. You can then follow the instructions in this blog post, which explains how to do range partitioning using sequel, which is just another example of these two technologies working in harmony.
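
For reference, this is roughly what declarative range partitioning on the day column could look like (a sketch, assuming the table is created as partitioned from the start; the partition key must be part of the primary key):

CREATE TABLE journeys_analytics_daily (
    client_id UUID NOT NULL,
    journey_id UUID NOT NULL,
    definition_id UUID NOT NULL,
    day DATE NOT NULL,
    started_count INTEGER NOT NULL DEFAULT 0,
    -- ... remaining counter and duration columns ...
    PRIMARY KEY (client_id, journey_id, definition_id, day)
) PARTITION BY RANGE (day);

-- one partition per month, created ahead of time (or by a scheduled job)
CREATE TABLE journeys_analytics_daily_2023_01
  PARTITION OF journeys_analytics_daily
  FOR VALUES FROM ('2023-01-01') TO ('2023-02-01');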

And when none of that works anymore, time to build the spaceship. Hope you made some money by then!