Beam
Processes that move data from one system to another are common. Such a process is called an Extract, Transform, Load (ETL). There are many technologies to author these in, and one of them is Beam. This post will dive into what Beam is, and how to author ETL pipelines in it.
Streams and Tables
There is one foundational bit of terminology that is important to understand before diving into Beam: streams vs tables:
- Streams: are data in motion
- Tables: are data at rest
These two things are highly correlated. Take a relational database. The WAL is a stream as it records the various actions that occurred to reach the current state, and the tables are well… tables. The correlation is that you can always replay the stream (WAL) to get the tables again.
Beam can work with either streams or tables as inputs. In Beam parlance a stream is an unbounded input, while a table is a bounded input. If a Beam pipeline is working with unbounded inputs it's a streaming pipeline, and if the pipeline is working with bounded inputs it's a batch pipeline.
It's not always easy to tell which inputs are bounded or unbounded in Beam, but in general, unless the input is some kind of message queue like PubSub or Kafka, it's bounded.
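To make that concrete, here is a tiny sketch (it assumes the textio package from the Go SDK, the pipeline scope set up in the boilerplate below, and a made-up file path). Reading a set of files yields a bounded input, so this would be part of a batch pipeline:
// Reading files produces a bounded PCollection.
pets := textio.Read(scope, "gs://my-bucket/pets/*.json")
Reading from something like PubSub with the pubsubio package would instead produce an unbounded input and make the pipeline a streaming one.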
If streams and tables are something that interests you, the book Streaming Systems digs into them pretty deeply.
What Beam Brings to the Table
There are a number of qualities that make Beam a particularly interesting technology to author pipelines in:
- Many Languages: Beam pipelines can be authored in many languages such as Java, Python, and Go (used for this post)
- Many Runners: Beam can run on many different runners; the most common probably being Dataflow
- Batch and Streaming: Beam has support for both batch and streaming pipelines
- Delivery Guarantees: Beam has exactly-once semantics in both batch and streaming mode
- Massively Parallel: Beam is meant for datasets that would exceed the memory of a single machine; consequently it can go massively parallel to spread the work out
When writing a Beam pipeline there is some boilerplate you will always need:
import (
    "context"
    "flag"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
    flag.Parse()
    beam.Init()

    ctx := context.Background()
    pipe := beam.NewPipeline()
    scope := pipe.Root()

    // your code here

    if err := beamx.Run(ctx, pipe); err != nil {
        panic(err)
    }
}
You can then just go run the thing.
PCollections
Pipelines in Beam are built around PCollections. A PCollection is simply a bag of things (think a linked list or array). The flow of a pipeline is generally:
- read from sources to get PCollections
- transform those PCollections into other PCollections
- write those PCollections to sinks
There are some special properties of PCollections that we will dig into further on in this post.
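To make this concrete, here is a small sketch (beam.CreateList is in the core beam package) that builds a PCollection from an in-memory slice, which is handy for toy examples and tests:
// CreateList turns an in-memory slice into a PCollection.
breeds := beam.CreateList(scope, []string{"Golden Retriever", "Poodle", "Tabby"})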
Map, Filter, and Reduce
So we have collections of things (PCollections), but how do we operate on them? John Backus (the author of Fortran) argued that 90% of for loops could be replaced with just three functional programming primitives:
- Map: transform each element of the source collection
- Filter: remove some elements of the source collection
- Reduce: transform the source collection itself
It seems like John Backus was right, since even imperative languages often have these functional primitives. Thankfully Beam has strong support for them as well via DoFns and CombineFns.
For our examples we will use a document database for our source. It will have a Pets collection with documents that look like:
{
    "name": "string",
    "breed": "string"
}
And the sink database will be a SQL database whose schema mirrors the models below: a breeds table with id and name columns, and a pets table with id, name, and breed_id columns (breed_id referencing breeds). The in-code models for this would look like:
// SourcePet represents a pet in the source database.
type SourcePet struct {
    Name  string `bson:"name"`  // the name of the pet
    Breed string `bson:"breed"` // the breed of the pet
}

// SinkPet represents a pet in the sink database.
type SinkPet struct {
    ID      string `db:"id"`       // the unique identifier of the pet
    Name    string `db:"name"`     // the name of the pet
    BreedID string `db:"breed_id"` // ID of the breed of the pet
}

// SinkBreed represents a breed in the sink database.
type SinkBreed struct {
    ID   string `db:"id"`   // the unique identifier of the breed
    Name string `db:"name"` // the name of the breed
}
Map
The map operation is done with a DoFn. Here is an example of one:
// SourcePetToSinkBreed is a DoFn that transforms a `SourcePet` into a `SinkBreed`.
type SourcePetToSinkBreed struct{}

func (f *SourcePetToSinkBreed) ProcessElement(source SourcePet, emit func(SinkBreed)) {
    emit(SinkBreed{
        ID:   uuid.NewString(),
        Name: source.Breed,
    })
}
It really comes down to defining a ProcessElement method on a struct. The simplest form of a DoFn takes a source element and then transforms it. The call to emit is what actually returns the transformed element.
You can invoke the DoFn like this:
sinkBreeds := beam.ParDo(scope, &SourcePetToSinkBreed{}, sourcePets)
By calling ParDo we have registered a step in our pipeline. This step transforms each SourcePet in sourcePets into a SinkBreed. This is exactly how map works. One thing to be aware of is that a DoFn can have multiple emit arguments, which is to say a DoFn can return multiple PCollections (see the sketch below).
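Here is a sketch of that (SplitPetsFn is a hypothetical example, not part of the post's pipeline). A DoFn with two emitters is invoked with beam.ParDo2 and yields two PCollections:
// SplitPetsFn routes pets with a breed to one output and pets without one to another.
type SplitPetsFn struct{}

func (f *SplitPetsFn) ProcessElement(source SourcePet, emitWithBreed func(SourcePet), emitWithoutBreed func(SourcePet)) {
    if source.Breed == "" {
        emitWithoutBreed(source)
        return
    }
    emitWithBreed(source)
}

haveBreed, missingBreed := beam.ParDo2(scope, &SplitPetsFn{}, sourcePets)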
Filter
Since our DoFn returns elements using an emit function, implementing filter is trivial:
func (f *SourcePetToSinkBreed) ProcessElement(source SourcePet, emit func(SinkBreed)) {
    if strings.Contains(source.Breed, "Retriever") {
        emit(SinkBreed{
            ID:   uuid.NewString(),
            Name: source.Breed,
        })
    }
}
Now the resulting PCollection will only contain the breeds whose name contains the string "Retriever".
Reduce
Reduce cannot be done with a DoFn; for that we need a CombineFn. Let's implement a CombineFn that counts the number of pets of each breed:
type BreedAccumulator struct {
    Breed string
    Count int64
}

// BreedsCombine is a CombineFn that counts the number of pets of each breed.
type BreedsCombine struct{}

func (fn *BreedsCombine) CreateAccumulator() BreedAccumulator {
    return BreedAccumulator{}
}

func (fn *BreedsCombine) AddInput(accum BreedAccumulator, source SourcePet) BreedAccumulator {
    accum.Breed = source.Breed
    accum.Count += 1
    return accum
}

func (fn *BreedsCombine) MergeAccumulators(a, b BreedAccumulator) BreedAccumulator {
    return BreedAccumulator{Breed: a.Breed, Count: a.Count + b.Count}
}

func (fn *BreedsCombine) ExtractOutput(accum BreedAccumulator) BreedAccumulator {
    return accum
}
This is a bit more complex than a DoFn. To define a CombineFn the struct must have the following methods:
- CreateAccumulator: Creates a default instance of the accumulator (think the initial value for reduce)
- AddInput: Adds a single element from the input to an accumulator
- MergeAccumulators: Combines two accumulators
- ExtractOutput: Transforms an accumulator into the final object that the resulting PCollection will be composed of; a noop in this case
In Beam elements can optionally have a Key, and many operations will require this. Adding a key to an element can be done with a DoFn:
// SourcePetByBreed is a DoFn that adds a `Breed` key to a `SourcePet`.
type SourcePetByBreed struct{}

func (f *SourcePetByBreed) ProcessElement(source SourcePet, emit func(string, SourcePet)) {
    emit(source.Breed, source)
}
That DoFn will add a key of Breed to each SourcePet. In Beam an element with a key is a KV. Now we have everything we need to invoke the CombineFn:
keyed := beam.ParDo(scope, &SourcePetByBreed{}, sourcePets)
counts := beam.CombinePerKey(scope, &BreedsCombine{}, keyed)
With that we have performed a reduce. Some caveats apply:
- The result is still a PCollection
- A given accumulator should fit in memory; don't build a LUT with a billion entries with this
Astute readers will also have noticed that this technique can do SQL-style Count operations.
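Since the result is still a PCollection (of KV pairs of breed to accumulator), it can be consumed by yet another DoFn. Here is a small sketch (LogBreedCountsFn is hypothetical, and fmt is assumed to be imported):
// LogBreedCountsFn formats each breed count as a line of text.
type LogBreedCountsFn struct{}

func (f *LogBreedCountsFn) ProcessElement(breed string, accum BreedAccumulator, emit func(string)) {
    emit(fmt.Sprintf("%s: %d", breed, accum.Count))
}

lines := beam.ParDo(scope, &LogBreedCountsFn{}, counts)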
Towards SQL
While map, filter, and reduce are very powerful, there are commonly used primitives in SQL queries that are at an even higher level of abstraction:
- Group By: group a KV PCollection by key
- Distinct: filter a PCollection down to its distinct elements
- Join: join multiple PCollections together
Group By
Beam has built in support for Group By. Take the following:
keyed := beam.ParDo(scope, &SourcePetByBreed{}, sourcePets)
groups := beam.GroupByKey(scope, keyed)
This will create KV elements where the key is a breed and the value is all of the pets of that breed (exposed to downstream DoFns as an iterator, as sketched below).
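As a sketch of consuming that (NamesPerBreedFn is a hypothetical name), the value side of a GroupByKey result shows up in a downstream DoFn as an iterator function:
// NamesPerBreedFn collects the pet names for each breed.
type NamesPerBreedFn struct{}

func (f *NamesPerBreedFn) ProcessElement(breed string, petsIter func(*SourcePet) bool, emit func(string, []string)) {
    var pet SourcePet
    var names []string
    for petsIter(&pet) {
        names = append(names, pet.Name)
    }
    emit(breed, names)
}

namesPerBreed := beam.ParDo(scope, &NamesPerBreedFn{}, groups)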
Distinct
Beam also has built in support for Distinct. Take the following:
distinct := filter.Distinct(scope, sourcePets)
That will filter the source pets down to distinct elements. There are a number of other useful things in the filter package as well.
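For example, the earlier "Retriever" filter could also be written with filter.Include from that same package (this sketch assumes strings is imported); it keeps only the elements for which the predicate returns true:
retrievers := filter.Include(scope, sourcePets, func(p SourcePet) bool {
    return strings.Contains(p.Breed, "Retriever")
})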
Joins
Beam has broad support for joins, but the syntax for them is a bit lower level than you might be accustomed to. Let's add one more document to the mix here. We'll add a new Shelters collection with documents like:
{
    "name": "string",
    "breed": "string"
}
Here are the related models for this new document:
// SourceShelter represents a shelter for pets in the source database.
type SourceShelter struct {
    Name  string `bson:"name"`  // the name of the shelter
    Breed string `bson:"breed"` // the breed the shelter can accommodate
}

// SinkShelter represents a shelter in the sink database.
type SinkShelter struct {
    ID      string `db:"id"`       // the unique identifier of the shelter
    BreedID string `db:"breed_id"` // the ID of the breed this shelter can accommodate
}
Say we decide that we want to join the Pets and Shelters collections. First we have to add keys to both (these keys are what you will be joining the PCollections on). Here is how that looks:
// SourceShelterByBreed is a DoFn that adds a `Breed` key to a `SourceShelter`.
type SourceShelterByBreed struct{}

func (f *SourceShelterByBreed) ProcessElement(source SourceShelter, emit func(string, SourceShelter)) {
    emit(source.Breed, source)
}

petsKeyed := beam.ParDo(scope, &SourcePetByBreed{}, sourcePets)
sheltersKeyed := beam.ParDo(scope, &SourceShelterByBreed{}, sourceShelters)
With both of the PCollections keyed by breed we can "join" with CoGroupByKey:
result := beam.CoGroupByKey(scope, petsKeyed, sheltersKeyed)
The best way to understand the shape of the resulting PCollection is to see how you would define a DoFn to process it:
type AvailableSheltersForPetsFn struct{}

func (f *AvailableSheltersForPetsFn) ProcessElement(breed string, petsIter func(*SourcePet) bool, sheltersIter func(*SourceShelter) bool) {
    ...
}
The DoFn receives:
- breed: a breed that had one or more pets or shelters
- petsIter: an iterator for the pets of the key breed
- sheltersIter: an iterator for the shelters that accept the key breed
Because you have access to both of the iterators you can choose to require an entry from each iterator, only one iterator, etc. This means you can simulate whatever kind of join you please.
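For instance, here is a sketch of an inner-join flavor (InnerJoinFn and its output shape are hypothetical, and fmt is assumed to be imported): only breeds that have at least one pet and at least one shelter produce output.
type InnerJoinFn struct{}

func (f *InnerJoinFn) ProcessElement(breed string, petsIter func(*SourcePet) bool, sheltersIter func(*SourceShelter) bool, emit func(string)) {
    // Buffer the pets so they can be paired with every shelter.
    var pet SourcePet
    var pets []SourcePet
    for petsIter(&pet) {
        pets = append(pets, pet)
    }

    var shelter SourceShelter
    for sheltersIter(&shelter) {
        for _, p := range pets {
            emit(fmt.Sprintf("%s could go to %s", p.Name, shelter.Name))
        }
    }
}

matches := beam.ParDo(scope, &InnerJoinFn{}, result)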
The DAG and Massive Parallelism
As discussed before, Beam's goal is to run pipelines with as much parallelism as it can. Let's define the higher level steps of a pipeline for our example source and sink:
- readPets: reads the pets from the source database
- readShelters: reads the shelters from the source database
- petToBreed: transforms a SourcePet to a SinkBreed
- petToPet: transforms a SourcePet to a SinkPet; this will need the breed IDs
- shelterToShelter: transforms a SourceShelter to a SinkShelter; this will need the breed IDs
- writeBreeds: writes the breeds to the sink database
- writePets: writes the pets to the sink database; the breeds will need to be written first
- writeShelters: writes the shelters to the sink database; the breeds will need to be written first
When Beam runs it will build a directed acyclic graph (DAG) of the steps you have registered. First each step will become a node. Then edges are formed between the nodes:
- An edge is formed between two nodes if the output from one node is an input for another
- An edge is formed between two nodes if the output from one node is used as a Side Input for another node
Given those rules, an example DAG for the above steps falls out of wiring like the sketch below.
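This is only a sketch: readPets, readShelters, the write helpers, PetToSinkPetFn, and ShelterToSinkShelterFn are hypothetical stand-ins for IO connectors and DoFns like the ones earlier in this post.
sourcePets := readPets(scope)         // hypothetical read step
sourceShelters := readShelters(scope) // hypothetical read step

sinkBreeds := beam.ParDo(scope, &SourcePetToSinkBreed{}, sourcePets)
breedsWritten := writeBreeds(scope, sinkBreeds) // hypothetical write step

// petToPet and shelterToShelter need the breed IDs, so the breeds are side inputs.
sinkPets := beam.ParDo(scope, &PetToSinkPetFn{}, sourcePets, beam.SideInput{Input: sinkBreeds})
sinkShelters := beam.ParDo(scope, &ShelterToSinkShelterFn{}, sourceShelters, beam.SideInput{Input: sinkBreeds})

// The writes take the result of writeBreeds as an input so that the breeds
// land first (the side input section below touches on this trick).
writePets(scope, sinkPets, breedsWritten)
writeShelters(scope, sinkShelters, breedsWritten)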
Beam can analyze this DAG to make decisions like:
- readPets and readShelters can be done at the same time
- petToPet and shelterToShelter can be done at the same time
- writePets and writeShelters can be done at the same time
Beam knows these things are safe to do in parallel because it has the DAG. But the parallelism doesn't stop there. Each PCollection can also be split up into Bundles and these bundles can be processed in parallel.
And best of all Beam does this for you automatically.
Bonus - IO Connectors
This post has glossed over how we're actually writing and reading from the databases. This is done using what Beam calls IO Connectors. There are lots of these that are already baked into the Beam SDKs.
However you should not fear writing your own. In a batch context what the IO connectors do is actually quite simple: they are just DoFns. Which is to say there is no real magic there, and if something isn't supported it won't be too hard to add it.
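As a rough sketch of that idea (the table, SQL, and use of database/sql plus a Postgres driver are assumptions for illustration, not a real Beam connector), a batch "write" can be just a DoFn that inserts each element, with the connection managed by Setup and Teardown (covered in the next bonus section):
// WriteSinkBreedsFn writes each SinkBreed into the sink database.
type WriteSinkBreedsFn struct {
    DSN string // connection string, known at pipeline construction time

    db *sql.DB // per-worker connection; note the casing, it is not serialized
}

func (f *WriteSinkBreedsFn) Setup() error {
    var err error
    f.db, err = sql.Open("postgres", f.DSN)
    return err
}

func (f *WriteSinkBreedsFn) ProcessElement(ctx context.Context, breed SinkBreed) error {
    _, err := f.db.ExecContext(ctx, "INSERT INTO breeds (id, name) VALUES ($1, $2)", breed.ID, breed.Name)
    return err
}

func (f *WriteSinkBreedsFn) Teardown() error {
    return f.db.Close()
}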
While IO connectors for batch pipelines aren't too crazy, there is some real trickiness to the unbounded inputs used by streaming pipelines. In that case you should just stick to the built-in connectors. If you want to know more about why this is hard, research watermarking in relation to data streaming.
Bonus - More About DoFns
It's important to understand the things you can do with DoFns since they are the core of Beam transforms. First of all a DoFn is a struct, and you can add fields to it like you would expect. In the case that the value your DoFn needs is known at the time of pipeline construction you can just inject it then:
type SomeDoFn struct {
    SomeField string
}
beam.ParDo(scope, SomeDoFn{SomeField: "SomeValue"}, source)
An example of when that wouldn't be useful is a client library for some service. In that case you will need to make use of Setup and Teardown.
type SomeDoFn struct {
    client service.SomeClient // note the casing; not serialized
}

func (f *SomeDoFn) Setup() {
    f.client = service.NewClient()
}

func (f *SomeDoFn) Teardown() {
    f.client.Close()
}
Here the client is instantiated only once per worker. This way connections can be reused and whatnot.
Finally there is the idea of side inputs. Sometimes you want to provide data to a DoFn that is not the primary source data, and you also don't want to do a join. In that case you can use a side input:
beam.ParDo(scope, SomeDoFn{}, source, beam.SideInput{Input: side})
When you do this it will create an edge between two nodes in the DAG. This gives you a way to exert exacting control over the DAG even when you are only generating side effects. Bear in mind that a side input must always be small. It needs to fit in memory after all.
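On the receiving side, the side input shows up as an extra parameter of ProcessElement. Here is a sketch (PetWithBreedIDFn is hypothetical, and sinkBreeds is assumed to be the breeds PCollection from earlier) that uses the breeds as an iterable side input to look up a pet's breed ID:
// PetWithBreedIDFn turns a SourcePet into a SinkPet using the breeds side input.
type PetWithBreedIDFn struct{}

func (f *PetWithBreedIDFn) ProcessElement(source SourcePet, breedsIter func(*SinkBreed) bool, emit func(SinkPet)) {
    var breed SinkBreed
    for breedsIter(&breed) {
        if breed.Name == source.Breed {
            emit(SinkPet{ID: uuid.NewString(), Name: source.Name, BreedID: breed.ID})
            return
        }
    }
}

sinkPets := beam.ParDo(scope, &PetWithBreedIDFn{}, sourcePets, beam.SideInput{Input: sinkBreeds})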
Conclusion
Beam is a very powerful tool for writing ETLs. The high level of abstraction and strong guarantees it provides allow you to focus more on the what, and less on the how.
This post focused more on the batch pipeline side of things. There will be a future post that talks more about building streaming pipelines. Thankfully you get to use the same Beam primitives in either one.