z The Flat Field Z
[ @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ ]

Beam

Processes that move data between one system and another are common. Such processes are called an Extract, Translate, Load (ETL). There are many technologies to author these in, and one of those is Beam. This post will dive into what Beam is, and how to author ETL pipelines in it.

Streams and Tables

There is one foundational bit of terminology that is important to understand before diving into Beam: streams vs tables:

  • Streams: are data in motion
  • Tables: are data at rest

These two things are highly correlated. Take a relational database. The WAL is a stream as it records the various actions that occurred to reach the current state, and the tables are well… tables. The correlation is that you can always replay the stream (WAL) to get the tables again.

Beam can work with either streams or tables as inputs. In Beam parlance a stream is an unbounded input, while a table is a bounded input. If a Beam pipeline is working with unbounded inputs it's a streaming pipeline, and if the pipeline is working with bounded inputs it's a batch pipeline.

It's not always easy to tell which inputs are bounded or unbounded in Beam, but in general unless the input is some kind message queue like PubSub or Kafka it's bounded.

If streams and tables are something that interest you the book Streaming Systems digs into it pretty deeply.

What Beam Brings to the Table

There are a number of qualities that make Beam a particularly interesting technology to author pipelines in:

  • Many Languages: Beam pipelines can be authored in many languages such as Java, Python, and Go (used for this post)
  • Many Runners: Beam can run on many different runners; the most common probably being Dataflow
  • Batch and Streaming: Beam has support for both batch and streaming pipelines
  • Delivery Guarantees: Beam has exactly-once semantics in both batch and streaming mode
  • Massively Parallel: Beam is meant for datasets that would exceed the memory of a single machine; consequently it can go massively parallel to spread the work out

When writing a Beam pipeline there is some boilerplate you will always need:

import (
	"flag"
	"context"
	
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	
	pipe := beam.NewPipeline()
	scope := pipe.Root()

	// your code here
	
	if err := beamx.Run(ctx, pipe); err != nil {
		panic(err)
	}
}

You can then just go run the thing.

PCollections

Pipelines in Beam are built around PCollections. A PCollection is simply a bag of things (think a linked list or array). The flow of a pipeline is generally:

  1. read from sources to get PCollections
  2. transform those PCollections into other PCollections
  3. write those PCollections to sources

There are some special properties of PCollections that we will dig into further on in this post.

Map, Filter, and Reduce

So we have collections of things (PCollections), but how do we operate on them? John Backus (the author of Fortran) argued that 90% of for loops could be replaced with just three functional programming primitives:

  • Map: transform each element of the source collection
  • Filter: remove some elements of the source collection
  • Reduce: transform the source collection itself

It seem like John Backus was right since even Imperative languages will often have these functional primitives. Thankfully Beam has strong support for these functional primitives as well with via DoFns and CombineFns.

For our examples we will use a document database for our source. It will have a Pets collection with documents that look like:

{
  "name": "string",
  "breed": "string"
}

And the sink database will be a SQL database with a schema like:

#+begin_src mermaid :file "sink-schema.svg" :pupeteer-config-file "~/.emacs.d/pupeteer-config.json" :mermaid-config-file "~/.emacs.d/mermaid-config.json" :background-color "transparent" erDiagram breeds }o--|| pets : breed pets { string id string name string breed_id } breeds { string id string name } #+end_src #+RESULTS: [[file:sink-schema.svg]]

The in-code models for this would look like:

// SourcePet represents a pet in the source database.
type SourcePet struct {
	Name  `bson:"name"`  // the name of the pet
	Breed `bson:"breed"` // the breed of the pet
}

// SinkPet represents a pet in the sink database.
type SinkPet struct {
	ID      `db:"id"`       // the unique identifier of the pet
	Name    `db:"name"`     // the name of the pet
	BreedID `db:"breed_id"` // ID of the breed of the pet
}

// SinkBreed represents a breed in the sink database.
type SinkBreed struct {
	ID      `db:"id"`    // the unique identifier of the breed
	Name    `db:"name"`  // the name of the breed
}

Map

The map operation is done with a DoFn. Here is an example of one:

// SourcePetToSinkBreed is a DoFn that transforms a `SourcePet` into a `SinkBreed`.
type SourcePetToSinkBreed struct {}

func (f *SourcePetToSinkBreed) ProcessElement(source SourcePet, emit func(SinkBreed)) {
	emit(SinkBreed{
		ID: uuid.NewString(),
		Name: source.Breed,
	})
}

It really comes down to defining a ProcessElement method on a struct. The simplest form of a DoFn takes a source element and then transforms it. The call to emit is what actually returns the transformed element.

You can invoke the DoFn like this:

sinkBreeds := beam.ParDo(scope, &SourceBreedToSinkBreed{}, sourcePets)

By calling ParDo we have registered a Step in our pipeline. This step transforms each SourcePet in sourcePets into a SinkBreed. This exactly how map works. One thing to be aware of is that a DoFn can have multiple emit arguments; which is to say a DoFn can return multiple PCollections.

