Angular RxJs
- RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables.
- In Angular, RxJS is fundamental for handling asynchronous operations, events, HTTP requests, and state management.
1. Basic Concepts
Section titled “1. Basic Concepts”Understanding Observables, Observers, and Subscriptions
Section titled “Understanding Observables, Observers, and Subscriptions”Observables are the core of RxJS - they represent lazy push collections of multiple values. Observers are consumers of Observable values, and Subscriptions represent the execution of an Observable.
import { Observable } from "rxjs";
// Creating a custom Observableconst simpleObservable = new Observable<string>((observer) => { // Observable execution observer.next("Hello"); // Emit value observer.next("World"); // Emit another value setTimeout(() => { observer.next("Delayed value"); observer.complete(); // Signal completion }, 1000);
// Optional cleanup function return () => { console.log("Observable teardown"); };});
// Subscribing to the Observableconst subscription = simpleObservable.subscribe({ next: (value) => console.log("Received:", value), error: (err) => console.error("Error:", err), complete: () => console.log("Stream completed"),});
// Unsubscribing to prevent memory leakssubscription.unsubscribe();Subjects and Their Variants
Section titled “Subjects and Their Variants”Subjects are special types of Observables that allow multicasting to multiple Observers. They act as both Observable and Observer.
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from "rxjs";
// Regular Subject - no initial value, only emits after subscriptionconst subject = new Subject<string>();subject.next("This is missed"); // Won't be received by late subscriberssubject.subscribe((value) => console.log("Subscriber 1:", value));subject.next("Hello"); // Received by Subscriber 1
// BehaviorSubject - requires initial value, emits current value to new subscribersconst behaviorSubject = new BehaviorSubject<number>(0);behaviorSubject.subscribe((value) => console.log("Sub1:", value)); // Immediately gets 0behaviorSubject.next(1);behaviorSubject.subscribe((value) => console.log("Sub2:", value)); // Immediately gets 1
// ReplaySubject - replays specified number of previous values to new subscribersconst replaySubject = new ReplaySubject<number>(2); // Buffer size of 2replaySubject.next(1);replaySubject.next(2);replaySubject.next(3);replaySubject.subscribe((value) => console.log("Late subscriber:", value)); // Gets 2, 3
// AsyncSubject - only emits the last value when completedconst asyncSubject = new AsyncSubject<number>();asyncSubject.next(1);asyncSubject.next(2);asyncSubject.subscribe((value) => console.log("Async sub:", value)); // Nothing yetasyncSubject.complete(); // Now emits 22. Creation Operators
Section titled “2. Creation Operators”Creation operators are functions that create new Observables from various sources like values, arrays, promises, events, or timers.
import { of, from, fromEvent, interval, timer, EMPTY } from "rxjs";
// of - creates Observable from fixed set of valuesconst numbers$ = of(1, 2, 3, 4, 5); // Emits each number synchronously
// from - creates from array, promise, or iterableconst array$ = from([10, 20, 30]); // Converts array to Observableconst promise$ = from(fetch("/api/data")); // Converts promise to Observable
// fromEvent - creates Observable from DOM eventsconst click$ = fromEvent(document, "click"); // Mouse clicksconst input$ = fromEvent(document.getElementById("search"), "input"); // Input events
// interval - emits sequential numbers at specified intervalsconst seconds$ = interval(1000); // 0, 1, 2... every second
// timer - emits after delay, optionally repeatsconst delayed$ = timer(3000); // Emits 0 after 3 seconds, then completesconst periodic$ = timer(2000, 1000); // Starts after 2s, emits every 1s
// EMPTY - immediately completes without emitting valuesconst empty$ = EMPTY; // Useful for cases where you need an Observable that does nothing3. Transformation Operators
Section titled “3. Transformation Operators”Transformation operators modify the values emitted by source Observables, transforming them into different values or structures.
Fundamental Transformation Operators
Section titled “Fundamental Transformation Operators”import { map, switchMap, mergeMap, concatMap, exhaustMap, toArray,} from "rxjs/operators";
// map - transforms each value using a projection functionconst numbers$ = of(1, 2, 3).pipe( map((x) => x * 2) // Double each value); // Output: 2, 4, 6
// switchMap - projects each value to Observable, cancels previous inner Observablesconst search$ = fromEvent(searchInput, "input").pipe( map((event) => (event.target as HTMLInputElement).value), switchMap((searchTerm) => { // If new search comes, cancel previous HTTP request return this.http.get(`/api/search?q=${searchTerm}`); }));
// mergeMap - projects each value to Observable and merges the resultsconst saveRequests$ = userActions$.pipe( mergeMap((userAction) => this.http.post("/api/users", userAction)) // All requests run in parallel);
// concatMap - projects each value to Observable, runs sequentiallyconst sequentialSaves$ = userActions$.pipe( concatMap((userAction) => this.http.post("/api/users", userAction)) // Each request waits for previous to complete);
// exhaustMap - ignores new values while current inner Observable is activeconst loginClicks$ = fromEvent(loginButton, "click").pipe( exhaustMap(() => this.http.post("/api/login", credentials)) // Ignores clicks while login request is in progress);
// toArray - collects all emissions and emits as single arrayconst numberArray$ = of(1, 2, 3, 4, 5).pipe(toArray()); // Output: [1, 2, 3, 4, 5]4. Filtering Operators
Section titled “4. Filtering Operators”Filtering operators select which values from the source Observable should be emitted based on certain conditions or timing.
import { filter, take, takeUntil, skip, debounceTime, distinctUntilChanged, first, last,} from "rxjs/operators";
// filter - only emit values that satisfy the predicateconst evenNumbers$ = of(1, 2, 3, 4, 5).pipe(filter((x) => x % 2 === 0)); // Output: 2, 4
// take - take only the first n values, then completeconst firstThree$ = of(1, 2, 3, 4, 5).pipe(take(3)); // Output: 1, 2, 3
// takeUntil - emit values until another Observable emitsconst destroy$ = new Subject<void>();const dataStream$ = interval(1000).pipe( takeUntil(destroy$) // Stops when destroy$ emits);
// skip - skip the first n valuesconst skipTwo$ = of(1, 2, 3, 4, 5).pipe(skip(2)); // Output: 3, 4, 5
// debounceTime - only emit after specified quiet periodconst searchInput$ = fromEvent(searchInput, "input").pipe( debounceTime(300), // Wait 300ms after last keystroke map((event) => (event.target as HTMLInputElement).value));
// distinctUntilChanged - only emit when current value is different from previousconst distinctNumbers$ = of(1, 1, 2, 2, 3, 2).pipe(distinctUntilChanged()); // Output: 1, 2, 3, 2
// first - emit the first value that matches conditionconst firstEven$ = of(1, 3, 2, 4).pipe(first((x) => x % 2 === 0)); // Output: 2
// last - emit the last value that matches conditionconst lastEven$ = of(1, 2, 3, 4, 5).pipe(last((x) => x % 2 === 0)); // Output: 45. Combination Operators
Section titled “5. Combination Operators”Combination operators combine multiple Observables into single Observables, controlling how and when values from different sources are merged.
import { combineLatest, forkJoin, merge, zip, withLatestFrom, startWith,} from "rxjs";
// combineLatest - emits array of latest values whenever any input emitsconst formState$ = combineLatest([username$, password$, email$]).pipe( map(([username, password, email]) => ({ username, password, email, valid: !!username && !!password, })));
// forkJoin - waits for all Observables to complete, then emits last valuesconst userDashboard$ = forkJoin({ user: this.http.get("/api/user"), posts: this.http.get("/api/posts"), settings: this.http.get("/api/settings"),}); // Emits when all three requests complete
// merge - combines multiple Observables, emits values as they arriveconst allClicks$ = merge( fromEvent(button1, "click"), fromEvent(button2, "click"), fromEvent(button3, "click")); // Emits clicks from any button
// zip - combines values in sequence, emits when all sources have new valuesconst zipped$ = zip(of("A", "B", "C"), of(1, 2, 3), of(true, false, true)); // Output: ['A', 1, true], ['B', 2, false], ['C', 3, true]
// withLatestFrom - combines source with latest value from another Observableconst clicksWithUser$ = fromEvent(button, "click").pipe( withLatestFrom(currentUser$), map(([clickEvent, user]) => ({ user, timestamp: Date.now() })));
// startWith - start stream with initial valueconst dataWithLoading$ = this.http.get("/api/data").pipe( startWith("loading...") // Immediately emit 'loading...' before HTTP response);6. Utility Operators
Section titled “6. Utility Operators”Utility operators perform side effects, modify timing, or add other utility functions to Observables without transforming the emitted values.
import { tap, delay, timeout, finalize } from "rxjs/operators";
// tap - perform side effects without affecting the streamconst loggedStream$ = userActions$.pipe( tap((userAction) => console.log("Action performed:", userAction)), tap((userAction) => this.analyticsService.track(userAction)), tap({ next: (value) => console.log("Value:", value), error: (err) => console.error("Error:", err), complete: () => console.log("Completed"), }));
// delay - delay each emission by specified timeconst delayedMessage$ = of("Hello World").pipe( delay(2000) // Emits after 2 seconds);
// timeout - error if no value is emitted within specified timeconst apiWithTimeout$ = this.http.get("/api/slow-data").pipe( timeout(5000) // Throw error if request takes longer than 5 seconds);
// finalize - execute callback when observable completes or errorsconst resource$ = this.http.get("/api/resource").pipe( finalize(() => { console.log("Request completed or failed"); this.loading = false; }));7. Error Handling Operators
Section titled “7. Error Handling Operators”Error handling operators catch and handle errors in Observable streams, allowing for graceful error recovery and retry logic.
import { catchError, retry, retryWhen, delay } from "rxjs/operators";import { of, throwError } from "rxjs";
// catchError - handle errors by returning a new Observableconst safeApiCall$ = this.http.get("/api/data").pipe( catchError((error) => { console.error("API call failed:", error); // Return fallback value return of([]); // Or re-throw the error // return throwError(() => new Error('Custom error message')); }));
// retry - automatically resubscribe specified number of times on errorconst retryApiCall$ = this.http.get("/api/unreliable").pipe( retry(3) // Retry up to 3 times before giving up);
// retryWhen - retry with custom logic and conditionsconst retryWithBackoff$ = this.http.get("/api/data").pipe( retryWhen((errors) => errors.pipe( delay(1000), // Wait 1 second between retries take(3) // Only retry 3 times ) ));
// Comprehensive error handling patternconst robustApiCall$ = this.http.get("/api/data").pipe( timeout(5000), // Add timeout retry(2), // Retry twice on failure catchError((error) => { // Handle different error types if (error.name === "TimeoutError") { this.notificationService.show("Request timed out"); } else if (error.status === 404) { this.notificationService.show("Data not found"); return of({ notFound: true }); } else if (error.status === 500) { this.notificationService.show("Server error"); }
// Re-throw for global error handler return throwError(() => error); }));8. Angular-Specific Patterns
Section titled “8. Angular-Specific Patterns”HTTP Service Patterns
Section titled “HTTP Service Patterns”Common patterns for handling HTTP requests in Angular services with proper error handling and transformation.
import { HttpClient } from "@angular/common/http";import { Injectable } from "@angular/core";
@Injectable({ providedIn: "root" })export class UserService { constructor(private http: HttpClient) {}
// Basic GET request with error handling getUsers(): Observable<User[]> { return this.http.get<User[]>("/api/users").pipe( catchError((error) => { console.error("Failed to fetch users", error); this.notificationService.show("Failed to load users"); return of([]); // Return empty array as fallback }) ); }
// Multiple parallel requests getUserDashboard(userId: string): Observable<DashboardData> { return forkJoin({ user: this.http.get<User>(`/api/users/${userId}`), posts: this.http.get<Post[]>(`/api/users/${userId}/posts`), settings: this.http.get<Settings>(`/api/users/${userId}/settings`), }).pipe( catchError((error) => { console.error("Dashboard load failed", error); return of({ user: null, posts: [], settings: null }); }) ); }
// Sequential dependent requests createUserWithProfile(userData: UserData): Observable<User> { return this.http.post<User>("/api/users", userData).pipe( switchMap((createdUser) => this.http .post<Profile>( `/api/users/${createdUser.id}/profile`, userData.profile ) .pipe( map(() => createdUser) // Return the created user ) ) ); }
// Search with debounce and cancellation searchUsers(term$: Observable<string>): Observable<User[]> { return term$.pipe( debounceTime(300), distinctUntilChanged(), switchMap((term) => term ? this.http.get<User[]>(`/api/users?search=${term}`) : of([]) ) ); }}Component Patterns with RxJS
Section titled “Component Patterns with RxJS”Common RxJS patterns used in Angular components for handling user interactions, data loading, and lifecycle management.
import { Component, OnInit, OnDestroy } from "@angular/core";import { Subject } from "rxjs";import { takeUntil, debounceTime, distinctUntilChanged } from "rxjs/operators";
@Component({ selector: "app-user-search", template: ` <input #searchInput placeholder="Search users..." /> <div *ngIf="loading">Loading...</div> <div *ngFor="let user of users$ | async"> {{ user.name }} </div> `,})export class UserSearchComponent implements OnInit, OnDestroy { private destroy$ = new Subject<void>(); private searchTerm = new Subject<string>();
loading = false; users$ = this.searchTerm.pipe( debounceTime(300), distinctUntilChanged(), switchMap((term) => { this.loading = true; return this.userService .searchUsers(term) .pipe(finalize(() => (this.loading = false))); }) );
@ViewChild("searchInput") searchInput!: ElementRef;
ngOnInit() { // Set up search input fromEvent(this.searchInput.nativeElement, "input") .pipe( map((event: any) => event.target.value), takeUntil(this.destroy$) ) .subscribe((value) => this.searchTerm.next(value));
// Auto-refresh data every 30 seconds interval(30000) .pipe(takeUntil(this.destroy$)) .subscribe(() => { this.refreshData(); }); }
refreshData() { // Implementation for refreshing data }
ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}Route and Parameter Handling
Section titled “Route and Parameter Handling”Using RxJS to handle route parameters, query parameters, and route data in Angular components.
import { ActivatedRoute, Router } from '@angular/router';
@Component({...})export class UserDetailComponent implements OnInit { private destroy$ = new Subject<void>();
// Stream for user ID from route params userId$ = this.route.paramMap.pipe( map(params => params.get('id')), filter(id => !!id), // Filter out null values distinctUntilChanged() // Only emit when ID actually changes );
// Stream for user data user$ = this.userId$.pipe( switchMap(id => this.userService.getUser(id!)) );
// Stream for query parameters searchQuery$ = this.route.queryParamMap.pipe( map(params => params.get('q') || ''), distinctUntilChanged() );
// Combine route params and query params combinedParams$ = combineLatest([ this.route.params, this.route.queryParams ]).pipe( map(([params, queryParams]) => ({ userId: params['id'], search: queryParams['q'] })) );
constructor( private route: ActivatedRoute, private router: Router ) {}
ngOnInit() { // React to parameter changes this.combinedParams$.pipe( takeUntil(this.destroy$) ).subscribe(({ userId, search }) => { console.log('User:', userId, 'Search:', search); });
// Handle query parameter changes for search this.searchQuery$.pipe( debounceTime(300), switchMap(query => this.searchService.search(query)), takeUntil(this.destroy$) ).subscribe(results => { this.searchResults = results; }); }
updateSearch(query: string) { // Update query params without reloading component this.router.navigate([], { relativeTo: this.route, queryParams: { q: query }, queryParamsHandling: 'merge' }); }
ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}Reactive Forms with RxJS
Section titled “Reactive Forms with RxJS”Using RxJS with Angular reactive forms for validation, value monitoring, and auto-saving.
import { FormBuilder, FormGroup, Validators } from '@angular/forms';
@Component({...})export class UserRegistrationComponent implements OnInit { registrationForm: FormGroup; private destroy$ = new Subject<void>();
// Form status stream formStatus$ = this.registrationForm.statusChanges.pipe( map(status => status === 'VALID') );
// Form value changes with debounce for auto-save formChanges$ = this.registrationForm.valueChanges.pipe( debounceTime(1000), filter(() => this.registrationForm.valid), distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)) );
// Password match validation stream passwordsMatch$ = combineLatest([ this.registrationForm.get('password')!.valueChanges, this.registrationForm.get('confirmPassword')!.valueChanges ]).pipe( map(([password, confirm]) => password === confirm), startWith(true) );
constructor(private fb: FormBuilder) { this.registrationForm = this.fb.group({ email: ['', [Validators.required, Validators.email]], password: ['', [Validators.required, Validators.minLength(8)]], confirmPassword: ['', Validators.required], newsletter: [false] }); }
ngOnInit() { // Auto-save form data when valid and changed this.formChanges$.pipe( takeUntil(this.destroy$) ).subscribe(formValue => { this.autoSaveForm(formValue); });
// Show/hide password mismatch error this.passwordsMatch$.pipe( takeUntil(this.destroy$) ).subscribe(match => { const confirmPasswordControl = this.registrationForm.get('confirmPassword'); if (!match) { confirmPasswordControl!.setErrors({ passwordMismatch: true }); } else { if (confirmPasswordControl!.errors?.['passwordMismatch']) { delete confirmPasswordControl!.errors['passwordMismatch']; if (Object.keys(confirmPasswordControl!.errors).length === 0) { confirmPasswordControl!.setErrors(null); } } } });
// Enable/disable newsletter based on user preference this.registrationForm.get('newsletter')!.valueChanges.pipe( takeUntil(this.destroy$) ).subscribe(newsletter => { this.updateNewsletterPreference(newsletter); }); }
private autoSaveForm(formValue: any) { // Save form data to backend or local storage console.log('Auto-saving form:', formValue); this.userService.saveFormDraft(formValue).subscribe(); }
private updateNewsletterPreference(enabled: boolean) { // Update newsletter preference this.userService.updateNewsletter(enabled).subscribe(); }
ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}9. Best Practices and Performance
Section titled “9. Best Practices and Performance”Memory Management and Unsubscription
Section titled “Memory Management and Unsubscription”Proper techniques to prevent memory leaks by managing subscriptions and implementing clean unsubscription patterns.
import { Component, OnDestroy } from '@angular/core';
@Component({...})export class SafeComponent implements OnDestroy { private destroy$ = new Subject<void>();
// Pattern 1: Using takeUntil (recommended) ngOnInit() { this.dataService.getData().pipe( takeUntil(this.destroy$) ).subscribe(data => this.data = data);
this.userService.getUser().pipe( takeUntil(this.destroy$) ).subscribe(user => this.user = user); }
// Pattern 2: Using async pipe in templates (best practice) data$ = this.dataService.getData().pipe( catchError(error => of(null)) );
// Pattern 3: Manual subscription management (avoid if possible) private subscriptions = new Subscription();
alternativeInit() { const sub1 = this.service1.getData().subscribe(); const sub2 = this.service2.getData().subscribe();
this.subscriptions.add(sub1); this.subscriptions.add(sub2); }
ngOnDestroy() { // Cleanup for takeUntil pattern this.destroy$.next(); this.destroy$.complete();
// Cleanup for manual subscriptions this.subscriptions.unsubscribe(); }}Performance Optimization Patterns
Section titled “Performance Optimization Patterns”Techniques to optimize RxJS performance in Angular applications, including caching, debouncing, and efficient data handling.
// Share expensive operations across multiple subscribers@Injectable()export class DataService { private usersCache$: Observable<User[]>;
getUsers(): Observable<User[]> { if (!this.usersCache$) { this.usersCache$ = this.http.get<User[]>('/api/users').pipe( shareReplay(1) // Cache and replay to all subscribers ); } return this.usersCache$; }}
// Efficient search with cancellationsearchProducts(term: string): Observable<Product[]> { return this.http.get<Product[]>(`/api/products?search=${term}`).pipe( timeout(5000), retry(2), catchError(error => of([])) );}
// Lazy loading with conditional requestsgetUserData(userId$: Observable<string>): Observable<UserData> { return userId$.pipe( filter(id => !!id), // Only proceed if ID exists distinctUntilChanged(), // Only if ID changed switchMap(id => this.http.get<UserData>(`/api/users/${id}`)) );}Common RxJS Patterns Cheat Sheet
Section titled “Common RxJS Patterns Cheat Sheet”Reusable RxJS patterns that solve common problems in Angular applications.
// 1. Search with debounce and cancellationcreateSearch(term$: Observable<string>): Observable<SearchResult[]> { return term$.pipe( debounceTime(300), distinctUntilChanged(), switchMap(term => term ? this.searchService.search(term) : of([]) ), catchError(error => { console.error('Search failed:', error); return of([]); }) );}
// 2. Polling with exponential backoffpollWithBackoff(initialDelay: number, maxDelay: number): Observable<Data> { return timer(0, initialDelay).pipe( switchMap((_, attempt) => this.api.getData().pipe( retryWhen(errors => errors.pipe( delayWhen(() => timer(Math.min(initialDelay * Math.pow(2, attempt), maxDelay))) )) ) ), shareReplay(1) );}
// 3. Dependent data loadingloadUserWithRelatedData(userId: string): Observable<{user: User, posts: Post[]}> { return this.api.getUser(userId).pipe( switchMap(user => forkJoin({ posts: this.api.getUserPosts(userId), settings: this.api.getUserSettings(userId) }).pipe( map(({ posts, settings }) => ({ user, posts, settings })) ) ) );}
// 4. Loading state managementwithLoadingState<T>(source$: Observable<T>): Observable<{data?: T, loading: boolean, error?: any}> { return merge( source$.pipe( map(data => ({ data, loading: false, error: undefined })), catchError(error => of({ data: undefined, loading: false, error })) ), of({ data: undefined, loading: true, error: undefined }) );}
// 5. Form auto-save with validationautoSaveForm(form: FormGroup): Observable<void> { return form.valueChanges.pipe( debounceTime(1000), filter(() => form.valid && form.dirty), distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)), switchMap(formValue => this.api.saveForm(formValue)), tap(() => form.markAsPristine()) );}Summary
Section titled “Summary”This comprehensive guide covers the essential RxJS operators and patterns used in Angular development. Key takeaways:
- Use
takeUntilpattern for subscription management - Prefer async pipe in templates when possible
- Use appropriate combination operators for different scenarios
- Always handle errors with
catchError - Optimize performance with
debounceTime,distinctUntilChanged, andshareReplay - Choose the right flattening operator (
switchMap,mergeMap,concatMap,exhaustMap) based on your use case