Enumerable IO Streams
30 Oct 2019I’ve been recently working on CSV generation with ruby in my day job, in order to solve a bottleneck we found because of a DB table, whose number or rows grew too large for the infrastructure to handle our poorly optimized code. This led me in a journey of discovery on how to use and play with raw ruby APIs to solve a complex problem.
The problem
So, let’s say we have a User
ActiveRecord class, and our routine looks like this:
require "csv"

class UsersCSV
  def initialize(users)
    @users = users
  end

  def generate
    CSV.generate(force_quotes: true) do |csv|
      csv << %w[id name address has_dog]
      @users.find_each do |user|
        csv << [
          user.id,
          user.name,
          user.address.to_addr,
          user.dog.present?
        ]
      end
    end
  end
end

payload = UsersCSV.new(User.relevant_for_this_csv).generate
aws_bucket.upload(body: StringIO.new(payload))
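To see why this hurts, consider that the whole payload ends up as a single Ruby string before a byte even leaves the process. A hedged sketch of how you could measure it, using the objspace stdlib:

require "objspace"

payload = UsersCSV.new(User.relevant_for_this_csv).generate
# with a million rows, this is easily hundreds of MB held in memory at once
ObjectSpace.memsize_of(payload)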
The first thing you might ask is "why are you not using Sequel?". That is a valid question, but for the purpose of this article, let's assume we're stuck with active record (we really kind of are).
The second might be "dude, that address seems to be a relationship, isn't that a classic N+1 no-brainer?". It kind of is, and good for you to notice; I'll get back to it later.
But the third thing is "dude, what happens if you have, like, a million users, and you're generating a CSV for all of them?". And touché! That's what I wanted you to focus on.
You see, this is the standard example you find all over the internet on how to generate CSV data using the csv standard library, so it's not like I'm doing something out of the ordinary.
I could rewrite the generation to use CSV.open("path/to/file", "wb") to pipe the data to a file; however, if I can send the data to the AWS bucket in chunks, why can't I just pipe it as I generate it? There are many ways to do this, but it got me thinking, and I came up with a solution using the Enumerable module.
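For context, the aws-sdk-s3 gem itself supports streaming uploads via Aws::S3::Object#upload_stream. Here's a hedged sketch of what that can look like (the bucket, key, and each_csv_row generator below are made up for illustration):

require "aws-sdk-s3"

# upload_stream performs a multipart upload, so the body is sent in
# chunks instead of being materialized upfront
object = Aws::S3::Resource.new.bucket("my-bucket").object("users.csv")

object.upload_stream do |write_stream|
  each_csv_row do |row| # hypothetical block-based row generator
    write_stream << row
  end
end

Still, I wanted something that wasn't tied to one particular SDK's API.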
Enumerable to the rescue
I’ll change my code to enumerate the CSV rows as they’re generated:
class UsersCSV
  include Enumerable

  def initialize(users)
    @users = users
  end

  def each
    # return an enumerator when no block is given, so the class also
    # supports external iteration (see the #next calls below)
    return enum_for(:each) unless block_given?

    yield line(%w[id name address has_dog])
    @users.find_each do |user|
      yield line([
        user.id,
        user.name,
        user.address.to_addr,
        user.dog.present?
      ])
    end
  end

  private

  def line(row)
    # generate_line renders a single CSV row, line terminator included
    CSV.generate_line(row, force_quotes: true)
  end
end
# I can eager-load the whole payload
payload = UsersCSV.new(User.relevant_for_this_csv).to_a.join

# or consume it line by line
csv = UsersCSV.new(User.relevant_for_this_csv).each
headers = csv.next
first_row = csv.next
# ...
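A nice side effect of including Enumerable is that all of its other methods come for free, which is handy for sampling or debugging. A quick sketch:

csv = UsersCSV.new(User.relevant_for_this_csv)

csv.take(3)            # headers plus the first two rows
csv.lazy.map(&:upcase) # transform lines lazily, as they're produced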
But this by itself doesn't solve my issue. If you look at the first example, specifically the line:

aws_bucket.upload(body: StringIO.new(payload))

I'm wrapping the payload in a StringIO. That's because my aws client expects an IO interface. And Enumerables aren't IOs.
The IO interface
An IO-like object must implement a few methods to be usable by certain functions which expect the IO interface. In other more-ruby-words, it must “quack like an IO”. And how does an IO quack? Here are a few examples:
- An IO reader must implement #read(size, buffer)
- An IO writer must implement #write(data)
- A duplex IO must implement both
- A closable IO must implement #eof? and #close
- A rewindable IO must implement #rewind
- IO wrappers must implement #to_io
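To make this concrete, here's a minimal sketch of an object that quacks like an IO reader without inheriting from IO (the class name is mine, not from any library):

# serves a string in chunks, duck-typing the IO#read contract
class StringReader
  def initialize(string)
    @buffer = string.b # work on a binary copy
  end

  # return up to `size` bytes, or nil at EOF
  # (simplified: assumes `size` is always given)
  def read(size, buffer = nil)
    return if @buffer.empty?

    chunk = @buffer.slice!(0, size)
    buffer ? buffer.replace(chunk) : chunk
  end
end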
You know some of ruby's classes which implement a few (or all) of these APIs: File, TCPSocket, and the aforementioned StringIO.
A few ruby APIs expect arguments which implement the IO interface, but aren't necessarily instances of IO:

- IO.select can be passed IO wrappers
- IO.copy_stream(src, dst) takes an IO reader and an IO writer as arguments
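As a quick sanity check, IO.copy_stream happily accepts the reader sketched above, since all it requires from the source is a #read (or #readpartial) method:

reader = StringReader.new("id,name\n1,Jane\n")

# copy_stream duck-types its arguments: any #read-able source will do
IO.copy_stream(reader, STDOUT) # prints the CSV snippet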
Enter Enumerable IO
So, what if our csv generator could turn itself into a readable IO? I could deal with this behaviour directly in my routine, but I'd argue that this should be a feature provided by Enumerable itself, i.e. an enumerable could also be cast into an IO. The expectation is risky: the yielded data must be strings, for example. But for now, I'll just monkey-patch the Enumerable module:
# practical example of a feature proposed to ruby core:
# https://bugs.ruby-lang.org/issues/15549
module Enumerable
  def to_readable_stream
    # not every enumerable knows its size upfront
    Reader.new(self, respond_to?(:size) ? size : nil)
  end

  class Reader
    attr_reader :bytesize

    def initialize(enum, size = nil)
      @enum = enum
      @bytesize = size
      @buffer = "".b
    end

    def read(bytes, buffer = nil)
      # to_enum works even when #each requires a block
      @iterator ||= @enum.to_enum
      buffer ||= @buffer
      buffer.clear

      # prepend whatever the previous read left over
      if @rest
        buffer << @rest
        @rest.clear
      end

      while buffer.bytesize < bytes
        begin
          buffer << @iterator.next
        rescue StopIteration
          # nil signals EOF, as per the IO#read contract
          return if buffer.empty?
          break
        end
      end

      # keep the surplus for the next read
      @rest = buffer.slice!(bytes..-1)
      buffer
    end
  end
end
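Any enumerable of strings now gets this for free, which makes it easy to sanity-check in irb, for example:

stream = ["id,name\n", "1,Jane\n", "2,John\n"].to_readable_stream

stream.read(10) #=> "id,name\n1,"
stream.read(10) #=> "Jane\n2,Joh"
stream.read(10) #=> "n\n"
stream.read(10) #=> nil (EOF)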
With this extension, I can do the following:
csv = UsersCSV.new(User.relevant_for_this_csv).to_readable_stream
aws_bucket.upload(body: csv)
And voilà! Enumerable and IO APIs for the win!
With this solution we get a real performance benefit while sticking to clean ruby APIs: the payload no longer needs to be kept in memory until the whole CSV is generated, so memory usage stays constant (in our case, the growth was exacerbated by that N+1 problem: the longer we waited on the rows, the longer the partial CSV payload was retained).
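And speaking of that N+1 I promised to get back to: eager-loading the associations the CSV touches keeps those per-row queries from piling up (assuming address and dog are regular associations on User):

# preload the associations the rows read, so find_each doesn't fire
# two extra queries per user
UsersCSV.new(User.relevant_for_this_csv.includes(:address, :dog))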
Caveat
Depending on what you're using to upload the file, you might still need to buffer to a file first; at work, we use fog to manage our S3 uploads, which requires IO-like request bodies to implement #rewind, so the easy way out is to buffer to a tempfile first:
require "tempfile"

csv = UsersCSV.new(User.relevant_for_this_csv).to_readable_stream

file = Tempfile.new
# stream the CSV into the tempfile, then rewind it for the upload
IO.copy_stream(csv, file)
file.rewind

fog_wrapper.upload(file)
Conclusion
There are many ways to skin this cat, but I argue that this one is the easiest to maintain: you can tell any developer that their CSV/XML/${insert format here} generator must implement #each and yield formatted lines, and then you just have to pass it to your uploader. You ensure that memory usage will not grow linearly with the number of records, and no one will ever have to read another tutorial on "How to write CSV files in ruby" ever again.

This doesn't mean that all of our problems are solved: as the number of records grows, so does the time needed to generate the CSV, and that will become a bottleneck. So how can you guarantee that the time needed to generate the data won't derail?
I’ll let you know when I have the answer.