Filter

Since our DoFn returns elements using an emit function implementing filter is trivial:

func (f *SourcePetToSinkBreed) ProcessElement(source SourcePet, emit func(SinkBreed)) {
	if strings.Contains(source.Breed, "Retriever") {
		emit(SinkBreed{
			ID: uuid.NewString(),
			Name: source.Breed,
		})
	} 
}

Now the resulting PCollection will only be the breeds that contain the string "Retriever".

Reduce

Reduce cannot be done with a DoFn; for that we need a CombineFn. Let's implement a CombineFn that counts the number of pets of each breed:

type BreedAccumulator struct {
	Breed string
	Count int64
}

// BreedsCombine is a CombineFn that counts the number of pets of each breed.
type BreedsCombine struct{}

func (fn *BreedsCombine) CreateAccumulator() BreedAccumulator {
	return BreedAccumulator{}
}


func (fn *BreedsCombine) AddInput(accum BreedAccumulator, source SourcePet) BreedAccumulator {
	accum.Breed = source.Breed
	accum.Count += 1

	return accum
}

func (fn *BreedsCombine) MergeAccumulators(a, b BreedAccumulator) BreedAccumulator {
	return BreedAccumulator{Breed: a.Breed, Count: a.Count + b.Count}
}


func (fn *BreedsCombine) ExtractOutput(accum BreedsAccum) BreedsAccum {
	return accum
}

This is a bit more complex then a DoFn. To define a CombineFn the struct must have the following methods:

  • CreateAccumulator: Creates a default instance of the accumulator (think the initial value for reduce)
  • AddInput: Adds a single element from the input to an accumulator
  • MergeAccumulators: Combines two accumulators
  • ExtractOutput: Transforms an accumulator into the final object that the resulting PCollection will be composed of; a noop in this case

In Beam elements can optionally have a Key, and many operations will require this. Adding a key to an element can be done with a DoFn:

// SourcePetByBreed is a DoFn that adds a `Breed` key to a `SourcePet`.
type SourcePetByBreed struct {}

func (f *SourcePetByBreed) ProcessElement(source SourcePet, emit func(string, SourcePet)) {
	emit(source.Breed, source)
}

That DoFn will add key of Breed to each SourcePet. In Beam an element with a key is a KV. Now we have everything we need to invoke the CombineFn:

keyed := beam.ParDo(scope, &SourcePetByBreed{}, sourcePets)
counts := beam.CombinePerKey(scope, &BreedsCombine{}, keyed)

With that we have performed a reduce. Some caveats apply:

  • The result is still a PCollection
  • A given accumulator should fit in memory; don't build a LUT with a billion entries with this

Astute readers will also have noticed that this technique can do SQL style Count operations.

Towards SQL

While map, filter, and reduce are very powerful there are commonly used primitives in SQL queries that are at an even higher level of abstraction:

  • Group By: group a KV PCollection by key
  • Distinct: filter a PCollection down to its distinct elements
  • Join: join multiple PCollections together

Group By

Beam has built in support for Group By. Take the following:

keyed := beam.ParDo(scope, &SourcePetByBreed{}, sourcePets)
groups := beam.GroupByKey(scope, keyed)

This will create KV elements where the key is a breed, and the value is pets of that breed.

Distinct

Beam also has built in support for Distinct. Take the following:

distinct := filter.Distinct(scope, sourcePets)

That will filter the source pets down to distinct elements. There are a number of other useful things in the filter package as well.

Joins

Beam has broad support for joins, but the syntax for them is a bit lower level than you might be accustomed to. Let's add one more document to the mix here. We'll add a new Shelters collection with documents like:

{
  "name": "string",
  "breed": "string
}

Here are the related models for this new document:

// SourceShelter represents a shelter for pets in the source database.
type SourceShelter struct {
	Name  `bson:"name"`  // the name of the pet
	Breed `bson:"breed"` // the breed the shelter can accomodate
}

// SinkShelter represents a shelter in the sink database.
type SinkShelter struct {
	ID      `db:"id"`       // the unique identifier of the shelter
	BreedID `db:"breed_id"` // the ID of the breed this shelter can accomdate
}

Say we decide that we want to join the Pets and Shelters collections. First we have to add keys to both (these keys are thing you will be joining the PCollections on). Here is how that looks:

// SourceSheltersByBreed is a DoFn that adds a `Breed` key to a `SourceShelter`.
type SourceShelpterByBreed struct {}

func (f *SourceShelterByBreed) ProcessElement(source SourceShelter, emit func(string, SourceShelter)) {
	emit(source.Breed, source)
}

petsKeyed := beam.ParDo(scope, &SourcePetByBreed{}, sourcePets)
sheltersKeyed := beam.ParDo(scope, &SourceSheltersByBreed{}, sourcePets)

With both of the PCollections keyed by breed we can "join" with CoGroupByKey:

result := beam.CoGroupByKey(scope, petsKeyed, sheltersKeyed)

