Learn the basics of functional reactive programming in Angular with RxJS.
Why?
You might be asking:
What does Angular have to do with functional reactive programming?
A lot, actually.
Everything that is asynchronous in Angular is an Observable
, which is at the core of reactive programming.
An observable is really pretty simple if you are used to using promises. A simple explanation is that observables are like promises that can emit multiple values over a series of time - otherwise referred to as a stream. Of course, that’s a bit simplified, but it’s a starting point for us.
Meetup Presentation
This article is largely based on a meetup presentation that I gave in February 2018 at the Rocky Mountain Angular meetup. Check out the video on YouTube:
Demo Files
For this post I am going to be referencing some demonstration files that I put together for the presentation. You can download them via:
The demo files use webpack-dev-server. All demos are contained within src/demos directory, and you can toggle which demo is running in the src/index.ts entry point for webpack. Just uncomment the demo you want to execute, and load http://localhost:8080 in your browser. Any changes to the demo or index file and the brower will live reload with the updated changes. Of course, be sure to only have a single demo running at a time (for the sake of clarity).
What is RxJS?
RxJS is the JavaScript implementation of the ReactiveX library. ReativeX enables us as Angular developers to build asynchronous, modular, event-based applications. And, it allows us to develop these applications reactively. This is a little confusing at first, but as you dive into RxJS you will begin to learn how powerful this concept is, and how powerful the many, many operators that are provided by RxJS are.
Back to the concept of streams of data. Where promises enable an asynchronous event or value to be either fulfilled or rejected, RxJS enables multiple asynchronous events or values to be observed.
Is this new?
Yes, and no. Yes, this is likely new to us as JavaScript developers. But, the concept or design pattern is not new.
If you refer to the famous Gang of 4 Design Patterns book you will find the observer design pattern:
To briefly explain the concept:
- A
Subject
class contains a collection of observers. - Each
ConcreteObserver
extends an abstractObserver
class, implementing theupdate()
method. - An observer registers itself with the
Subject
, and can also unregister itself. - The
Subject
notifies all observers of an event or value.
What are Observables?
Simply put, observables implement the observer design pattern.
An observer (or multiple observers) receives a Notification
that is emitted by the observable.
In RxJS, this is represented by the aptly-named Observable
class, which contains both static and instance methods to create an instance of the class, filtering and transforming values that are emitted by the observable, and more.
Further, the Observable
methods were designed around the Array.prototype
methods that JavaScript developers are familiar with, including: filter()
, map()
, reduce()
, etc.
From Promiseland
For the sake of learning about where we came from and where we are going, let’s quickly review the Promise
.
Simply uncomment the import './demos/promise';
line in the src/index.ts file.
Here is the content of the src/demos/promise.ts file:
/* create a new promise. */
const p = new Promise((resolve) => {
setTimeout(() => {
resolve('Hello from Promiseland!');
}, 1000);
});
/* log single value that is emitted. */
p.then((value) => console.log(value));
To quickly review:
- The
Promise
class is baked into our browsers today. So, we do not need to import anything from a library. - We create a new
Promise
, providing the executor function, which accepts two arguments, aresolve
function and areject
function. In this demonstration we are only using theresolve
function. - An asynchronous event will cause the promise to be either fulfilled or rejected.
- In this example, we are simply using the
setTimeout()
function to resemble an asynchronous event. After one second, the string value “Hello from Promiseland” is emitted.
To Observableland
Given that we have promises for a single asynchronous value, observables provide the ability for a stream of values to be emitted over time. Let’s look at a simple example in src/demos/observable.ts:
import { Observable } from 'rxjs/Observable';
/* create a new observable, providing the subscribe function. */
const observable: Observable<string> = new Observable(observer => {
const interval = setInterval(() => {
observer.next('Hello from Observableland!');
}, 1000);
// teardown
return () => {
clearInterval(interval);
};
});
/* Subscribe to Notifications. */
observable.subscribe(value => console.log(value));
- First, we need to import the
Observable
class from the RxJS module as observables are not yet part of the ECMAScript (JavaScript) language. - Then, we create a new
Observable
, providing the subscribe function that is invoked when anobserver
subscribes to the observable. - We can use the
next()
method on anObserver
object to emit values to the observer. TheObserver
interface requires three methods to be implemented by the object:next()
,catch()
andcomplete()
. - In this example we are using the
setInterval()
function to mimick asynchronous events that will emit values over time. Every second we are emiting the string “Hello from Observableland!“. - We then return a teardown function that is invoked when all observers have unsubscribed from the observable.
- Finally, we invoke the
subscribe()
method providing a function that is invoked each time thenext()
value is emitted to all observers.
What is a Notification
?
The Notification
class represents, or wraps, the event or value that is being emitted by the observable stream.
Notications are pushed through an observable to all of it’s observers, and include metadata about the type of event or value: if the event or value is the next value in the stream, or if the value is an exception, or if the stream has been completed and will no longer emit any future values.
What is an Observer
?
An Observer
receives a Notification
from an Observable
and is managed by a Subscription
.
An observer will react to the next, error and completion notifications.
In Angular we are often the consumer, or observer, of an asynchronous event or value that emitted by an observable in the framework. Some common use cases of this in Angular are HTTP requests and routing parameters.
What is a Subscription
?
A Subscription
is the link between an observable and an observer.
A subscription object is returned when we invoke the subscribe()
method on an Observable
.
The subscription object has two important methods: unsubscribe()
and add()
.
The unsubscribe()
method will remove an observer from the collection of observers in the observable, along with any children subscription objects.
And, we can add child subscriptions to an existing subscription using the add()
method.
Is this included in ECMAScript?
Not yet.
Observables are currently a stage 1 proposal by the TC39 group within Ecma International for ECMAScript, which JavaScript implememnts. We should expect to see obserables baked into our browser in the “near” future.
Until then, we will need to use a library like RxJS. And, as of this writing, the observable proposal does not include many of the reactive programming operators that are included by RxJS. So, we may likely be using RxJS for the forseable future.
What is a Subject
?
A Subject
is a specialized observable that enables us to multicast values to many observers, and has several implemented behaviors.
Because the Subject
class extends Observable
it inherit all of the same methods and properties of an observable.
So, a subject is both an observable and an observer.
This is an important and powerful concept for us to learn.
Let’s take a look at src/demos/subject.ts:
import { Subject } from "rxjs/Subject";
/* create an instance of Subject. */
const s = new Subject<number>();
/* Subscribe to subject. */
s.subscribe(
next => console.log('before 1:', next),
error => console.warn(error),
() => console.log('complete before 1')
);
s.subscribe(
next => console.log('before 2:', next),
error => console.warn(error),
() => console.log('complete before 2')
);
/* Emit some values. */
s.next(1);
s.next(2);
s.next(3);
/* Subscribe late to subject. */
s.subscribe(
next => console.log('after:', next),
error => console.warn(error),
() => console.log('complete after')
);
/* Late subscription will now receive Notification. */
s.next(4);
s.complete();
In this example:
- We import the
Subject
class, and then create a new instance. - I am using TypeScript, so I have specified the generic type for this subject to be of type
number
. This means that I can expect the values to be emitted by the observable stream to be numbers. - Next, we create two new subscriptions. We provide a callback function for all three notifications: next, catch and completion, in this order as arguments to the
subscribe()
method. - I am simply logging out the result of the notification to the console.
- Then, we emit three values using the
next()
method: 1, 2 and 3. - I then add a third subscription after the first three values are emitted.
- Then, we emit a fourth value using the
next()
method: 4. - Finally, we
complete()
the observable stream, notifying the observers that there will be no more future values emitted by the stream.
If you examine your console, you should see:
before 1: 1
before 2: 1
before 1: 2
before 2: 2
before 1: 3
before 2: 3
before 1: 4
before 2: 4
after: 4
complete before 1
complete before 2
complete after
Note that the order of subscribing to the observable matters, as the third subscription only receives the fourth value (4).
Also note that since a Subject
is an observable we can invoke the next()
method to emit additional values to observers.
What is an AsyncSubject
?
The AsyncSubject
class extends Subject
and inherits all of the methods and properties of a Subject
.
And, don’t forget, that Subject
extends Obserable
.
The AsyncSubject
is one of three different Subject
behaviors that are implemented by the RxJS library.
We’ll take a look at all of these.
Here is an example, located at src/demos/async-subject.ts:
import { AsyncSubject } from "rxjs/AsyncSubject";
/* create an instance of AsyncSubject. */
const s = new AsyncSubject<number>();
/* Subscribe to subject. */
s.subscribe(
next => console.log('before:', next),
error => console.warn(error),
() => console.log('complete before')
);
/* Emit some values. */
s.next(1);
s.next(2);
s.next(3);
/* Subscribe late to subject. */
s.subscribe(
next => console.log('after:', next),
error => console.warn(error),
() => console.log('complete after')
);
/* Complete the observable stream. */
// we must complete so values are emited to subscriptions
s.complete();
This example if very similar to the previous example of the Subject
class.
The primary difference is that our AsyncSubject
will only receive the last value emitted, and only upon completion.
In your console you should see:
before: 3
after: 3
complete before
complete after
Note that the order of subscribing to the observable stream is not relevent.
You can also play around with this example by commenting out the complete()
method invocation.
When you do that, you should see nothing in the console as the observers never receive a notification.
What is a BehaviorSubject
?
When an observer subscribes to a BehaviorSubject
it receives the most recent notification that was emitted by the observable and then continues to receive future notifications until the stream is complete.
In order to accomplish this, we must provide a seed (or default) value when creating a new BehaviorSubject
.
Let’s look at an example in the src/demos/behavior-subject.ts file:
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
/* create an instance of BehaviorSubject. */
const s = new BehaviorSubject() < number > 0;
/* Subscribe to subject. */
s.subscribe(
(next) => console.log('before:', next),
(error) => console.warn(error),
() => console.log('complete before'),
);
/* Emit some values. */
s.next(1);
s.next(2);
s.next(3);
// s.complete();
/* Subscribe late to subject. */
s.subscribe(
(next) => console.log('after:', next),
(error) => console.warn(error),
() => console.log('complete after'),
);
To quickly review:
- First, we import the
BehaviorSubject
class and then create a new instance providing the seed value. - Next, we subscribe to the observable stream.
- Then, we emit three values: 1, 2, 3.
- And then we subscribe again (after the three values have been emitted).
Here is what is logged out to the console:
before: 0
before: 1
before: 2
before: 3
after: 3
A few things to note:
- Notice that the subscription that was created before we invoked
next()
received the seed value of 0. - And notice that the subscription that was created after we emited the third value (3) also received the latest notification.
- If you uncomment the line that invokes
complete()
, note that the second subscription never receives a next notification, but still receives the completion notification.
What is a ReplaySubject
?
Like the name suggests, a ReplaySubject
emits to any observer all of the notifications that were emitted by the source observable, regardless of when the observer subscribes.
Let’s take a look at an example in src/demos/replay-subject.ts:
import { ReplaySubject } from "rxjs/ReplaySubject";
/* create an instance of ReplaySubject. */
const s = new ReplaySubject<number>();
/* Subscribe to subject. */
s.subscribe(
next => console.log('before:', next),
error => console.warn(error),
() => console.log('complete before')
);
/* Emit some values. */
s.next(1);
s.next(2);
s.next(3);
/* Subscribe late to subject. */
s.subscribe(
next => console.log('after:', next),
error => console.warn(error),
() => console.log('complete after')
);
/* Complete the observable stream. */
s.complete();
The code above is very similar to the previous examples. Note that we emit the completion notification after both subscriptions.
Here is what is output to the console:
before: 1
before: 2
before: 3
after: 1
after: 2
after: 3
complete before
complete after
In this example, both subscriptions receive all notifications no matter when the subscription is created.
Unicast vs Multicast
An Observable
is unicast because each subscribed observer owns an independent execution of the observable.
Let’s look at an example that will show us that for each subscription we will execute the subscribe
function that is provided to the Observable
’s constructor()
function.
The demo is located in the src/demos/obserable-unicast.ts file:
import { Observable } from 'rxjs/Observable';
/* create a new observable, providing the subscribe function. */
let i = 0;
const observable: Observable<number> = new Observable(observer => {
console.log('%cNew subscription created', 'background: #222; color: #bada55');
i++;
const interval = setInterval(() => {
observer.next(i);
}, 1000);
return () => {
clearInterval(interval);
};
});
/* Each subscription receives a copy of Observer. */
const subscription = observable.subscribe(value =>
console.log('First subscription', value)
);
subscription.add(
observable.subscribe(value => console.log('Second subscription', value))
);
/* Unsubscribe after 5 seconds. */
setTimeout(() => subscription.unsubscribe(), 5000);
In this example we should see that the subscribe function is invoked twice, for each observer to the observable stream:
What we observe (ha) is that two new subscriptions are created, and the resulting value of i
is 2.
This is important to note as the code that is in the subscribe function will be invoked for each subscription.
If we are doing a task such as an HTTP request or a long running block of code, this will be executed each time a new subscription is created.
If we want to avoid this, we can use a Subject
to multicast the same notifications to all observers using a single subscription to the source observable stream.
Let’s take a look at multicasting in the src/demos/subject-multicast.ts:
import { Observer } from 'rxjs/Observer';
import { Observable } from 'rxjs/Observable';
import { publish, takeWhile } from 'rxjs/operators';
import { ConnectableObservable } from 'rxjs/observable/ConnectableObservable';
import 'rxjs/add/observable/interval';
/* Component state. */
let alive = true;
/* create a new observable, providing the subscribe function. */
let i = 0;
const observable =
new Observable() <
number >
(observer => {
console.log(
'%cNew subscription created',
'background: #222; color: #bada55'
);
i++;
const interval = setInterval(() => {
observer.next(i);
}, 1000);
return () => {
clearInterval(interval);
};
}).pipe(takeWhile(() => alive));
const multicasted: ConnectableObservable<number> = observable.publish();
/* Create two subscriptions */
multicasted.subscribe(value => console.log('First subscription', value));
multicasted.subscribe(value => console.log('Second subscription', value));
/* Connect the subject to the observabe. */
multicasted.connect();
/* Complete the observable after 5 seconds. */
setTimeout(() => (alive = false), 5000);
This time we notice that the subscribe function is only executed a single time, and all observers recieve the notifications from the source observable:
- First, we see that the
subscribe
function is only invoked a single time, and as a result the value ofi
is 1. - Then, each observer receives next notifications with the same value: 1.
- Finally, the observable is complete after 5 seconds.
Hot vs Cold
The last concept that we will discuss is determining if an observable is consiered to be “hot” or “cold”.
An observable is considered cold if the producer (or the source of the events or values) will only produce an event or value upon a subscription to the observable.
On the other hand, an observable is considered hot if the producer is outside of the observable’s subscribe
function. In other words, the observable closes around an already producing producer.
Let’s take a look at an example of a cold observable in the src/demos/observable-cold.ts file:
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import * as io from 'socket.io-client';
// Be sure to start socket.io server via `yarn start:server`
/* create a new observable, providing the subscribe function. */
const messages = Observable.create((observer: Observer<any>) => {
console.log('%cNew subscription created', 'background: #222; color: #bada55');
const url = 'localhost:3000';
const socket: SocketIOClient.Socket = io(url);
socket.on('message', (data: any) => {
observer.next(data);
});
return () => {
socket.disconnect();
};
});
/* An Observable is cold until subscribed. */
messages.subscribe((message: any) => console.log(message));
Before you can execute the example above, be sure to start the socket.io server via:
$ yarn start:server
In the example above we are:
- Creating an observerable using the static
create()
method, providing thesubscribe
function. - Creating a socket.io server within the
subscribe
function so that the server is only created upon a new observer to the observable. - Emiting the next notification when a new message is received from the socket server.
- Disconnecting from the socket server in the teardown function.
Because this is a cold observer, the producer, which in this case is a connection to our socket server, is only created when we invoke the subscribe()
method.
If you comment out the line that creates the new subscription then our producer will not produce a value (in this case a message from the socket server).
Conversely, let’s examine what is considered to be a hot observable in src/demos/observable-hot-1.ts:
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import * as io from 'socket.io-client';
// Be sure to start socket.io server via `yarn start:server`
/* Create single connection outside of observer to avoid multiple connections. */
let socket: SocketIOClient.Socket;
const url = 'localhost:3000';
socket = io(url);
/* create a new observable, providing the subscribe function. */
const messages = Observable.create((observer: Observer<any>) => {
console.log('%cNew subscription created', 'background: #222; color: #bada55');
socket.on('message', (data: any) => observer.next(data));
});
/* Multiple subscriptions will open single connection. */
// const subscription = messages.subscribe((message: any) => console.log('First subscription', message));
// subscription.add(messages.subscribe((message: any) => console.log('Second subscription', message)));
// setTimeout(() => subscription.unsubscribe(), 6000);
In the code above:
- We create a single connection to the socket server outside of the observable’s
subscribe
function. - We emit the next notification each time the socket emits a message.
- Right now we are not creating any subscriptions. However, our socket server connection has still been established.
- If you uncomment the last few lines of code, we will observe that only a single connection to our socket server is established even though we are creating two subscriptions.
This is because the observable is already “hot” and producing values.
How does this apply to Angular?
One example is the HttpClientModule
.
If we invoke the get()
method on the HttpClient
, we will observe that a GET network request is only created when we subscribe()
to the observable that is returned from the get()
method.
Further, we will also observe that for each new subscription that is created, an additional request is made.
This is because subscribing to the observable multiple times will create multiple instances of the producer.
Finally, we can create a hot observable from a cold observable using the share()
operator.
Let’s take a look at this in the src/demos/obserable-hot-2.ts file:
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import { share } from 'rxjs/operators';
import * as io from 'socket.io-client';
// Be sure to start socket.io server via `yarn start:server`
/* create a new observable, providing the subscribe function. */
let socket: SocketIOClient.Socket;
const url = 'localhost:3000';
const messages = Observable.create((observer: Observer<any>) => {
console.log('%cNew subscription created', 'background: #222; color: #bada55');
socket = io(url);
socket.on('message', (data: any) => {
observer.next(data);
});
return () => {
socket.disconnect();
};
}).pipe(share());
/* Multiple subscriptions will open single connection. */
const subscription = messages.subscribe((message: any) =>
console.log('First subscription', message)
);
subscription.add(
messages.subscribe((message: any) =>
console.log('Second subscription', message)
)
);
setTimeout(() => subscription.unsubscribe(), 6000);
In the example above we use the pipe()
method, providing the share()
operator.
The share()
operator returns a new observable that is multicasted and hot.
For more details, check out Ben’s post on hot vs cold observables.