Brian Love

RxJS: The Basics

Learn the basics of functional reactive programming in Angular with RxJS.

Why?

You might be asking:

What does Angular have to do with functional reactive programming?

A lot, actually.
Most asynchronous APIs in Angular, such as HTTP requests and routing parameters, are exposed as an Observable, which is at the core of reactive programming.

An observable is really pretty simple if you are used to using promises. A simple explanation is that observables are like promises that can emit multiple values over time, otherwise referred to as a stream. Of course, that’s a bit simplified, but it’s a starting point for us.

Meetup Presentation

This article is largely based on a meetup presentation that I gave in February 2018 at the Rocky Mountain Angular meetup. Check out the video on YouTube:

Demo Files

For this post I am going to be referencing some demonstration files that I put together for the presentation. You can download them via:

The demo files use webpack-dev-server. All demos are contained within the src/demos directory, and you can toggle which demo is running in the src/index.ts entry point for webpack. Just uncomment the demo you want to execute, and load http://localhost:8080 in your browser. Whenever you change a demo or the index file, the browser will live reload with the updated changes. Of course, be sure to only have a single demo running at a time (for the sake of clarity).

What is RxJS?

RxJS is the JavaScript implementation of the ReactiveX library. ReactiveX enables us as Angular developers to build asynchronous, modular, event-based applications, and it allows us to develop these applications reactively. This is a little confusing at first, but as you dive into RxJS you will begin to learn how powerful this concept is, and how powerful the many operators provided by RxJS are.

Back to the concept of streams of data. Where promises enable an asynchronous event or value to be either fulfilled or rejected, RxJS enables multiple asynchronous events or values to be observed.

Is this new?

Yes, and no. Yes, this is likely new to us as JavaScript developers. But, the concept or design pattern is not new.

If you refer to the famous Gang of Four Design Patterns book you will find the observer design pattern:

Observer Design Pattern implemented by Observables

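To briefly explain the concept: a subject maintains a list of dependents, called observers, and automatically notifies each of them whenever its state changes.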
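The pattern itself can be sketched in a few lines of plain TypeScript, without RxJS. This is only an illustration: the names `Listener` and `NewsFeed` are hypothetical and do not come from the demo files.

```typescript
// Hypothetical names, used only to illustrate the observer pattern.
type Listener<T> = (value: T) => void;

class NewsFeed<T> {
  private listeners: Listener<T>[] = [];

  // Register an observer and return a function that removes it again.
  attach(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  // Notify every registered observer of a new value.
  notify(value: T): void {
    this.listeners.forEach(l => l(value));
  }
}

const feed = new NewsFeed<string>();
const received: string[] = [];

const detach = feed.attach(value => received.push(value));
feed.notify('first');
detach();
feed.notify('second'); // not delivered: the observer was removed

console.log(received); // ['first']
```

RxJS builds on the same idea, adding error and completion signals as well as operators.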

What are Observables?

Simply put, observables implement the observer design pattern. An observer (or multiple observers) receives a Notification that is emitted by the observable.

In RxJS, this is represented by the aptly-named Observable class, which contains both static and instance methods to create an instance of the class, to filter and transform the values that are emitted by the observable, and more. Further, the Observable methods were designed around the Array.prototype methods that JavaScript developers are familiar with, including: filter(), map(), reduce(), etc.
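To make the parallel with arrays concrete, here is a small sketch that runs the same filter, map and reduce steps over an array and over an observable. It assumes the pipeable operators from rxjs/operators (RxJS 5.5 or later) and imports Observable from the package root rather than the deep import paths used in the demo files.

```typescript
import { Observable } from 'rxjs';
import { filter, map, reduce } from 'rxjs/operators';

const numbers = [1, 2, 3, 4, 5];

// Array version: keep odd numbers, multiply by 10, add them up.
const arrayResult = numbers
  .filter(n => n % 2 === 1)
  .map(n => n * 10)
  .reduce((sum, n) => sum + n, 0);

// Observable version: emit the same numbers, then complete.
const source = new Observable<number>(observer => {
  numbers.forEach(n => observer.next(n));
  observer.complete();
});

let streamResult = 0;
source
  .pipe(
    filter(n => n % 2 === 1),
    map(n => n * 10),
    reduce((sum, n) => sum + n, 0) // emits once, when the source completes
  )
  .subscribe(total => (streamResult = total));

console.log(arrayResult, streamResult); // 90 90
```

The main difference is timing: the array is processed immediately, while the observable only does any work once subscribe() is called.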

From Promiseland

For the sake of learning about where we came from and where we are going, let’s quickly review the Promise. Simply uncomment the import './demos/promise'; line in the src/index.ts file. Here is the content of the src/demos/promise.ts file:

/* create a new promise. */
const p = new Promise((resolve) => {
  setTimeout(() => {
    resolve('Hello from Promiseland!');
  }, 1000);
});

/* log single value that is emitted. */
p.then((value) => console.log(value));

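To quickly review: we create a new Promise that resolves with a string after one second, and then we log that single value using the then() method.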

To Observableland

Given that we have promises for a single asynchronous value, observables provide the ability for a stream of values to be emitted over time. Let’s look at a simple example in src/demos/observable.ts:

import { Observable } from 'rxjs/Observable';

/* create a new observable, providing the subscribe function. */
const observable: Observable<string> = new Observable(observer => {
  const interval = setInterval(() => {
    observer.next('Hello from Observableland!');
  }, 1000);

  // teardown
  return () => {
    clearInterval(interval);
  };
});

/* Subscribe to Notifications. */
observable.subscribe(value => console.log(value));

What is a Notification?

The Notification class represents, or wraps, the event or value that is being emitted by the observable stream. Notifications are pushed through an observable to all of its observers, and include metadata about the type of event or value: whether it is the next value in the stream, an exception, or the completion of the stream, after which no further values will be emitted.
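One way to see these notifications directly is the materialize() operator, which this article does not otherwise cover. It wraps every next, error and complete event in a notification object whose kind property is 'N', 'E' or 'C'. A minimal sketch, assuming RxJS 5.5 or later and imports from the package root:

```typescript
import { Observable } from 'rxjs';
import { materialize } from 'rxjs/operators';

// A source that emits two values and then completes.
const source = new Observable<string>(observer => {
  observer.next('a');
  observer.next('b');
  observer.complete();
});

// Collect the kind of every notification that passes through.
const kinds: string[] = [];
source.pipe(materialize()).subscribe(notification => {
  kinds.push(notification.kind);
});

console.log(kinds); // ['N', 'N', 'C']
```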

What is an Observer?

An Observer receives a Notification from an Observable and is managed by a Subscription. An observer will react to the next, error and completion notifications.
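As a rough sketch of those three reactions, an observer can be passed to subscribe() as an object with next, error and complete callbacks. The source below is made up for illustration and errors on purpose:

```typescript
import { Observable } from 'rxjs';

// Emits two values, then fails.
const source = new Observable<number>(observer => {
  observer.next(1);
  observer.next(2);
  observer.error(new Error('boom'));
  observer.next(3); // ignored: the stream has already errored
});

const log: string[] = [];
source.subscribe({
  next: value => log.push(`next ${value}`),
  error: (err: Error) => log.push(`error ${err.message}`),
  complete: () => log.push('complete'),
});

console.log(log); // ['next 1', 'next 2', 'error boom']
```

Note that complete is never called here, because an error also ends the stream.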

In Angular we are often the consumer, or observer, of an asynchronous event or value that is emitted by an observable in the framework. Some common use cases of this in Angular are HTTP requests and routing parameters.

What is a Subscription?

A Subscription is the link between an observable and an observer. A subscription object is returned when we invoke the subscribe() method on an Observable. The subscription object has two important methods: unsubscribe() and add(). The unsubscribe() method will remove an observer from the collection of observers in the observable, along with any children subscription objects. And, we can add child subscriptions to an existing subscription using the add() method.
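The following sketch shows add() and unsubscribe() working together. It uses two Subject instances (covered later in this article) only because they make it easy to emit values by hand; the variable names are illustrative.

```typescript
import { Subject } from 'rxjs';

const a = new Subject<number>();
const b = new Subject<number>();
const seen: string[] = [];

// Parent subscription, with a child subscription attached via add().
const parent = a.subscribe(value => seen.push(`a${value}`));
parent.add(b.subscribe(value => seen.push(`b${value}`)));

a.next(1);
b.next(1);

// Unsubscribing the parent also unsubscribes the child.
parent.unsubscribe();

a.next(2); // not delivered
b.next(2); // not delivered

console.log(seen);          // ['a1', 'b1']
console.log(parent.closed); // true
```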

Is this included in ECMAScript?

Not yet.

Observables are currently a stage 1 proposal by the TC39 group within Ecma International for ECMAScript, which JavaScript implements. We should expect to see observables baked into our browser in the “near” future.

Until then, we will need to use a library like RxJS. And, as of this writing, the observable proposal does not include many of the reactive programming operators that are included by RxJS. So, we will likely be using RxJS for the foreseeable future.

What is a Subject?

A Subject is a specialized observable that enables us to multicast values to many observers, and has several implemented behaviors. Because the Subject class extends Observable it inherits all of the same methods and properties of an observable. It also provides the next(), error() and complete() methods of an observer. So, a subject is both an observable and an observer. This is an important and powerful concept for us to learn.

Let’s take a look at src/demos/subject.ts:

import { Subject } from "rxjs/Subject";

/* create an instance of Subject. */
const s = new Subject<number>();

/* Subscribe to subject. */
s.subscribe(
  next => console.log('before 1:', next),
  error => console.warn(error),
  () => console.log('complete before 1')
);
s.subscribe(
  next => console.log('before 2:', next),
  error => console.warn(error),
  () => console.log('complete before 2')
);

/* Emit some values. */
s.next(1);
s.next(2);
s.next(3);

/* Subscribe late to subject. */
s.subscribe(
  next => console.log('after:', next),
  error => console.warn(error),
  () => console.log('complete after')
);

/* Late subscription will now receive Notification. */
s.next(4);
s.complete();

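In this example we create a Subject, subscribe to it twice, emit three values, subscribe a third time, emit a fourth value, and finally complete the stream.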

If you examine your console, you should see:

before 1: 1
before 2: 1
before 1: 2
before 2: 2
before 1: 3
before 2: 3
before 1: 4
before 2: 4
after: 4
complete before 1
complete before 2
complete after

Note that the order of subscribing to the observable matters, as the third subscription only receives the fourth value (4). Also note that since a Subject is also an observer we can invoke the next() method to emit additional values to its observers.
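Because a Subject is also an observer, it can itself be passed to the subscribe() method of another observable. The sketch below (with an illustrative `executions` counter) shows the source running once while two observers receive its values:

```typescript
import { Observable, Subject } from 'rxjs';

// Count how often the subscribe function of the source runs.
let executions = 0;
const source = new Observable<number>(observer => {
  executions++;
  observer.next(1);
  observer.next(2);
  observer.complete();
});

const subject = new Subject<number>();
const first: number[] = [];
const second: number[] = [];

// Two observers listen to the subject...
subject.subscribe(value => first.push(value));
subject.subscribe(value => second.push(value));

// ...and the subject is the single observer of the source.
source.subscribe(subject);

console.log(executions);    // 1
console.log(first, second); // [1, 2] [1, 2]
```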

What is an AsyncSubject?

The AsyncSubject class extends Subject and inherits all of the methods and properties of a Subject. And, don’t forget that Subject extends Observable.

The AsyncSubject is one of three different Subject behaviors that are implemented by the RxJS library. We’ll take a look at all of these.

Here is an example, located at src/demos/async-subject.ts:

import { AsyncSubject } from "rxjs/AsyncSubject";

/* create an instance of AsyncSubject. */
const s = new AsyncSubject<number>();

/* Subscribe to subject. */
s.subscribe(
  next => console.log('before:', next),
  error => console.warn(error),
  () => console.log('complete before')
);

/* Emit some values. */
s.next(1);
s.next(2);
s.next(3);

/* Subscribe late to subject. */
s.subscribe(
  next => console.log('after:', next),
  error => console.warn(error),
  () => console.log('complete after')
);

/* Complete the observable stream. */
// we must complete so values are emitted to subscriptions
s.complete();

This example is very similar to the previous example of the Subject class. The primary difference is that the observers of an AsyncSubject will only receive the last value emitted, and only upon completion.

In your console you should see:

before: 3
after: 3
complete before
complete after

Note that the order of subscribing to the observable stream is not relevant. You can also play around with this example by commenting out the complete() method invocation. When you do that, you should see nothing in the console as the observers never receive a notification.

What is a BehaviorSubject?

When an observer subscribes to a BehaviorSubject it receives the most recent notification that was emitted by the observable and then continues to receive future notifications until the stream is complete. In order to accomplish this, we must provide a seed (or default) value when creating a new BehaviorSubject.

Let’s look at an example in the src/demos/behavior-subject.ts file:

import { BehaviorSubject } from 'rxjs/BehaviorSubject';

/* create an instance of BehaviorSubject. */
const s = new BehaviorSubject<number>(0);

/* Subscribe to subject. */
s.subscribe(
  (next) => console.log('before:', next),
  (error) => console.warn(error),
  () => console.log('complete before'),
);

/* Emit some values. */
s.next(1);
s.next(2);
s.next(3);
// s.complete();

/* Subscribe late to subject. */
s.subscribe(
  (next) => console.log('after:', next),
  (error) => console.warn(error),
  () => console.log('complete after'),
);

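To quickly review: we create a BehaviorSubject with a seed value of 0, subscribe to it, emit three values, and then subscribe a second time.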

Here is what is logged out to the console:

before: 0
before: 1
before: 2
before: 3
after: 3

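A few things to note: the first observer receives the seed value (0) as soon as it subscribes, the late observer only receives the most recent value (3), and no completion is logged because the call to complete() is commented out.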
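The seed value and the effect of completing the stream can be checked with a short sketch. It uses getValue(), which returns the current value of a BehaviorSubject, and imports from the package root rather than the deep path used in the demo.

```typescript
import { BehaviorSubject } from 'rxjs';

const s = new BehaviorSubject<number>(0);
const log: string[] = [];

// The seed is available before anything has been emitted.
log.push(`current ${s.getValue()}`);

s.next(1);

// A late observer immediately receives the most recent value.
s.subscribe(value => log.push(`late ${value}`));
s.next(2);

// After completion, a new observer only receives the completion.
s.complete();
s.subscribe({
  next: value => log.push(`after ${value}`),
  complete: () => log.push('after complete'),
});

console.log(log);
// ['current 0', 'late 1', 'late 2', 'after complete']
```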

What is a ReplaySubject?

Like the name suggests, a ReplaySubject emits to any observer all of the notifications that were emitted by the source observable, regardless of when the observer subscribes.

Let’s take a look at an example in src/demos/replay-subject.ts:

import { ReplaySubject } from "rxjs/ReplaySubject";

/* create an instance of ReplaySubject. */
const s = new ReplaySubject<number>();

/* Subscribe to subject. */
s.subscribe(
  next => console.log('before:', next),
  error => console.warn(error),
  () => console.log('complete before')
);

/* Emit some values. */
s.next(1);
s.next(2);
s.next(3);

/* Subscribe late to subject. */
s.subscribe(
  next => console.log('after:', next),
  error => console.warn(error),
  () => console.log('complete after')
);

/* Complete the observable stream. */
s.complete();

The code above is very similar to the previous examples. Note that we emit the completion notification after both subscriptions.

Here is what is output to the console:

before: 1
before: 2
before: 3
after: 1
after: 2
after: 3
complete before
complete after

In this example, both subscriptions receive all notifications no matter when the subscription is created.
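By default the replay buffer is unbounded. The ReplaySubject constructor also accepts a buffer size, which limits how many of the most recent values are replayed. A minimal sketch with a buffer of two:

```typescript
import { ReplaySubject } from 'rxjs';

// Only replay the two most recent values to late observers.
const s = new ReplaySubject<number>(2);

s.next(1);
s.next(2);
s.next(3);

const late: number[] = [];
s.subscribe(value => late.push(value));

console.log(late); // [2, 3]
```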

Unicast vs Multicast

An Observable is unicast because each subscribed observer owns an independent execution of the observable.

Let’s look at an example that will show us that for each subscription we will execute the subscribe function that is provided to the Observable’s constructor() function.

The demo is located in the src/demos/obserable-unicast.ts file:

import { Observable } from 'rxjs/Observable';

/* create a new observable, providing the subscribe function. */
let i = 0;
const observable: Observable<number> = new Observable(observer => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');
  i++;

  const interval = setInterval(() => {
    observer.next(i);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
});

/* Each subscription receives a copy of Observer. */
const subscription = observable.subscribe(value =>
  console.log('First subscription', value)
);
subscription.add(
  observable.subscribe(value => console.log('Second subscription', value))
);

/* Unsubscribe after 5 seconds. */
setTimeout(() => subscription.unsubscribe(), 5000);

In this example we should see that the subscribe function is invoked twice, for each observer to the observable stream:

Observables are unicast by default

What we observe (ha) is that two new subscriptions are created, and the resulting value of i is 2.

This is important to note as the code that is in the subscribe function will be invoked for each subscription. If we are doing a task such as an HTTP request or a long running block of code, this will be executed each time a new subscription is created. If we want to avoid this, we can use a Subject to multicast the same notifications to all observers using a single subscription to the source observable stream.

Let’s take a look at multicasting in the src/demos/subject-multicast.ts:

import { Observer } from 'rxjs/Observer';
import { Observable } from 'rxjs/Observable';
import { publish, takeWhile } from 'rxjs/operators';
import { ConnectableObservable } from 'rxjs/observable/ConnectableObservable';
import 'rxjs/add/observable/interval';

/* Component state. */
let alive = true;

/* create a new observable, providing the subscribe function. */
let i = 0;
const observable = new Observable<number>(observer => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');
  i++;

  const interval = setInterval(() => {
    observer.next(i);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
}).pipe(takeWhile(() => alive));

/* publish() is a pipeable operator, so it is applied via pipe(). */
const multicasted = observable.pipe(publish()) as ConnectableObservable<number>;

/* Create two subscriptions */
multicasted.subscribe(value => console.log('First subscription', value));
multicasted.subscribe(value => console.log('Second subscription', value));

/* Connect the subject to the observable. */
multicasted.connect();

/* Complete the observable after 5 seconds. */
setTimeout(() => (alive = false), 5000);

This time we notice that the subscribe function is only executed a single time, and all observers receive the notifications from the source observable:

Multicasted Subject

Hot vs Cold

The last concept that we will discuss is determining if an observable is considered to be “hot” or “cold”. An observable is considered cold if the producer (or the source of the events or values) is created inside the observable’s subscribe function, so that it only produces an event or value upon a subscription to the observable. On the other hand, an observable is considered hot if the producer is created outside of the observable’s subscribe function. In other words, the observable closes around a producer that is already producing.

Let’s take a look at an example of a cold observable in the src/demos/observable-cold.ts file:

import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import * as io from 'socket.io-client';

// Be sure to start socket.io server via `yarn start:server`

/* create a new observable, providing the subscribe function. */
const messages = Observable.create((observer: Observer<any>) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');

  const url = 'localhost:3000';
  const socket: SocketIOClient.Socket = io(url);

  socket.on('message', (data: any) => {
    observer.next(data);
  });

  return () => {
    socket.disconnect();
  };
});

/* An Observable is cold until subscribed. */
messages.subscribe((message: any) => console.log(message));

Before you can execute the example above, be sure to start the socket.io server via:

$ yarn start:server

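In the example above we are creating the socket connection inside the subscribe function, pushing each message to the observer, and disconnecting the socket on teardown.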

Because this is a cold observable, the producer, which in this case is a connection to our socket server, is only created when we invoke the subscribe() method. If you comment out the line that creates the new subscription then our producer will not produce a value (in this case a message from the socket server).

Conversely, let’s examine what is considered to be a hot observable in src/demos/observable-hot-1.ts:

import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import * as io from 'socket.io-client';

// Be sure to start socket.io server via `yarn start:server`

/* Create single connection outside of observer to avoid multiple connections. */
let socket: SocketIOClient.Socket;
const url = 'localhost:3000';
socket = io(url);

/* create a new observable, providing the subscribe function. */
const messages = Observable.create((observer: Observer<any>) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');

  socket.on('message', (data: any) => observer.next(data));
});

/* Multiple subscriptions will open single connection. */
// const subscription = messages.subscribe((message: any) => console.log('First subscription', message));
// subscription.add(messages.subscribe((message: any) => console.log('Second subscription', message)));
// setTimeout(() => subscription.unsubscribe(), 6000);

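In the code above the socket connection is created outside of the subscribe function, so the connection is opened even though the subscriptions are commented out.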

This is because the observable is already “hot” and producing values.

How does this apply to Angular? One example is the HttpClientModule. If we invoke the get() method on the HttpClient, we will observe that a GET network request is only created when we subscribe() to the observable that is returned from the get() method. Further, we will also observe that for each new subscription that is created, an additional request is made. This is because subscribing to the observable multiple times will create multiple instances of the producer.
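The same behavior can be imitated without Angular. In the sketch below, `fakeGet()` is a hypothetical stand-in for HttpClient.get() (it is not an Angular API) that simply counts how many times a "request" is started:

```typescript
import { Observable } from 'rxjs';

let requestCount = 0;

// Hypothetical stand-in for HttpClient.get(); not part of Angular.
function fakeGet(url: string): Observable<string> {
  return new Observable<string>(observer => {
    requestCount++; // the "request" only starts on subscribe
    observer.next(`response from ${url}`);
    observer.complete();
  });
}

const request$ = fakeGet('/api/users');

// Nothing has happened yet: the observable is cold.
const beforeSubscribe = requestCount;

request$.subscribe(body => console.log('first', body));
request$.subscribe(body => console.log('second', body));

console.log(beforeSubscribe, requestCount); // 0 2
```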

Finally, we can create a hot observable from a cold observable using the share() operator. Let’s take a look at this in the src/demos/obserable-hot-2.ts file:

import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import { share } from 'rxjs/operators';
import * as io from 'socket.io-client';

// Be sure to start socket.io server via `yarn start:server`

/* create a new observable, providing the subscribe function. */
let socket: SocketIOClient.Socket;
const url = 'localhost:3000';
const messages = Observable.create((observer: Observer<any>) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');

  socket = io(url);

  socket.on('message', (data: any) => {
    observer.next(data);
  });

  return () => {
    socket.disconnect();
  };
}).pipe(share());

/* Multiple subscriptions will open single connection. */
const subscription = messages.subscribe((message: any) =>
  console.log('First subscription', message)
);
subscription.add(
  messages.subscribe((message: any) =>
    console.log('Second subscription', message)
  )
);
setTimeout(() => subscription.unsubscribe(), 6000);

In the example above we use the pipe() method, providing the share() operator. The share() operator returns a new observable that is multicasted and hot.
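If you do not have the socket.io server running, the effect of share() can also be sketched with a Subject standing in for the socket. The `producer` and `connections` names are illustrative and not part of the demo files:

```typescript
import { Observable, Subject } from 'rxjs';
import { share } from 'rxjs/operators';

// Stand-in for the socket: we push messages into it by hand.
const producer = new Subject<number>();
let connections = 0;

const messages = new Observable<number>(observer => {
  connections++; // counts how often we "connect" to the producer
  const inner = producer.subscribe(observer);
  return () => inner.unsubscribe();
}).pipe(share());

const first: number[] = [];
const second: number[] = [];
messages.subscribe(value => first.push(value));
messages.subscribe(value => second.push(value));

producer.next(1);

console.log(connections);   // 1
console.log(first, second); // [1] [1]
```

Without the share() operator the counter would be 2, one connection per subscription.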

For more details, check out Ben’s post on hot vs cold observables.