Manage concurrent and interdependent requests with RxJS
Have you ever had to retrieve information from multiple data sources or different API endpoints? Did you optimize your request sequence for concurrency and interdependent data?
If you answered yes to at least one of these questions, you should read on now. I'll show you my solution using the Angular framework and RxJS. Let your requests flow like a series of interconnected waterfalls.
Photo: © Quang Nguyen Vinh / pexels.com
Before we take an in-depth look at some code examples, here's a brief explanation of RxJS.
What is RxJS and what are Observables?
The name RxJS is an abbreviation for Reactive Extensions Library for JavaScript. In the official RxJS Introduction it says:
RxJS is a library for composing asynchronous and event-based programs by using observable sequences.
The term “observable sequence” is key here. It represents the idea of an invokable collection of future values. The related
functionality is encapsulated in the Observable
class.
For example, you can create an observable that emits the value of a text input on every value change. Or, you can wrap the Promise of a fetch request in an observable that emits the response when the promise completes.
Another key concept to keep in mind is the use of a pipe
. Using an
observable's pipe function, we can execute a series of actions on the observable
and its datastream. If you're new to RxJS, you should learn the basics with the official docs. I can also recommend the
website Learn RxJS.
The Challenge
In a current client project, I was faced with the following challenge: The web application should enable users to make specific data sets available offline. As a first step, I turned the app into a Progressive web app and created an injectable service that handles the data storage with IndexedDB.
Then I was lost for a moment!
To be honest: I've never had to retrieve information from different API endpoints where the result of some initial requests would determine hundreds of follow-up requests. I had to optimize the request sequence for concurrency as well as interdependency.
After a lot of online research and some trial and error, I arrived at a satisfying solution. The following source code is an adapted version of my client project, which I'm happy to share with you.
The Solution
Do you like going to the movies? Imagine a web application that shows the movie program for different cinemas. You can see when which movie is playing, access information about the actors, watch film clips and browse through screenshots. You can also read general information about the cinemas (e.g. name, location) and their movie halls (e.g., number of seats).
Photo: © Pixabay / pexels.com
To retrieve and store all this data offline, we need to access different endpoints. Some of the requests (e.g., for each movie) depend on information retrieved in other requests (e.g., for the movie program). This means, we first need to define a logical order for our download sequence.
Step 1: The Main Download Pipe
In my project, the CinemaDownloadService
class contains the entire download logic. The service can be
injected by, e.g., an Angular component. It provides the public
method downloadCinemaInfo
that downloads all necessary information to make a cinema and its
movie program available offline:
public downloadCinemaInfo(cinemaId: number): Observable<CinemaOfflineData> {
return this.downloadDataForCinemaId(cinemaId).pipe(
switchMap(this.downloadMovies),
switchMap(this.downloadActorBiographies),
switchMap(this.downloadFilmClips),
switchMap(this.downloadScreenshots),
);
}
The method returns an observable that emits the data collection CinemaOfflineData
once all individual
downloads have finished.
export interface CinemaOfflineData {
generalInfo: CinemaGeneralInfo;
movieHalls: MovieHall[];
movieProgram: MovieProgram;
movies: Movie[];
actorBios: Actor[];
filmClips: FilmClip[];
screenshots: MovieScreenshot[];
}
To collect the data, the method first downloads all information that directly depends on cinemaId
. We'll
take a closer look at this in a moment. Afterwards, it uses a pipe
and the switchMap
operator to download the remaining data.
The switchMap operator takes the value emitted by the observable to create a new observable, which replaces the old one. To put it in simpler terms: Once we're finished with a specific download operation, we start a new one that incorporates the result of the previous operation.
Step 2: Download Basic Information
As a first step, we want to download the general information about the cinema, its movie halls and the current movie program:
private downloadDataForCinemaId(cinemaId: number): Observable<CinemaOfflineData> {
return forkJoin([
this._request.getCinemaGeneralInfo(cinemaId).pipe(
retryStrategy()
),
this._request.getMovieHalls(cinemaId).pipe(
retryStrategy()
),
this._request.getMovieProgram(cinemaId).pipe(
retryStrategy()
),
]).pipe(
map(([generalInfo, movieHalls, movieProgram]: [CinemaGeneralInfo, MovieHall[], MovieProgram]) => {
const data: CinemaOfflineData = {
generalInfo, movieHalls, movieProgram,
movies: [], actorBios: [],
filmClips: [], screenshots: [],
};
return data;
})
);
}
The individual backend requests are made through an independent RequestService
, which we access
through this._request
. The details of the implementation are irrelevant for this post. All you need to know is:
Each request method returns an observable that emits the response and then completes.
We use the forkJoin operator to wait for all requests to finish and then create the
first instance of our CinemaOfflineData
object. The remaining properties, like movies
,
are initialized as empty arrays. We'll download the corresponding
data in the next steps of our download pipe.
Step 3: Limit Concurrent Requests with mergeMap
After we've successfully downloaded the movie program, we want to retrieve all movies that are listed in the program. Depending on the size of the cinema and the covered time period, this could mean hundreds of individual requests.
The modern HTTP/2 protocol supports full request multiplexing. But depending on your backend server and the maximum requests per second, it might be a good idea to limit the number of concurrent requests.
To achieve this, we use a combination of the following RxJS functions: from
, mergeMap
,
and reduce
. Here's the complete movies download method:
private downloadMovies = (data: CinemaOfflineData): Observable<CinemaOfflineData> => {
const movies: Movie[] = [];
const movieIds = this.getMovieIds(data.movieProgram);
return from(movieIds).pipe(
mergeMap(
id => this._request.getMovieInfo(id).pipe(
retryStrategy()
),
MAX_CONCURRENT_BACKEND_REQUESTS
),
reduce(
(accumulator, item) => {
accumulator.push(item);
return accumulator;
},
movies
),
map(movies => ({ ...data, movies }))
);
};
I know, that's a lot to take in. Let's look at the implementation step by step:
- We extract the unique
movieIds
from the movie program we've already downloaded. Then we use from to turn the array of movie IDs into an observable. - Next, we use mergeMap to create a request for each movie. We pass the
constant
MAX_CONCURRENT_BACKEND_REQUESTS
as the second parameter to limit the max number of concurrent requests. - Each individual request observable applies the
retryStrategy
operator to repeat the request in case it should throw an error. This is my own custom operator, which uses the retry function. - The next element in our observable pipe is reduce. We use this function to
accumulate the results of all backend requests into a single
movies
array. - Last, we integrate the movies data into a new instance of our
CinemaOfflineData
object, using the map operator.
Now the main download pipe can move on the the next step: Downloading the biographies for all actors. We extract the
unique actorIds
from the movies array and apply the same download logic as before:
private downloadActorBiographies = (data: CinemaOfflineData): Observable<CinemaOfflineData> => {
const actorBios: Actor[] = [];
const actorIds = this.getActorIds(data.movies);
return from(actorIds).pipe(
mergeMap(
id => this._request.getActorBiography(id).pipe(
retryStrategy()
),
MAX_CONCURRENT_BACKEND_REQUESTS
),
reduce(
(accumulator, item) => {
accumulator.push(item);
return accumulator;
},
actorBios
),
map(actorBios => ({ ...data, actorBios }))
);
};
We repeat the same steps for the film clips and the screenshots.
Step 4: Subscribe to the Download Observable
To execute the whole download logic, you need to call the public method downloadCinemaInfo
and
subscribe to the observable it returns:
this._cinemaDownloadService.downloadCinemaInfo(cinemaId)
.subscribe((cinemaData: CinemaOfflineData) => {
/* Store data offline, e.g., with IndexedDB */
});
That's it! You've successfully created an elegant download logic that is optimized for concurrency and interdependent data.
Of course, you'll also want to cover error handling using catchError. You might also want to visualize the download process using a Subject to communicate the current status.
Conclusion
As we've seen, RxJS offers a set of powerful tools like mergeMap
and reduce
.
They help us to create efficient and robust solutions.
You can define a main download pipe that provides a quick overview of the individual steps. Then you place the specific download logic into isolated private functions. This helps to keep your code clean and maintainable.
Useful Resources
Posted on