Reactive Extensions for the Go Language
ReactiveX, or Rx for short, is an API for programming with Observable streams. This is the official ReactiveX API for the Go language.
ReactiveX is a new approach to asynchronous programming and an alternative to callbacks, promises, and deferreds. It is about processing streams of events or items, with events being any occurrences or changes within the system. A stream of events is called an Observable.
An operator is a function that defines an Observable: how and when it should emit data. The list of operators covered is available here.
The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.
Let's see a concrete example with each box being an operator:
- We create a static Observable based on a fixed list of items using the Just operator.
- We define a transformation function (convert a circle into a square) using the Map operator.
- We filter each yellow square using the Filter operator.
In this example, the final items are sent to a channel, available to a consumer. There are many ways to consume or produce data using RxGo; publishing the results in a channel is only one of them.
Each operator is a transformation stage. By default, everything is sequential. Yet, we can leverage modern CPU architectures by defining multiple instances of the same operator, each instance being a goroutine connected to a common channel.
The philosophy of RxGo is to implement the ReactiveX concepts and leverage the main Go primitives (channels, goroutines, etc.) so that the integration between the two worlds is as smooth as possible.
go get -u github.com/reactivex/rxgo/v2
Let's create our first Observable and consume an item:
observable := rxgo.Just("Hello, World!")()
ch := observable.Observe()
item := <-ch
fmt.Println(item.V)
The Just operator creates an Observable from a static list of items. Of(value) creates an item from a given value. If we want to create an item from an error, we have to use Error(err). This differs from v1, which accepted a value or an error directly without having to wrap it. What's the rationale for this change? It is to prepare RxGo for the generics feature coming (hopefully) in Go 2.
By the way, the Just operator uses currying as syntactic sugar. This way, it accepts multiple items in the first parameter list and multiple options in the second parameter list. We'll see below how to specify options.
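To make the two parameter lists concrete, here is a stdlib-only sketch of the same shape: a function taking items that returns a function taking options. Everything here is hypothetical (just, option, withBuffer, config); it illustrates currying combined with the functional-options pattern and is not RxGo's code.

```go
package main

import "fmt"

// config holds the settings an option can change. Hypothetical: only a
// buffer size is modelled here.
type config struct {
	bufferSize int
}

// option mutates a config (functional-options pattern).
type option func(*config)

// withBuffer is a hypothetical option setting the output channel's capacity.
func withBuffer(n int) option {
	return func(c *config) { c.bufferSize = n }
}

// just takes the items in its first parameter list and returns a function
// that takes the options in its second parameter list.
func just(items ...interface{}) func(opts ...option) <-chan interface{} {
	return func(opts ...option) <-chan interface{} {
		cfg := config{}
		for _, opt := range opts {
			opt(&cfg)
		}
		out := make(chan interface{}, cfg.bufferSize)
		go func() {
			defer close(out)
			for _, it := range items {
				out <- it
			}
		}()
		return out
	}
}

func main() {
	// Items first, options second: just(...)(...).
	for v := range just(1, "two", 3.0)(withBuffer(3)) {
		fmt.Println(v)
	}
	// With no options, the second call is simply empty: just(...)().
	for v := range just("Hello, World!")() {
		fmt.Println(v)
	}
}
```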
Once the Observable is created, we can observe it using Observe(). By default, an Observable is lazy in the sense that it emits items only once a subscription is made. Observe() returns a <-chan rxgo.Item.
We consumed an item from this channel and printed its value using item.V.
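Laziness can be illustrated with a type that stores how to produce items but only starts producing when Observe is called. This is a simplified model under our own assumptions (lazyObservable and its produce field are hypothetical), not the way RxGo is built internally.

```go
package main

import "fmt"

// lazyObservable stores how to produce items but does nothing until Observe.
type lazyObservable struct {
	produce func(out chan<- int)
}

// Observe starts the producer in a goroutine and returns the channel it
// writes to. The channel is closed once the producer returns.
func (o lazyObservable) Observe() <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		o.produce(out)
	}()
	return out
}

func main() {
	started := false
	obs := lazyObservable{produce: func(out chan<- int) {
		started = true
		for i := 0; i < 3; i++ {
			out <- i
		}
	}}
	fmt.Println("started before Observe:", started) // false
	for v := range obs.Observe() {
		fmt.Println(v)
	}
	fmt.Println("started after Observe:", started) // true
}
```

Reading started after the loop is safe because the channel is closed only after the producer has returned.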
An item is a wrapper on top of a value or an error. We may want to check the type first like this:
item := <-ch
if item.Error() {
return item.E
}
fmt.Println(item.V)
item.Error() returns a boolean indicating whether an item contains an error. Then, we use either item.E to get the error or item.V to get the value.
By default, an Observable is stopped once an error is produced. However, there are special operators to deal with errors (e.g., OnError, Retry, etc.).
It is also possible to consume items using callbacks:
observable.ForEach(func(v interface{}) {
fmt.Printf("received: %v\n", v)
}, func(err error) {
fmt.Printf("error: %v\n", err)
}, func() {
fmt.Println("observable is closed")
})
In this example, we passed three functions:
- A NextFunc triggered when a value item is emitted.
- An ErrFunc triggered when an error item is emitted.
- A CompletedFunc triggered once the Observable is completed.
ForEach is non-blocking. Yet, it returns a notification channel that will be closed once the Observable completes. Hence, to make the previous code blocking, we simply need to use <-:
<-observable.ForEach(...)
Let's say we want to implement a stream that consumes the following Customer structure:
type Customer struct {
ID int
Name, LastName string
Age int
TaxNumber string
}
We create a producer that will emit Customers to a given chan rxgo.Item and create an Observable from it:
// Create the input channel
ch := make(chan rxgo.Item)
// Data producer
go producer(ch)
// Create an Observable
observable := rxgo.FromChannel(ch)
Then, we need to perform the two following operations:
- Filter out the customers whose age is below 18.
- Enrich each customer with a tax number. Retrieving a tax number is done, for example, by an IO-bound function doing an external REST call.
As the enriching step is IO-bound, it might be interesting to parallelize it within a given pool of goroutines.
Yet, let's imagine that all the Customer items need to be produced sequentially based on their ID.
// Reassign so that a later Observe() consumes the filtered and enriched stream
observable = observable.
Filter(func(item interface{}) bool {
// Filter operation
customer := item.(Customer)
return customer.Age >= 18
}).
Map(func(_ context.Context, item interface{}) (interface{}, error) {
// Enrich operation
customer := item.(Customer)
taxNumber, err := getTaxNumber(customer)
if err != nil {
return nil, err
}
customer.TaxNumber = taxNumber
return customer, nil
},
// Create multiple instances of the map operator
rxgo.WithPool(pool),
// Serialize the items emitted by their Customer.ID
rxgo.Serialize(func(item interface{}) int {
customer := item.(Customer)
return customer.ID
}), rxgo.WithBufferedChannel(1))
In the end, we consume the items using ForEach() or Observe(), for example. Observe() returns a <-chan Item:
for item := range observable.Observe() {
if item.Error() {
return item.E
}
fmt.Println(item.V)
}
In the Rx world, there is a distinction between cold and hot Observables. When the data is produced by the Observable itself, it is a cold Observable. When the data is produced outside the Observable, it is a hot Observable. Usually, when we don't want to create a producer over and over again, we favour a hot Observable.
In RxGo, there is a similar concept.
First, let's create a hot Observable using the FromChannel operator and see the implications:
ch := make(chan rxgo.Item)
go func() {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
// First Observer
for item := range observable.Observe() {
fmt.Println(item.V)
}
// Second Observer
for item := range observable.Observe() {
fmt.Println(item.V)
}
The result of this execution is:
0
1
2
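The output lists 0, 1 and 2 only once: both observers read from the same channel, and the first loop drains it. The stdlib-only sketch below contrasts this with a cold source, where every observer triggers its own producer. coldSource, hotSource and drain are hypothetical helpers modelling the two behaviours, not RxGo functions.

```go
package main

import "fmt"

// coldSource returns a factory: every call creates a new producer, so every
// observer sees the full sequence.
func coldSource(n int) func() <-chan int {
	return func() <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			for i := 0; i < n; i++ {
				out <- i
			}
		}()
		return out
	}
}

// hotSource starts a single producer; all observers share its channel, so
// items consumed by one observer are not seen by the next.
func hotSource(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return out
}

// drain reads a channel until it is closed and returns what it received.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	observe := coldSource(3)
	fmt.Println(drain(observe()), drain(observe())) // [0 1 2] [0 1 2]

	hot := hotSource(3)
	fmt.Println(drain(hot), drain(hot)) // [0 1 2] []
}
```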