Is there a way to make a destructor for RXJS Observables?

Advertisements

In my Angular app I would like to get SSE events from a server, and then do something with the results. For this, I found a solution where I wrap the SSE EventSource into an Observable. The code is the following:


import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable({
  providedIn: 'root',
})
export class SseServiceService {
  constructor(private _zone: NgZone) {}

  /**
   * Creates an event source
   */
  getEventSource(url: string): EventSource {
    return new EventSource(url);
  }

  /**
   * Returns an event source stream from the url
   *
   * @param url url of the event source
   */
  getServerSentEvent(url: string) {
    return new Observable((observer) => {
      const eventSource = this.getEventSource(url);

      eventSource.onmessage = (event) => {
        this._zone.run(() => {
          observer.next(event);
        });
      };

      eventSource.onerror = (error) => {
        this._zone.run(() => {
          observer.error(error);
        });
      };
    });
  }
}


The question is:

Should’t I call eventSource.close() when the observable is getting destoryed?

Is there a way to assign a destructor, to observables made with new Observable()?

>Solution :

You can optionally return a teardown function form the "subscribe function" passed to the constructor:

return new Observable((observer) => {
  const eventSource = this.getEventSource(url);
  ...
  return () => eventSource.close();
})

There’re also operators such as finalize() or tap() (in RxJS 7+) that let you call a function when the chain is being disposed.

Leave a ReplyCancel reply