Angular + RxJS: How to pause an observable stream posted on Jul 26, 2017

A common Angular scenario is to have a component which subscribes to an observable and then performs some action on each element of the observable sequence. The example below is our starting point and is about as simplified as possible so I won’t spend too long on it.

The component sets up a source$ which is a sequence of integers emitted at 500ms intervals.

  
   this.source$ = Observable.range(1, 500).zip(
      Observable.interval(500),
      function (x, y) { return x; }
    );
  

When the user clicks start the component subscribes to the stream and pushes each element to an array which is displayed in the view.

  
   start() {
    if (!this.itemSubscription) {
      this.itemSubscription = this.source$
        .subscribe(item => {
          this.items.push(item);
        }, error => {
          this.items.push(error);
        });
      }
    }
  

The user can unsubscribe at any time by clicking stop.

  
   stop() {
    if (this.itemSubscription) {
      this.itemSubscription.unsubscribe();
      this.itemSubscription = null;
      this.items = [];
    }
   }
  

I had a variation on this requirement where I had to perform an operation on each element and that task had to complete before I could move on to the next element in the sequence. I also needed the subscription to buffer incoming elements rather than just discarding them whilst the task was taking place.

The actual problem I had was for plotting the elements on to a map and grouping nearby elements. For the purposes of the sample it is just going to pause based on whether the user has toggled a busy button or not.

  
   <md-slide-toggle #busyToggle [checked]="busy$ | async" (change)="setBusy(busyToggle.checked)" color="secondary"&rt;busy</md-slide-toggle&rt;
  

The toggle button is wired up to a busy boolean property and a busy$ observable to the component can listen for changes.

  
     busy$ = new Subject();
     busy = false;

     ...

    ngOnInit() {
      this.busySubscription = this.busy$.subscribe(busy => this.busy = busy);
    }

    ...

    setBusy(busy: boolean) {
      this.busy$.next(busy);
    }         
  

The important bit is the change to the item subscription. Instead of just being a stream of items it now it uses a concatMap to maintain an observable sequence. If the component is busy a new observable is emitted which will not complete until the busy status changes again. This way the stream gets paused. Once busy is changed the busy observable will complete and the item observables queued behind it will be emitted in sequence.

  
     start() {
      if (!this.itemSubscription) {
        this.itemSubscription = this.source$.concatMap(item => {
        const busySubject = new Subject();

        this.busy$
          .subscribe(result => {
            busySubject.next(item);
            busySubject.complete();
          });

        if (this.busy) {
          return busySubject.asObservable();
        } else {
          return Observable.of(item);
        }

       })
        .subscribe(item => {
          this.items.push(item);
        }, error => {
          this.items.push(error);
        });
      }
    }
  

This is the best solution I have come up with so far. I still feel there might be a way to remove the need to the busy property and just use the busy$ observable but I haven’t found it yet. If I do I’ll post it. If you have any ideas please fork the plunk above and let me know in the comments.