This is a brief exploration of using Ruby to work with streams and pipelines for processing data.
You've already used these concepts if you've ever done anything like this in a shell:
# find all unique people named "robert" in a file containing
# class enrollments data and display a count
grep "robert" class_enrollments.txt | awk '{print $1}' | sort | uniq | wc -l
The usefulness of this model is best understood in contrast to the "ad-hoc" way in which data processing scripts are often written. Such scripts have the following problems:
- they often load entire sets of data into memory at once, which isn't scalable to large data sets
- they iterate multiple times over data, which is inefficient
- complex interrelationships among their parts force you to understand everything rather than letting you isolate pieces, which makes them hard to maintain
- they are difficult to parallelize to improve run times
Because streams and pipelines work sequentially, they address these issues:
- only one record at a time is loaded into memory (usually)
- data passes through the pipeline only once, which is both efficient and easy to reason about
- pipelines make it easier to grasp the data flow and to troubleshoot isolated parts without needing to understand everything
- they are easier to parallelize (at least for operations that don't involve state)
Here's a super simple example of typical iteration in Ruby. We start with a range of numbers from 1 to 5, add 10 to each one, filter for even results, and print what remains.
(1..5).map { |i|
puts "adding 10 to #{i}"
i + 10
}.
select { |i|
puts "filtering #{i} on evenness"
i % 2 == 0
}.
each { |i|
puts "in each: #{i}"
}
The output:
adding 10 to 1
adding 10 to 2
adding 10 to 3
adding 10 to 4
adding 10 to 5
filtering 11 on evenness
filtering 12 on evenness
filtering 13 on evenness
filtering 14 on evenness
filtering 15 on evenness
in each: 12
in each: 14
No surprises here. All the data is processed with each method call: #map produces a 5-item Array in memory, #select produces a 2-item Array in memory, and #each then iterates over that 2-item Array to print the values.
Now here's the same program using a stream of objects. The single difference from the code above is the #lazy method added at the beginning of the method call chain (we'll explain this below).
(1..5).lazy.map { |i|
puts "adding 10 to #{i}"
i + 10
}.
select { |i|
puts "filtering #{i} on evenness"
i % 2 == 0
}.
each { |i|
puts "in each: #{i}"
}
Now the output is different:
adding 10 to 1
filtering 11 on evenness
adding 10 to 2
filtering 12 on evenness
in each: 12
adding 10 to 3
filtering 13 on evenness
adding 10 to 4
filtering 14 on evenness
in each: 14
adding 10 to 5
filtering 15 on evenness
(1..5).lazy doesn't return a data structure; it returns a stream of integers. Each one is processed through the pipeline, one at a time, which means we never store more than one integer in memory at once. No intermediate data structures such as Arrays are generated; there is only a "chain" of lazy enumerables that makes up the stream.
In non-trivial cases involving large data sets and many operations, streams optimize for both space and time: they minimize the amount of memory used, and they make it easy to parallelize operations for faster performance.
See the demo.rb and run.sh scripts for an illustration of the difference in memory usage.
In a nutshell, lazy Enumerable objects are how Ruby lets us handle streams of data and write pipelines for processing them.
Ruby's Enumerable module is used everywhere in the stdlib where you need to enumerate things. The only requirement for a class that mixes in Enumerable is that it implement #each. In return, Enumerable provides a LOT of powerful operations on top of #each, including the following (a few of them are shown in action right after the list):
- map/collect: transforms each item (Array's map! variant mutates in place)
- select/reject: filters items in or out
- cycle: iterates over the items N times (or forever if no N is given), calling the block for each item
- drop: drops the first N items, returning the rest
- drop_while: drops leading items while the block is true, returning the rest (starting at the first item for which the block is false)
- take: returns the first N items
- take_while: returns leading items until the block is first false for an item
- zip: zips together items from the passed-in enumerables
- inject/reduce (a.k.a. fold): combines items, storing the result in an accumulator
- and a bunch more...
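To make a few of those concrete, here's a quick tour on a small Array, with results shown as comments:

nums = [1, 2, 3, 4, 5]
nums.map { |i| i * 2 }              # => [2, 4, 6, 8, 10]
nums.select(&:even?)                # => [2, 4]
nums.drop(2)                        # => [3, 4, 5]
nums.drop_while { |i| i < 3 }       # => [3, 4, 5]
nums.take(2)                        # => [1, 2]
nums.take_while { |i| i < 3 }       # => [1, 2]
nums.zip([:a, :b, :c])              # => [[1, :a], [2, :b], [3, :c], [4, nil], [5, nil]]
nums.reduce(0) { |acc, i| acc + i } # => 15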
Lazy enumerables were added in Ruby 2.0.0. Laziness means that items are returned on demand, one at a time. This makes enumerables behave like streams. Unlike most non-lazy enumerables, a stream can only be consumed ONCE.
There is an Enumerator::Lazy class, and Enumerable has a #lazy method that turns an existing Enumerable instance into a lazy one. Many of the operations above, when called on an Enumerator::Lazy object, return another Enumerator::Lazy object in turn, making it possible to chain operations together to construct a pipeline.
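You can verify this chaining in irb; every stage of the pipeline is another Enumerator::Lazy:

stream = (1..5).lazy                            # => #<Enumerator::Lazy: 1..5>
stream.class                                    # => Enumerator::Lazy
stream.map { |i| i + 10 }.class                 # => Enumerator::Lazy
stream.map { |i| i + 10 }.select(&:even?).class # => Enumerator::Lazy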
A lazy enumerable doesn't do anything when it is constructed. Try this in irb:
# This does NOT print anything.
x = (1..5).lazy.map { |i|
puts "adding 10 to #{i}"
i + 10
}
# We have to force it to evaluate, in order to see the output from the puts commands:
x.force
Lazy enumerables have to be evaluated, either by calling the #force method or by using one of the methods that forces evaluation, such as #each or #reduce.
An important consideration: if #force evaluates a lazy enumerable into a large Array, it will take up a lot of memory, so be careful. Most of the time, you should probably end the pipeline with #each instead, and deal with each object one at a time (usually storing it to a file or database, or printing it to stdout).
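Another handy forcing method is #first(n), which evaluates only as many items as it needs to produce n results, so it is safe even on an infinite stream:

squares = (1..Float::INFINITY).lazy.map { |i| i * i }
squares.first(5)                 # => [1, 4, 9, 16, 25]
squares.select(&:even?).first(3) # => [4, 16, 36]
# By contrast, calling #force here would never return.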
Additionally, you can use the Enumerator and Enumerator::Lazy classes to create enumerables on the fly.
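For example, an Enumerator built from a block hands items downstream one at a time through a "yielder" object; combined with #lazy, this gives you an infinite stream:

# An infinite stream of Fibonacci numbers, generated on demand.
fibs = Enumerator.new do |y|
  a, b = 0, 1
  loop do
    y << a
    a, b = b, a + b
  end
end

fibs.lazy.select(&:even?).first(5) # => [0, 2, 8, 34, 144]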
But not every complex data processing problem can be expressed in terms of a single pipeline.
Splitting and merging: Enumerable doesn't support splitting (i.e. teeing) or merging streams out of the box. You need to roll your own solution or find a gem that does this.
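As an illustration, here is one minimal way to roll your own tee; the tee helper below is hypothetical, not a stdlib method:

# Hypothetical helper: consume a stream once, feeding every item
# to each of the given consumers.
def tee(stream, *consumers)
  stream.each do |item|
    consumers.each { |c| c.call(item) }
  end
end

evens = []
odds  = []
tee((1..10).lazy,
    ->(i) { evens << i if i.even? },
    ->(i) { odds  << i if i.odd?  })
evens # => [2, 4, 6, 8, 10]
odds  # => [1, 3, 5, 7, 9]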
Storing results: You might have to write the result of one pipeline to a file before using that data in another pipeline. You need to do this when sorting, for example, or when the same data feeds several operations in another pipeline.
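A sketch of this pattern, using a Tempfile as the intermediate store (the transformation itself is made up for illustration):

require "tempfile"

# Stage 1: run a lazy pipeline, writing one result per line.
tmp = Tempfile.new("stage1")
(1..100).lazy.map { |i| (i * 37) % 100 }.each { |i| tmp.puts(i) }
tmp.flush

# Stage 2: sorting needs the whole data set, so it reads the file eagerly.
sorted = File.readlines(tmp.path).map(&:to_i).sort
sorted.lazy.take(3).force # => [0, 1, 2]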
Stateful operations: Operations may also need to store state in auxiliary objects. For example, an operation that counts the number of records that fall into various buckets would store that information outside the stream.
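A minimal sketch of such an operation, with the counts held in a Hash outside the stream:

# Tally each item into a bucket as it flows past; the state lives
# in `counts`, not in the stream itself.
counts = Hash.new(0)
(1..10).lazy.map { |i| i * 3 }.each do |i|
  counts[i < 15 ? :small : :large] += 1
end
counts # => {:small=>4, :large=>6}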
You can imagine abstracting data sources, individual operations, and entire pipelines.
Data sources can be a database, a flat text file, a CSV file, or even an XML file. Laziness can be achieved by reading one line or record at a time from a file, or by fetching one record at a time from a database result set.
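For example, File.foreach and CSV.foreach both return Enumerators that read one line or record at a time, so they pair naturally with #lazy (the filenames here are assumed, echoing the shell example at the top):

require "csv"

File.foreach("class_enrollments.txt")
    .lazy
    .select { |line| line.include?("robert") }
    .map    { |line| line.split.first }
    .each   { |name| puts name }

# CSV.foreach("enrollments.csv", headers: true).lazy works the same way,
# yielding one CSV::Row at a time.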
It's possible to build a library of reusable, composable operations for the data you're working with. This also makes the operations easier to understand, unit-test, and troubleshoot, since they are highly compartmentalized.
Treating an entire pipeline as an abstraction, similar to how you might use a script file to store piped-together UNIX commands, would allow you to make certain parameters configurable.
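Here is one minimal sketch of those ideas, with each operation as a lambda from stream to stream; all of the names are hypothetical:

# Reusable, individually testable operations.
add_n = ->(n) { ->(stream) { stream.map { |i| i + n } } }
evens = ->(stream) { stream.select(&:even?) }

# A pipeline is just a left-to-right composition of operations.
pipeline = ->(stream, *ops) { ops.reduce(stream) { |s, op| op.call(s) } }

pipeline.call((1..5).lazy, add_n.call(10), evens).force # => [12, 14]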
Compared to an equivalent iterative solution, lazy enumerables are known to be somewhat slower in Ruby. This is due to implementation overhead. How much slower seems to depend on what the actual pipeline looks like.
Despite the overhead, keep in mind that if you want to support arbitrary-length data streams, laziness is your only option: an eager approach will eventually run out of memory, at which point performance becomes a moot issue.
In addition, the overhead can be more than offset by the fact that you can easily parallelize operations.
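If you want numbers for your own pipelines, the stdlib Benchmark module makes the comparison straightforward; a minimal sketch (results will vary by Ruby version and pipeline shape):

require "benchmark"

n = 1_000_000
Benchmark.bm(7) do |bm|
  bm.report("eager:") { (1..n).map { |i| i + 10 }.select(&:even?).sum }
  bm.report("lazy:")  { (1..n).lazy.map { |i| i + 10 }.select(&:even?).sum }
end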
How to design a data processing system with dependencies among streams and pipelines?
Is there a need for some kind of framework, or are these patterns enough?
One of the major drawbacks of Ruby (in my opinion) is the lack of static type checking. Java and Scala are two languages that are better at this, though arguably, development is slower in them.
Java 8 added a Streams API and lambda expressions, which make it possible to write code that looks a lot like the Ruby code above. You get all the pros and cons of Java and the JVM.
import java.util.Arrays;

int[] x = new int[] { 1, 2, 3, 4, 5 };
Arrays.stream(x)
.map(i -> {
System.out.println("adding 10 to " + i);
return i + 10;
})
.filter(i -> {
System.out.println("filtering " + i + " on evenness");
return i % 2 == 0;
})
.forEach(i -> {
System.out.println("in each: " + i);
});
Scala is a popular language used for "big data" because it's supported by Apache Spark. A mix of object-oriented and functional programming models, Scala has strong support for streams and concurrency. It interoperates with Java and runs on the JVM, but is less verbose. The learning curve is steep.
val x = Array(1, 2, 3, 4, 5)
// Note: Stream is deprecated in Scala 2.13+; use x.to(LazyList) there instead.
x.toStream.map(i => {
println("adding 10 to " + i)
i + 10
}).filter(i => {
println("filtering " + i + " on evenness")
i % 2 == 0
}).foreach(i => println("in each: " + i))
- Blog post by the original author of the Enumerator::Lazy code
- Example of using Enumerators to process a large file
- Excellent explanation of how Enumerator::Lazy works under the hood
- Discussion of a complex use of streams in Python and bash, including tee and merge operations