In my Angular app I would like to get SSE events from a server, and then do something with the results. For this, I found a solution where I wrap the SSE EventSource into an Observable. The code is the following:
import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
@Injectable({
providedIn: 'root',
})
export class SseServiceService {
constructor(private _zone: NgZone) {}
/**
* Creates an event source
*/
getEventSource(url: string): EventSource {
return new EventSource(url);
}
/**
* Returns an event source stream from the url
*
* @param url url of the event source
*/
getServerSentEvent(url: string) {
return new Observable((observer) => {
const eventSource = this.getEventSource(url);
eventSource.onmessage = (event) => {
this._zone.run(() => {
observer.next(event);
});
};
eventSource.onerror = (error) => {
this._zone.run(() => {
observer.error(error);
});
};
});
}
}
The question is:
Should’t I call eventSource.close() when the observable is getting destoryed?
Is there a way to assign a destructor, to observables made with new Observable()?
>Solution :
You can optionally return a teardown function form the "subscribe function" passed to the constructor:
return new Observable((observer) => {
const eventSource = this.getEventSource(url);
...
return () => eventSource.close();
})
There’re also operators such as finalize()
or tap()
(in RxJS 7+) that let you call a function when the chain is being disposed.