The best way to understand the shape of the resulting PCollection is to see how you would define a DoFn to process it:

type AvailableSheltersForPetsFn struct {}

func (f *AvailableSheltersForPetsFn) ProcessElement(breed string, petsIter func(*SourcPet) bool, sheltersIter func(*SourceShelter) bool) {
	...
}

The DoFn receives:

  • breed: a breed that had one or more pets or shelters
  • petsIter: an iterator for the pets of the key breed
  • sheltersIter: an iterator for the shelters that accept the key breed

Because you have access to both of the iterators you can choose to require an entry from each iterator, only one iterator, etc. This means you can simulate whatever kind of join you please.

The DAG and Massive Parallelism

As discussed before Beams goal is to run pipelines as massively parallel as it can. Let's define the higher level steps of a pipeline for our example source and sink:

  • readPets: reads the pets from the source database
  • readShelters: reads the shelters from the source database
  • petToBreed: transforms a SourcePet to a SinkBreed
  • petToPet: transforms a SourcePet to a SinkPet; this will need the breed IDs
  • shelterToShelter: transforms a SourceShelter to SinkShelter; this will need the breed IDs
  • writeBreeds: write the breeds to the sink database
  • writePets: writes the pets to the sink database; the breeds will need to be written first
  • writeShelters: writes the shelters to the sink database; the breeds will need to be written first

When Beam runs it will build a directed acyclic graph (DAG) of the steps you have registered. First each step will become a node. Then edges are formed between the nodes:

  • An edge is formed between two nodes if the output from one node is an input for another
  • An edge is formed between two nodes if the output from one node is used as a Side Input for another node

Given those rules an example DAG for the above steps could look like:

#+begin_src mermaid :file "dag.svg" :pupeteer-config-file "~/.emacs.d/pupeteer-config.json" :mermaid-config-file "~/.emacs.d/mermaid-config.json" :background-color "transparent" flowchart TB readPets readShelters petToBreed petToPet shelterToShelter writeBreeds writePets writeShelters readPets-->petToBreed readPets-->petToPet petToBreed-->petToPet readShelters-->shelterToShelter petToBreed-->shelterToShelter petToBreed-->writeBreeds petToPet-->writePets shelterToShelter-->writeShelters writeBreeds-->writePets writeBreeds-->writeShelters #+end_src #+RESULTS: [[file:dag.svg]]

Beam can analyze this DAG to make decisions like:

  • readPets and readShelters can be done at the same time
  • petToPet and shelterToShelter can be done at the same time
  • writePets and writeShelters can be done at the same time

Beam knows these things are safe to do in parallel because it has the DAG. But the parallelism doesn't stop there. Each PCollection can also be split up into Bundles and these bundles can be processed in parallel.

And best of all Beam does this for you automatically.

Bonus - IO Connectors

This post has glossed over how we're actually writing and reading from the databases. This is done using what Beam calls IO Connectors. There are lots of these that are already baked into the Beam SDKs.

However you should not fear writing your own. In a batch context what the IO connectors do is actually quite simple: they are just DoFns. Which is to say there is no real magic there, and if something isn't supported it won't be too hard to add it.

While IO connectors for batch pipelines aren't too crazy there is some real trickiness for the unbounded inputs used by streaming pipelines. In that case you should just stick to the built in connectors. If you want to know more about why this is hard research watermarking in relation to data streaming.

Bonus - More About DoFns

It's important to understand the things you can do with DoFns since they are the core of Beam transforms. First of all a DoFn is a struct, and you can add fields to a struct like you would expect. In the case that the value your DoFn needs is known at time of pipeline construction you can just inject it then:

type SomeDoFn struct {
	SomeField string
}

beam.ParDo(scope, SomeDoFn{SomeField: "SomeValue"}, source)

An example of when that wouldn't be useful is a client library for some service. In that case you will need make use of Setup and Teardown.

type SomeDoFn struct {
	client service.SomeClient // note the casing; not serialized
}

func (f *SomeDoFn) Setup() {
	f.client = service.NewClient()
}

func (f *SomeDoFn) Teardown() {
	f.client.Close()
}

Here the client is instantiated only once per worker. This way connections can be reused and whatnot.

Finally there is the idea of side inputs. Sometimes you want to provide data to a DoFn that is not the primary source data, and you also don't want to do a join. In that case you can use a side input:

beam.ParDo(scope, SomeDoFn{}, source, beam.SideInput{Input: side})

When you do this it will create an edge between two nodes in the DAG. This gives you a way to exert exacting control over the DAG even when you are only generating side effects. Bear in mind that a side input must always be small. It needs to fit in memory after all.

Conclusion

Beam is a very powerful tool for writing ETLs. The high level of abstraction and strong guarantees it provides allows you to focus more on the what, and less on the how.

This post focused more on the batch pipeline side of things. There will be a future post that talks more about building streaming pipelines. Thankfully you get to use the same Beam primitives in either one.