Wrapping an RxJS observable stream into an Angular service
Angular’s dependency injection mechanism allows us to cleanly separate business logic (services) from UI (components). What if our app generates a stream of values and we want to implement it as an injectable service? Here I’ll show you how to create an injectable service that emits a stream of values, and a UI component that subscribes to this stream, displaying its values in real time.
In one of my RxJS blog posts, I showed how to use the method Observable.create(), providing an observer as an argument. Let’s create a service with a method that returns an observable, which emits the current time every second to the observer of each subscriber. The code below uses the equivalent constructor, new Observable().
import {Observable} from 'rxjs'; // RxJS 6+; on RxJS 5 use 'rxjs/Observable'

export class ObservableService {

  createObservableService(): Observable<Date> {  // 1
    return new Observable<Date>(                 // 2
      observer => {                              // 3
        setInterval(() =>
          observer.next(new Date())              // 4
        , 1000);
      }
    );
  }
}
1. Return an observable stream of dates
2. Create an observable
3. Provide an observer
4. Emit the new date every second
In this service, an instance of the RxJS Observable object is created, assuming that the subscriber will provide an Observer that knows what to do with the emitted data. Whenever the observable invokes the method next(new Date()) on the observer, the subscriber will receive the current date and time. The data stream never throws an error and never completes.
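To make the mechanics concrete, here is a stripped-down model of that contract in plain TypeScript. TinyObserver and TinyObservable are hypothetical names for this sketch, not RxJS classes, and the real Observable does much more (error and completion handling, unsubscription, operators). The point is only that the function passed to the constructor stays dormant until someone subscribes, and that it runs once per subscriber.

```typescript
// Simplified stand-ins for RxJS's Observer and Observable (hypothetical, illustration only).
interface TinyObserver<T> {
  next(value: T): void;
}

class TinyObservable<T> {
  private producer: (observer: TinyObserver<T>) => void;

  // The producer function is only stored here; it does not run yet.
  constructor(producer: (observer: TinyObserver<T>) => void) {
    this.producer = producer;
  }

  // Subscribing builds an observer and hands it to the producer.
  subscribe(next: (value: T) => void): void {
    this.producer({ next });
  }
}

let producerRuns = 0;

const dates$ = new TinyObservable<Date>(observer => {
  producerRuns++;
  observer.next(new Date()); // the service above does this every second
});

const runsBeforeSubscribe = producerRuns; // 0: nothing runs until subscribe()

const received: Date[] = [];
dates$.subscribe(date => received.push(date));
dates$.subscribe(date => received.push(date)); // second subscriber, second producer run

console.log(runsBeforeSubscribe, producerRuns, received.length); // prints: 0 2 2
```

The same holds for the service above: every call to subscribe() starts its own setInterval() timer.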
If you inject the ObservableService into the AppComponent, the component can invoke the method createObservableService() and subscribe to its stream of values, creating an observer that knows what to do with the data. The observer just assigns the received time to the variable currentTime, which the template renders in the UI.
import {Component} from '@angular/core';
import {ObservableService} from './observable.service';

@Component({
  selector: 'app-root',
  providers: [ObservableService],
  template: `<h1>Custom observable service</h1>
       Current time: {{currentTime | date: 'mediumTime'}} <!-- 1 -->
  `})
export class AppComponent {

  currentTime?: Date; // stays undefined until the first value arrives

  constructor(private observableService: ObservableService) {  // 2
    this.observableService.createObservableService()           // 3
      .subscribe(data => this.currentTime = data);             // 4
  }
}
1. Display the time using the date pipe
2. Inject the service that wraps the observable
3. Create the observable
4. Subscribe to the stream of dates, which starts the emission
This app doesn’t use any servers, and you can see it in action in the great StackBlitz online IDE.
In the browser’s window, the current time will be updated every second. We use DatePipe here with the format 'mediumTime', which displays only hours, minutes, and seconds, plus an AM/PM marker in the default en-US locale (all date formats are described in the DatePipe documentation).
This simple example demonstrates a basic technique for wrapping any application logic in an observable stream and subscribing to it. In this case, we use setInterval(), but you could replace it with any application-specific code that generates one or more values and sends them as a stream.
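As one possible variation, the sketch below takes the value-producing code as a parameter and also returns a teardown function, so the timer stops when the subscriber unsubscribes (the service above leaves its interval running). The names pollEvery, produce, and periodMs are hypothetical and exist only for this illustration; the import path assumes RxJS 6 or later.

```typescript
import { Observable } from 'rxjs';

// Hypothetical helper: calls produce() every periodMs and emits the result.
export function pollEvery<T>(produce: () => T, periodMs: number): Observable<T> {
  return new Observable<T>(observer => {
    const timerId = setInterval(() => {
      try {
        observer.next(produce());
      } catch (err) {
        observer.error(err); // surface producer failures to the subscriber
      }
    }, periodMs);

    // Teardown: RxJS runs this on unsubscribe, error, or completion.
    return () => clearInterval(timerId);
  });
}
```

A caller would keep the object returned by pollEvery(() => new Date(), 1000).subscribe(...) and call unsubscribe() on it when the values are no longer needed, for example when the component is destroyed.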
Don’t forget about error handling and completing the stream if need be. The following code snippet shows a sample observable that sends one element to the observer, may throw an error, and tells the observer that the streaming is complete:
return new Observable(
  observer => {
    try {
      observer.next('Hello from observable');

      // throw ("Got an error");
    } catch (err) {
      observer.error(err);
    } finally {
      observer.complete();
    }
  }
);
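If you uncomment the line with a throw, observer.error() is invoked, which results in the invocation of the error handler on the subscriber if there is one. The observer.complete() call in the finally block then has no effect, because an error already terminates the stream.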
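On the subscribing side, the three kinds of notifications map onto the three handlers of an observer object. Here is a minimal sketch of the error path, assuming RxJS 6 or later for the import path; the names failing$ and events exist only for this illustration, and the throw is enabled to show which handlers fire.

```typescript
import { Observable } from 'rxjs';

const failing$ = new Observable<string>(observer => {
  try {
    observer.next('Hello from observable');
    throw new Error('Got an error');
  } catch (err) {
    observer.error(err);
  } finally {
    observer.complete(); // no effect: the stream already ended with an error
  }
});

const events: string[] = [];

// Subscribe with an observer object that handles all three notification types.
failing$.subscribe({
  next: value => events.push(`next: ${value}`),
  error: (err: Error) => events.push(`error: ${err.message}`),
  complete: () => events.push('complete')
});

console.log(events); // [ 'next: Hello from observable', 'error: Got an error' ]
```

With the throw removed, the second entry would be 'complete' instead of the error.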
The data producer for the observable stream was generating date/time, but it could be any app code that generates some useful values—e.g., a WebSocket server generating stock quotes, auction bids, actions of online game players, etc. During workshops, I show a sample online auction app that has a Node server emulating users’ bids on products. That server uses a WebSocket connection to push new bids for products to all users that are interested in receiving them.