Brian Love

RxJS: Master the Operators

Learn to master RxJS operators with Angular 5 and NgRx.

Meetup Presentation

This blog article is largely based on a Rocky Mountain Angular meetup presentation from March 2018. A big thanks to Jon Rista who helped out with the presentation!

Example Application

In this post I’ll be referring to a sample NgRx Tour of Heroes app, specifically the ngrx-refactor-2 branch.

You can also clone it and check out the ngrx-refactor-2 branch:

$ git clone git@github.com:blove/ngrx-tour-of-heros.git
$ cd ngrx-tour-of-heros
$ git checkout ngrx-refactor-2

What are Operators?

When I refer to the operators for RxJS I am referring to the methods of the Observable class. They are pure functions that transform the data in an observable stream and return a new observable, often based on the current observable. Most importantly, the operators in RxJS allow complex asynchronous code to be composed easily in a declarative manner.

The operators can be broken down into multiple categories. Creation operators create a new observable, optionally from a source such as a promise or a value. Transformation operators transform the data in the stream, and filtering operators act as a gate for the observable stream.

How are they composable?

All operators return an Observable, making them chainable (or pipeable, but not lettable). This enables us to compose complex logic using a series of operators in a pipe that is synchronous. Finally, when we subscribe to the output observable, it will in turn subscribe to the input observable.
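To make the idea of composition concrete, here is a minimal sketch of how a pipe of operators can work. The MiniObservable class and the miniMap and miniFilter functions are hypothetical names that I made up for illustration. This is not the RxJS source, just the smallest model I could write of "an operator takes an observable and returns a new observable".

```typescript
// Hypothetical minimal types, for illustration only (not the RxJS source).
type Observer<T> = (value: T) => void;

// An operator is a pure function from one observable to a new observable.
type Operator<A, B> = (source: MiniObservable<A>) => MiniObservable<B>;

class MiniObservable<T> {
  // The producer runs only when someone subscribes, so the stream is lazy.
  constructor(private producer: (observer: Observer<T>) => void) {}

  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }

  // pipe() feeds the observable through each operator, left to right.
  pipe(...operators: Array<Operator<any, any>>): MiniObservable<any> {
    return operators.reduce(
      (source: MiniObservable<any>, operator) => operator(source),
      this as MiniObservable<any>
    );
  }
}

const miniMap = <A, B>(project: (value: A) => B): Operator<A, B> =>
  source =>
    // Subscribing to the output observable subscribes to the input observable.
    new MiniObservable<B>(observer =>
      source.subscribe(value => observer(project(value)))
    );

const miniFilter = <A>(predicate: (value: A) => boolean): Operator<A, A> =>
  source =>
    new MiniObservable<A>(observer =>
      source.subscribe(value => {
        if (predicate(value)) {
          observer(value);
        }
      })
    );

const numbers = new MiniObservable<number>(observer =>
  [1, 2, 3, 4].forEach(n => observer(n))
);

const evensTimesTen = numbers.pipe(
  miniFilter((n: number) => n % 2 === 0),
  miniMap((n: number) => n * 10)
);

const received: number[] = [];
evensTimesTen.subscribe(value => received.push(value));
console.log(received); // [20, 40]
```

Nothing happens until subscribe() is called on the output observable. At that point each operator subscribes to the observable before it, all the way back to the source.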

Versions?

As of this writing there are two common versions of RxJS being used in Angular applications. Version 5.4.x is used in Angular 2 and Angular 4, while version 5.5.x of RxJS is being used in Angular 5. There is also a beta release of RxJS 6 that you can check out.

How do I import?

While you can import all of the operators, this is not suggested, and your users will not be happy. There is no need to ship all 75+ operators (and their symbol variations) to your users when you are only using a handful.

If you’re using RxJS v5.4.x then you will want to import using the prototype-based imports:

import 'rxjs/add/operator/switchMap';

This will add the switchMap method to Observable.prototype.

If you’re using RxJS v5.5.x then you will use an ES6-style import of the exported function, for example:

import { switchMap } from 'rxjs/operators';

How do I chain operators?

Again, the answer varies based on the version of RxJS that you are using. RxJS 5.5.x introduced the lettable operator syntax using the Observable.pipe() method.

First, let’s look at an example of chaining operators in RxJS 5.4.x (and Angular 2/4):

class PostsComponent {

  private user: Observable<User>;

  ngOnInit() {
    this.posts = this.user
      .map(user => user.id)
      .switchMap(id =>
        this.postsService.getPosts(id)
      );
  }
}

As you can see in the example above we chain the operators together using dot notation.

And here is an example of pipeable operators in RxJS 5.5.x (and Angular 5):

class PostsComponent {

  private user: Observable<User>;

  ngOnInit() {
    this.posts = this.user.pipe(
      map(user => user.id),
      switchMap(id =>
        this.postsService.getPosts(id)
      )
    );
  }
}

When using RxJS 5.5.x we use the pipe() method, passing all of the operators into the method in the same order.

What about Angular?

I thought you’d never ask. If you’re a fan of the observer design pattern, reactive programming and the power of RxJS, then you’re in luck as an Angular developer.

Angular 💟 RxJS

Angular’s asynchronous APIs, such as the HttpClient, the router and reactive forms, expose observables, which are currently implemented using the RxJS library. Angular also gives us the powerful AsyncPipe to easily subscribe to observable streams in component templates. It’s incredibly easy and powerful to use.

How many operators?

If you’ve browsed the RxJS Observable documentation you likely first noticed something: wow, there are a lot of operators! 😲

Yeah, there are a lot. I would say there are about 75 operators in RxJS as of this writing, not counting the variations of signatures.

How do I choose?

Good question. I hear that a lot from developers.

First, check out the “Find the right operator” wizard located at the bottom of the RxJS home page. A lot of people miss this, and it’s incredibly helpful when learning the operators.

Secondly, learn to read the marble diagrams, as this will help you to decipher the behavior of the operator you are considering. Speaking of which, let’s quickly review marble diagrams.

What are the diagrams?

The colorful diagram that we use to represent an observable stream is called a “marble diagram”.

Observer Design Pattern implemented by Observables

We can reference the marble diagrams when determining the operator or operators that we should use in our application.

map()

The map() operator is an easy starting point for a JavaScript developer, as it behaves much like the Array.prototype.map() method:

getCharacters(name: string): Observable<Array<Character>> {
  if (name.length === 0) {
    return Observable.of([]);
  }
  return this.charactersService.getCharacters(name)
    .pipe(
      map(marvelResponse => marvelResponse.data.results)
    );
}

In the example above the getCharacters() method uses the charactersService instance to retrieve the characters that match the name specified. This example is using the Marvel API to obtain an array of Marvel characters based on the name the user enters. The getCharacters() method returns an Observable. We use the map() operator to return the results property that is part of the response object that is returned from the Marvel API. The operator enables us to map the response of the observable stream to another value, in this case the array of character results.

filter()

The filter() operator behaves much like the Array.prototype.filter() method:

filter(name: string): Observable<Array<Character>> {
  if (name.length === 0) {
    return Observable.of([]);
  }
  return this.charactersService.getCharacters(name)
    .pipe(
      filter(marvelResponse => marvelResponse.code === 200),
      map(marvelResponse => marvelResponse.data.results)
    );
}

In the example above we use the filter() operator to only emit a notification to observers of the observable stream when the status code of the HTTP response is 200.

tap() or do()

The do() operator was renamed to tap() in RxJS v5.5.x as part of the upgrade to lettable operators to avoid a conflict with the reserved word do (part of the do-while loop).

The tap() operator is used to perform side effects. The operator receives the observable notification, so we can use the notification’s value to perform a side effect, such as dispatching an action to change the state of our application. The operator always returns the notification that it received. In other words, it does not transform or filter the notification (or if you prefer, the value).

Let’s look at an example:

export class EditComponent implements OnInit {
  power: Observable<Power>;

  constructor(
    private activatedRoute: ActivatedRoute,
    private snackBar: MatSnackBar,
    private store: Store<PowersState>
  ) {}

  ngOnInit() {
    this.power = this.activatedRoute.paramMap.pipe(
      tap(paramMap => {
        const id = +paramMap.get('id');
        this.store.dispatch(new SelectPower({ id: id }));
        this.hasPowerInStore(id).subscribe(exists => {
          if (!exists) {
            this.store.dispatch(
              new LoadPower({ id: id })
            );
          }
        });
      }),
      switchMap(() => this.store.pipe(select(getSelectedPower)))
    );
  }
}

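This is a bit long, so let’s break it down. Inside tap() we read the id route parameter and dispatch the SelectPower action. We then use the hasPowerInStore() method to check whether the power is already in the store, and dispatch the LoadPower action only when it is not. Finally, switchMap() switches to the store and selects the currently selected power.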

The take-away is that we can use the tap() operator to perform a task such as logging, or side effects such as dispatching actions to the store.

switchMap()

The switchMap() operator switches from one stream to another. It maps each value from the source to a new inner observable, unsubscribing from the previous inner observable and emitting the values of the new one.

In the previous example we used the switchMap() operator to return the new observable that is a result of the getSelectedPower selector, which returns an observable of the selected Power object.

Another common use case for the switchMap() is in an NgRx effect:

@Effect()
loadPower: Observable<Action> = this.actions.ofType<LoadPower>(LOAD_POWER)
  .pipe(
    map(action => action.payload),
    switchMap(payload => this.powersService.getPower(payload.id).pipe(
      retry(3),
      map(power => new LoadPowerSuccess(power)),
      // Catch on the inner observable so that an error does not end the effect stream
      catchError((e: HttpErrorResponse) => Observable.of(new HttpError(e)))
    ))
  );

In the example above the switchMap() operator receives the payload notification from the LoadPower action and returns a new observable, the result of the getPower() method in the powersService, which uses the HttpClient to GET the power from a REST API.
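If the "unsubscribing from the previous observable" part feels abstract, here is a rough sketch of the switching behavior. The MiniStream type and the createSubject() and switchMapSketch() functions are hypothetical helpers that I wrote for this illustration. They are not part of RxJS, and the real operator also handles errors and completion, which this sketch ignores.

```typescript
// Hypothetical minimal stream type, for illustration only (not the RxJS API):
// subscribing returns a function that unsubscribes.
type Unsubscribe = () => void;
type MiniStream<T> = (next: (value: T) => void) => Unsubscribe;

// A manually driven stream, standing in for an action or HTTP response stream.
function createSubject<T>(): { stream: MiniStream<T>; emit: (value: T) => void } {
  let listeners: Array<(value: T) => void> = [];
  const stream: MiniStream<T> = next => {
    listeners.push(next);
    return () => {
      listeners = listeners.filter(listener => listener !== next);
    };
  };
  const emit = (value: T): void => listeners.forEach(listener => listener(value));
  return { stream, emit };
}

function switchMapSketch<A, B>(
  source: MiniStream<A>,
  project: (value: A) => MiniStream<B>
): MiniStream<B> {
  return next => {
    let unsubscribeInner: Unsubscribe = () => {};
    const unsubscribeOuter = source(value => {
      unsubscribeInner(); // drop the previous inner stream
      unsubscribeInner = project(value)(next); // switch to the new one
    });
    return () => {
      unsubscribeInner();
      unsubscribeOuter();
    };
  };
}

const powerIds = createSubject<number>();
const requestForPower1 = createSubject<string>();
const requestForPower2 = createSubject<string>();

const loaded: string[] = [];
switchMapSketch(powerIds.stream, id =>
  id === 1 ? requestForPower1.stream : requestForPower2.stream
)(value => loaded.push(value));

powerIds.emit(1); // subscribes to requestForPower1
powerIds.emit(2); // unsubscribes from requestForPower1, subscribes to requestForPower2
requestForPower1.emit('Flight'); // ignored: this request was superseded
requestForPower2.emit('Strength');
console.log(loaded); // ['Strength']
```

This is why switchMap() is a common choice for loading data in response to a changing id: a slow response for an id that is no longer current is simply dropped.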

catchError() or catch()

The catch() operator was renamed in RxJS v5.5.x to catchError(). Like the name suggests, we use the catchError() operator to receive any error notifications that are emitted in the observable stream.

In the previous example we used the catchError() operator to catch any HttpErrorResponse objects. The example is an NgRx effect, so we return a new HttpError action that will be dispatched as a result of the error.

first()

The first() operator returns the first notification observed and completes the observable stream. We can specify a predicate function to filter for a specific value. We can also specify a selector function to transform the value that is returned from the operator. And, it supports a default value.

When we looked at the EditComponent earlier, we referenced the hasPowerInStore() method, which returns an Observable of a boolean value; either true when the power exists in the store, or false when the power does not exist in the store. This is a convenient way to check if a value exists in the NgRx store:

hasPowerInStore(id: number): Observable<boolean> {
  return this.store
    .select(getPowerEntities)
    .pipe(
      first(powers => powers !== null, powers => powers[id] !== undefined)
    );
}

In the example above I am using the first() operator with both the predicate and selector functions specified. First, the predicate requires that the entities dictionary is not null. Then, the selector function returns a boolean value that indicates whether the entity exists in the store.

last()

The counterpart of the first() operator, last() returns the last value that is observed and then completes the observable stream. Unlike first(), the last() operator must wait for the completion notification from the source stream before it can emit anything. This is very useful for once-and-done operations.
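Here is a small sketch of that waiting behavior. The CompletingStream type and the lastSketch() function are hypothetical and only model the idea. In particular, as far as I know RxJS signals an error when the source completes without emitting and no default value is given, while this sketch simply completes.

```typescript
// Hypothetical minimal stream, for illustration only (not the RxJS API):
// it calls next() for each value and then complete() once.
type CompletingStream<T> = (
  next: (value: T) => void,
  complete: () => void
) => void;

function lastSketch<T>(source: CompletingStream<T>): CompletingStream<T> {
  return (next, complete) => {
    let hasValue = false;
    let latest: T | undefined;
    source(
      value => {
        // Remember the value, but emit nothing yet.
        hasValue = true;
        latest = value;
      },
      () => {
        // Only the completion of the source triggers the emission.
        // Assumption: an empty source just completes here.
        if (hasValue) {
          next(latest as T);
        }
        complete();
      }
    );
  };
}

const uploadSteps: CompletingStream<string> = (next, complete) => {
  ['queued', 'uploading', 'done'].forEach(step => next(step));
  complete();
};

const seen: string[] = [];
lastSketch(uploadSteps)(
  value => seen.push(value),
  () => seen.push('(complete)')
);
console.log(seen); // ['done', '(complete)']
```

If the source never completes, the output never emits, so last() only makes sense for streams that are known to finish.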

withLatestFrom()

The withLatestFrom() operator combines streams of data. Each time the source observable emits a notification (or value), it is paired with the latest value from another observable. Nothing is emitted until the other observable has emitted at least once.

Let’s look at an example:

ngOnInit() {
  this.heroes = this.power
    .pipe(
      withLatestFrom(this.heroesService.getHeroes()),
      map(([power, heroes]) => heroes.filter(hero => hero.powers.indexOf(power.id) > -1))
    );
}

In this example we are retrieving the heroes that have a chosen power. The withLatestFrom() operator merges the result of the getHeroes() observable stream with the power stream. The map() operator receives both values that are emitted by the observable streams as an array. We use destructuring to access the first and second values in the array, namely, the power object and the array of heroes.
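The timing rules of withLatestFrom() are easy to get wrong, so here is a sketch that replays a timeline of events instead of using real observables. The TimelineEvent type and the withLatestFromSketch() function are hypothetical names used only for this illustration, and the hero and power names are made-up sample data.

```typescript
// Hypothetical event model, for illustration only: each entry says which
// stream emitted, in the order the emissions happened.
type TimelineEvent<S, O> =
  | { from: 'source'; value: S }
  | { from: 'other'; value: O };

function withLatestFromSketch<S, O>(
  timeline: Array<TimelineEvent<S, O>>
): Array<[S, O]> {
  const output: Array<[S, O]> = [];
  let hasOther = false;
  let latestOther: O | undefined;
  for (const event of timeline) {
    if (event.from === 'other') {
      // Remembered, but the other stream never triggers an emission.
      hasOther = true;
      latestOther = event.value;
    } else if (hasOther) {
      output.push([event.value, latestOther as O]);
    }
    // A source value that arrives before any "other" value is dropped.
  }
  return output;
}

const pairs = withLatestFromSketch<string, string[]>([
  { from: 'source', value: 'flight' }, // dropped: no heroes yet
  { from: 'other', value: ['Superman'] },
  { from: 'source', value: 'strength' },
  { from: 'other', value: ['Superman', 'Hulk'] }, // no emission on its own
  { from: 'source', value: 'speed' },
]);
console.log(pairs);
// [['strength', ['Superman']], ['speed', ['Superman', 'Hulk']]]
```

The source stream alone decides when something is emitted. The other stream only supplies its most recent value.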

forkJoin()

The forkJoin() operator is similar to the Promise.all() method in that it starts (forks) multiple observables at once and then joins the final values from each observable when all observables complete. It is important to note that if any of the input observables never completes, then forkJoin() will never emit.
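To show why a single observable that never completes blocks the whole result, here is a simplified sketch of the joining logic. The Completing type and the forkJoinSketch(), fromValues() and neverCompletes names are hypothetical, and the sketch skips the edge cases that the real forkJoin() handles, such as an empty list of inputs or an input that completes without a value.

```typescript
// Hypothetical minimal stream, for illustration only (not the RxJS API):
// it calls next() for each value and then complete() once.
type Completing<T> = (next: (value: T) => void, complete: () => void) => void;

function forkJoinSketch<T>(sources: Array<Completing<T>>): Completing<T[]> {
  return (next, complete) => {
    const lastValues: T[] = new Array(sources.length);
    let remaining = sources.length;
    // "Fork": subscribe to every source up front.
    sources.forEach((source, index) => {
      source(
        value => {
          lastValues[index] = value; // keep only the final value of each source
        },
        () => {
          remaining -= 1;
          // "Join": emit once, after the last source has completed.
          if (remaining === 0) {
            next(lastValues);
            complete();
          }
        }
      );
    });
  };
}

// Emits the given values and then completes.
const fromValues = <T>(...values: T[]): Completing<T> => (next, complete) => {
  values.forEach(value => next(value));
  complete();
};

// Never emits and never completes.
const neverCompletes: Completing<string> = () => {};

const joined: string[][] = [];
forkJoinSketch([fromValues('a', 'b'), fromValues('c')])(
  values => joined.push(values),
  () => {}
);
console.log(joined); // [['b', 'c']]

const stuck: string[][] = [];
forkJoinSketch([fromValues('a'), neverCompletes])(
  values => stuck.push(values),
  () => {}
);
console.log(stuck); // []
```

This makes forkJoin() a good fit for HTTP requests, which emit once and complete, and a poor fit for long-lived streams such as store selectors.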

Let’s look at an example:

hero: Observable<Hero>;
powers: Observable<Array<Power>>;

ngOnInit() {
  this.hero = this.activatedRoute.paramMap
    .pipe(
      // Read the id from the route parameters before requesting the hero
      map(paramMap => +paramMap.get('id')),
      switchMap(id => this.heroesService.getHero(id))
    );
  this.powers = this.hero
    .pipe(
      mergeMap(hero => forkJoin(
        hero.powers.map(id => this.powersService.getPower(id))
      ))
    );
}

mergeMap()

The mergeMap() operator maps the value received by a source observable to a function that returns an observable, and merges the values emitted by that observable.

In the example above we used the mergeMap() operator, which accepts the hero value emitted by the source this.hero observable. We then use the forkJoin() operator to return a single observable of an array of Power objects that are returned from the getPower() method. The values from that inner observable are then merged into the output stream, as if we had used map() followed by mergeAll().

distinctUntilChanged()

The distinctUntilChanged() operator only emits a value from the source observable when it differs from the previous value. It uses strict equality checking by default, or you can specify a comparator function to determine whether the value has changed.

Let’s look at an example:

ngOnInit() {
  this.form.valueChanges
    .pipe(
      debounceTime(500),
      distinctUntilChanged((prev: Power, next: Power) => prev.name === next.name)
    )
    .subscribe(value => {
      if (!this.form.valid) {
        return;
      }
      this.powerChange.emit({
        ...this.power,
        ...value
      });
    });
}

In this example we only want to emit the powerChange event when the name of the Power object has changed in the form. The distinctUntilChanged() operator allows us to filter the observable stream and to only emit the value when we have determined that the value has changed. This helps us to avoid saving the same data if, say, a user changes a form value but then changes it back to the original value.
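Here is a sketch of the comparison logic, applied to a plain array of values rather than a real stream. The distinctUntilChangedSketch() function and the sample form values are hypothetical and exist only for this illustration.

```typescript
// Hypothetical array-based model of distinctUntilChanged(), for illustration only.
function distinctUntilChangedSketch<T>(
  values: T[],
  isSame: (previous: T, next: T) => boolean = (previous, next) => previous === next
): T[] {
  const output: T[] = [];
  values.forEach(value => {
    // Compare with the last value that was emitted, not with every earlier value.
    if (output.length === 0 || !isSame(output[output.length - 1], value)) {
      output.push(value);
    }
  });
  return output;
}

// Only consecutive duplicates are dropped, so 1 can appear again later.
console.log(distinctUntilChangedSketch([1, 1, 2, 2, 1])); // [1, 2, 1]

// Each form value is a new object, so strict equality treats all of them as changed.
const formValues = [{ name: 'Flight' }, { name: 'Flight' }, { name: 'Speed' }];
console.log(distinctUntilChangedSketch(formValues).length); // 3

// A comparator on the name property drops the repeated 'Flight'.
const byName = distinctUntilChangedSketch(
  formValues,
  (previous, next) => previous.name === next.name
);
console.log(byName.map(value => value.name)); // ['Flight', 'Speed']
```

The second call is the reason the form example passes a comparator: without it, two objects with the same contents would still count as a change.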

debounceTime()

In the previous example we also used the debounceTime() operator to wait until 500 milliseconds have passed since the last value was emitted by the observable stream. Using the debounceTime() operator we can avoid emitting the powerChange event on each keyup of an input. The previous values that were emitted are dropped if a new value arrives before the interval has elapsed.
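To see which values survive, here is a sketch that applies the debounce rule to a list of timestamped values instead of using real timers. The Timed interface, the debounceTimeSketch() function and the sample keyups are hypothetical. The sketch assumes the events are sorted by time and that the stream stays open after the last value.

```typescript
// Hypothetical timestamped value, for illustration only.
interface Timed<T> {
  at: number; // milliseconds since the start of the stream
  value: T;
}

// Assumes events are sorted by time and the stream stays open afterwards.
function debounceTimeSketch<T>(events: Array<Timed<T>>, dueTime: number): Array<Timed<T>> {
  const output: Array<Timed<T>> = [];
  events.forEach((event, index) => {
    const following = events[index + 1];
    // A value survives only if no newer value arrives within dueTime.
    if (following === undefined || following.at - event.at >= dueTime) {
      // It is emitted once the quiet period has passed.
      output.push({ at: event.at + dueTime, value: event.value });
    }
  });
  return output;
}

const keyups: Array<Timed<string>> = [
  { at: 0, value: 'F' },
  { at: 120, value: 'Fl' },
  { at: 260, value: 'Fly' }, // followed by a long pause
  { at: 1500, value: 'Flying' },
];

console.log(debounceTimeSketch(keyups, 500));
// [{ at: 760, value: 'Fly' }, { at: 2000, value: 'Flying' }]
```

The first two keyups are dropped because a newer value arrived within 500 milliseconds, which is exactly what keeps the powerChange event from firing on every keystroke.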

Conclusion

First, reactive programming using the RxJS library is a paradigm for working with observable streams of asynchronous events or data. Using operators to filter, transform or alter the timing of the stream of data enables us to easily compose logic with observable streams.

Second, if you’re an Angular developer, having a good understanding of the fundamental operators available in RxJS will greatly improve your ability to create apps that consume and create asynchronous events and values. It’s almost impossible to not use RxJS and observables when building an application with Angular.

Finally, learning to read and understand marble diagrams is very beneficial for understanding the various operators.