Home JavaScript RxJS Observables Explained — Internals, Operators and Production Pitfalls

RxJS Observables Explained — Internals, Operators and Production Pitfalls

In Plain English 🔥
Imagine you subscribe to a newspaper. You don't get every paper ever printed — you only get new ones from the day you subscribed. That's an Observable: a source that delivers values over time, only to whoever is actively listening. A Promise is like ordering one pizza — it arrives once and it's done. An Observable is like a pizza conveyor belt at a restaurant — it keeps sending slices as long as you're sitting at the table, and the moment you leave (unsubscribe), the slices stop coming to you.
⚡ Quick Answer
Imagine you subscribe to a newspaper. You don't get every paper ever printed — you only get new ones from the day you subscribed. That's an Observable: a source that delivers values over time, only to whoever is actively listening. A Promise is like ordering one pizza — it arrives once and it's done. An Observable is like a pizza conveyor belt at a restaurant — it keeps sending slices as long as you're sitting at the table, and the moment you leave (unsubscribe), the slices stop coming to you.

Every modern JavaScript app — whether it's an Angular dashboard, a React data-fetching layer, or a Node.js event pipeline — eventually runs into the same problem: asynchronous data that arrives in bursts, needs to be transformed, combined with other streams, and cancelled gracefully. setTimeout and Promises handle one-shot async well, but they fall apart the moment you need to debounce a search box, retry a failing API call with exponential backoff, or merge a WebSocket stream with an HTTP response. RxJS was built exactly for that world.

RxJS (Reactive Extensions for JavaScript) brings the Observer pattern, the Iterator pattern, and functional programming together into one composable API. At its core, an Observable is a lazy, cancellable, composable data pipeline. Unlike a Promise, it can emit multiple values over time, it doesn't start executing until something subscribes to it, and it can be torn down mid-flight — which is the key to avoiding memory leaks in dynamic UIs.

By the end of this article you'll understand how Observables work under the hood, why cold vs hot matters in production, how the most important operators actually compose, how multicasting prevents redundant network calls, and exactly which mistakes ship bugs to production. You'll also walk away with interview-ready answers that go beyond surface-level definitions.

How Observables Work Internally — Not Just What They Are

Most tutorials treat Observable as a black box. Let's crack it open. At its simplest, an Observable is a function that accepts an Observer (an object with next, error, and complete callbacks) and returns a teardown function. That's the entire contract. When you call subscribe(), RxJS invokes that producer function and wires up the observer. Nothing happens before that call — that's what 'lazy' means.

This is fundamentally different from a Promise, which starts its executor synchronously the moment you call new Promise(). An Observable defers all work until subscription time, which means you can pass an Observable around, compose it with operators, and store it in a variable without triggering any side effects. That referential transparency is what makes Observables safe to compose.

The teardown function returned by the producer (or set via subscriber.add()) is called when you unsubscribe, or when the Observable completes or errors. This is the foundation of RxJS's memory-safety story — every resource (timers, event listeners, WebSocket connections) must be cleaned up in that teardown. If your custom Observable doesn't return a teardown, you've created a leak.

ObservableInternals.js · JAVASCRIPT
12345678910111213141516171819202122232425262728293031323334353637
import { Observable } from 'rxjs';

// Creating an Observable from scratch reveals its internals.
// The function we pass IS the producer — it runs only on subscribe().
const intervalObservable = new Observable((subscriber) => {
  let tickCount = 0;
  console.log('[Producer] Observable execution started');

  // setInterval is the resource we must clean up.
  const intervalId = setInterval(() => {
    tickCount++;
    console.log(`[Producer] Emitting tick #${tickCount}`);
    subscriber.next(tickCount); // Push value to observer

    if (tickCount === 3) {
      clearInterval(intervalId);   // Stop the interval ourselves
      subscriber.complete();        // Signal no more values
    }
  }, 500);

  // Return teardown logic — called if consumer unsubscribes early
  return () => {
    console.log('[Teardown] Interval cleared — no leak');
    clearInterval(intervalId);
  };
});

console.log('[Main] Observable defined — nothing running yet');

const subscription = intervalObservable.subscribe({
  next: (tick) => console.log(`[Observer] Received: ${tick}`),
  error: (err) => console.error(`[Observer] Error: ${err}`),
  complete: () => console.log('[Observer] Stream complete'),
});

// Uncomment to see teardown fire before tick #3:
// setTimeout(() => subscription.unsubscribe(), 800);
▶ Output
[Main] Observable defined — nothing running yet
[Producer] Observable execution started
[Producer] Emitting tick #1
[Observer] Received: 1
[Producer] Emitting tick #2
[Observer] Received: 2
[Producer] Emitting tick #3
[Observer] Received: 3
[Teardown] Interval cleared — no leak
[Observer] Stream complete
🔥
Why Lazy Evaluation Matters:Because Observables don't execute until subscribed, you can build a retry-with-backoff pipeline, store it in a service property, and hand it to three different components — each component gets its own independent execution with its own retry counter. This is the cold Observable contract and it's what makes RxJS pipelines reusable without hidden shared state.

Cold vs Hot Observables — The Distinction That Ships Bugs

This is the single most misunderstood concept in RxJS and the root cause of both duplicate API calls and missed WebSocket messages. Understanding it deeply separates senior RxJS engineers from everyone else.

A cold Observable creates its producer fresh for each subscriber. Each subscriber gets the complete sequence from the beginning, with its own independent execution context. The interval example above is cold — two subscribers would each get their own timer. fromFetch(), ajax(), and interval() are cold by default.

A hot Observable shares a single producer among all subscribers. Subscribers only receive values emitted after they subscribe — like a live concert stream. fromEvent() (wrapping a DOM event) is hot because there's one event listener on the element, not one per subscriber.

The danger zone is HTTP requests: if you build a search-as-you-type feature using a cold ajax() Observable and render it in two places, each render triggers a separate HTTP request. The fix is multicasting — turning a cold Observable hot so all subscribers share one execution. shareReplay(1) is the production workaround most Angular devs reach for, but it has its own subtleties around refCounting and memory.

ColdVsHotMulticast.js · JAVASCRIPT
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
import { Observable, Subject, interval } from 'rxjs';
import { share, shareReplay, take } from 'rxjs/operators';

// ─── COLD Observable demonstration ─────────────────────────────────────────
const coldTimer$ = new Observable((subscriber) => {
  // Each subscriber gets its own counter starting from 0
  let count = 0;
  const id = setInterval(() => subscriber.next(count++), 300);
  return () => clearInterval(id);
}).pipe(take(3));

console.log('--- COLD: two subscribers get independent streams ---');
coldTimer$.subscribe(v => console.log(`Cold Subscriber A: ${v}`));
setTimeout(() => {
  coldTimer$.subscribe(v => console.log(`Cold Subscriber B: ${v}`));
}, 400); // B starts 400ms late — still gets 0, 1, 2 from its own producer

// ─── HOT Observable via Subject ────────────────────────────────────────────
// A Subject is both an Observable and an Observer — it's the canonical hot source.
const liveScore$ = new Subject();

setTimeout(() => {
  console.log('\n--- HOT: Subject shares one stream ---');

  liveScore$.subscribe(score => console.log(`Fan A sees: ${score}`));

  // Emit the first goal — both fans see it (if subscribed at that moment)
  liveScore$.next('Goal! 1-0');

  // Fan B subscribes after the first goal — they MISS it
  liveScore$.subscribe(score => console.log(`Fan B sees: ${score}`));
  liveScore$.next('Goal! 2-0'); // Only this one is seen by Fan B
  liveScore$.complete();
}, 1200);

// ─── shareReplay: solve the cold HTTP + multiple subscriber problem ─────────
// Simulating an HTTP call that should only fire ONCE
const expensiveApiCall$ = new Observable((subscriber) => {
  console.log('\n[HTTP] Making API call — this should only appear ONCE');
  setTimeout(() => {
    subscriber.next({ user: 'Alice', role: 'admin' });
    subscriber.complete();
  }, 100);
}).pipe(
  shareReplay(1) // Replay last 1 emission to late subscribers; share the single HTTP call
);

setTimeout(() => {
  // Two components subscribe — only one HTTP call fires
  expensiveApiCall$.subscribe(data => console.log('[Component Header] Got:', data));
  expensiveApiCall$.subscribe(data => console.log('[Component Sidebar] Got:', data));
}, 2400);
▶ Output
--- COLD: two subscribers get independent streams ---
Cold Subscriber A: 0
Cold Subscriber A: 1
Cold Subscriber B: 0
Cold Subscriber A: 2
Cold Subscriber B: 1
Cold Subscriber B: 2

--- HOT: Subject shares one stream ---
Fan A sees: Goal! 1-0
Fan A sees: Goal! 2-0
Fan B sees: Goal! 2-0

[HTTP] Making API call — this should only appear ONCE
[Component Header] Got: { user: 'Alice', role: 'admin' }
[Component Sidebar] Got: { user: 'Alice', role: 'admin' }
⚠️
Watch Out — shareReplay Memory Leak:shareReplay(1) without { refCount: true } keeps the inner subscription alive even after all consumers unsubscribe. In Angular services loaded into long-lived modules this leaks memory across navigation. Use shareReplay({ bufferSize: 1, refCount: true }) in production, which tears down the shared subscription when the last subscriber leaves.

Operator Internals and Composition — map, switchMap, mergeMap, exhaustMap Compared

Operators are pure functions that take an Observable and return a new Observable. They don't mutate the source — each operator wraps the previous one in a new layer, forming a pipeline. Under the hood, pipe() is just function composition: pipe(opA, opB, opC) is equivalent to opC(opB(opA(source))).

The higher-order mapping operators — switchMap, mergeMap, concatMap, exhaustMap — are where most production bugs live. They all accept a function that maps each emitted value to an inner Observable. The difference is what they do with overlapping inner subscriptions.

switchMap cancels the previous inner Observable when a new outer value arrives. This is perfect for autocomplete — you only care about the response for the latest keystroke. mergeMap subscribes to every inner Observable concurrently, which is useful for parallel requests but can overwhelm a server. concatMap queues them, processing one at a time in order. exhaustMap ignores new outer values while an inner Observable is still active — ideal for a login button that shouldn't fire twice.

Choosing the wrong one causes race conditions (mergeMap for search), dropped requests (exhaustMap for pagination), or stalled queues (concatMap when order doesn't matter but throughput does).

HigherOrderOperators.js · JAVASCRIPT
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
import { fromEvent, of, timer, Subject } from 'rxjs';
import {
  switchMap,
  mergeMap,
  concatMap,
  exhaustMap,
  map,
  delay,
  tap,
  take
} from 'rxjs/operators';

// Simulated async search API — takes longer for 'slow' queries
const fakeSearchApi = (searchTerm) => {
  const responseDelay = searchTerm === 'slow' ? 600 : 200;
  return of(`Results for: "${searchTerm}"`).pipe(
    delay(responseDelay),
    tap(() => console.log(`[API] Response ready for: "${searchTerm}"`)),
  );
};

// ─── switchMap — cancels previous, only latest matters ──────────────────────
const searchInput$ = new Subject(); // simulate keystrokes

console.log('=== switchMap (autocomplete) ===');
const searchResults$ = searchInput$.pipe(
  switchMap((term) => {
    console.log(`[switchMap] New term "${term}" — cancelling previous inner subscription`);
    return fakeSearchApi(term);
  })
);

const sub1 = searchResults$.subscribe(result => console.log('[UI] Showing:', result));

// Simulate rapid typing — 'slow' starts first but 'fast' should win
searchInput$.next('slow');
setTimeout(() => searchInput$.next('fast'), 100); // arrives before 'slow' responds
setTimeout(() => sub1.unsubscribe(), 700);

// ─── exhaustMap — ignores while busy ────────────────────────────────────────
const loginClick$ = new Subject();

setTimeout(() => {
  console.log('\n=== exhaustMap (login button) ===');

  const loginRequest$ = loginClick$.pipe(
    exhaustMap(() => {
      console.log('[Auth] Sending login request...');
      // Simulates a 400ms login round-trip
      return of('Login success').pipe(delay(400));
    })
  );

  const sub2 = loginRequest$.subscribe(result => console.log('[Auth] Result:', result));

  // User double-clicks — second click is ignored while first is in flight
  loginClick$.next('click');
  setTimeout(() => loginClick$.next('double-click'), 100); // ignored
  setTimeout(() => sub2.unsubscribe(), 600);
}, 800);

// ─── concatMap — ordered queue ───────────────────────────────────────────────
setTimeout(() => {
  console.log('\n=== concatMap (ordered file uploads) ===');

  const filesToUpload$ = of('file-1.jpg', 'file-2.jpg', 'file-3.jpg');

  filesToUpload$.pipe(
    concatMap((filename) => {
      console.log(`[Upload] Starting: ${filename}`);
      return of(`${filename} uploaded`).pipe(delay(200));
    })
  ).subscribe(result => console.log('[Upload] Done:', result));
}, 1400);
▶ Output
=== switchMap (autocomplete) ===
[switchMap] New term "slow" — cancelling previous inner subscription
[switchMap] New term "fast" — cancelling previous inner subscription
[API] Response ready for: "fast"
[UI] Showing: Results for: "fast"

=== exhaustMap (login button) ===
[Auth] Sending login request...
[Auth] Result: Login success

=== concatMap (ordered file uploads) ===
[Upload] Starting: file-1.jpg
[Upload] Done: file-1.jpg uploaded
[Upload] Starting: file-2.jpg
[Upload] Done: file-2.jpg uploaded
[Upload] Starting: file-3.jpg
[Upload] Done: file-3.jpg uploaded
⚠️
Decision Rule for Higher-Order Operators:Ask yourself: 'What should happen if a new event arrives while the previous one is still processing?' Cancel old → switchMap. Stack them → mergeMap. Queue them → concatMap. Ignore new → exhaustMap. Tattoo this on your brain before any RxJS interview.

Production Patterns — Error Handling, Retry and Memory Management

Error handling in RxJS is a trap for the unprepared. When an Observable errors, it terminates — no more values, no recovery. That means if you have a WebSocket stream and it throws, your UI goes silent. The answer is catchError, which intercepts an error and must return a new Observable (including EMPTY to silently swallow it, or throwError to re-throw).

retryWhen and its modern replacement retry({ delay, count }) let you implement exponential backoff — critical for flaky API endpoints. But retry resubscribes to the entire source Observable, which for cold Observables means a fresh HTTP call — exactly what you want. For hot sources, retry can cause confusing behaviour because the source doesn't reset.

For memory management in SPAs, the takeUntilDestroyed() operator (Angular 16+) or the classic takeUntil(destroy$) pattern ensures subscriptions are torn down when a component unmounts. In React with RxJS, cleaning up in useEffect's return function is the equivalent. Forgetting this in a long-lived app with many navigations leads to dozens of stale subscriptions running in the background, causing ghost updates to unmounted components and measurable memory growth you'll only catch in a production heap snapshot.

ProductionErrorHandling.js · JAVASCRIPT
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
import { throwError, of, Subject, timer } from 'rxjs';
import {
  catchError,
  retry,
  switchMap,
  takeUntil,
  finalize,
  tap,
  mergeMap,
  delayWhen
} from 'rxjs/operators';

// ─── Exponential backoff retry ───────────────────────────────────────────────
let attemptCount = 0;

const unreliableApi$ = new Subject().pipe(
  // This pattern is used to kick off an Observable manually
);

// Simulates a flaky fetch that succeeds on the 3rd attempt
const fetchWithRetry$ = new (require('rxjs').Observable)((subscriber) => {
  attemptCount++;
  console.log(`[HTTP] Attempt #${attemptCount}`);

  if (attemptCount < 3) {
    subscriber.error(new Error(`Network timeout on attempt ${attemptCount}`));
  } else {
    subscriber.next({ data: 'User profile fetched successfully' });
    subscriber.complete();
  }
}).pipe(
  // retry with exponential backoff — delay doubles each time, max 3 retries
  retry({
    count: 3,
    delay: (error, retryIndex) => {
      const backoffMs = Math.pow(2, retryIndex) * 100; // 200ms, 400ms, 800ms
      console.log(`[Retry] Attempt ${retryIndex} after ${backoffMs}ms — Reason: ${error.message}`);
      return timer(backoffMs);
    },
  }),
  catchError((finalError) => {
    // Only runs if all retries are exhausted
    console.error(`[Error] Giving up after all retries: ${finalError.message}`);
    // Return a fallback value so the stream recovers instead of dying
    return of({ data: 'Cached fallback data', fromCache: true });
  }),
  finalize(() => console.log('[Cleanup] HTTP stream finalized — run cleanup here'))
);

fetchWithRetry$.subscribe({
  next: (result) => console.log('[UI] Displaying:', result),
  complete: () => console.log('[UI] Done'),
});

// ─── takeUntil pattern — prevents memory leaks ──────────────────────────────
setTimeout(() => {
  console.log('\n--- takeUntil (component lifecycle) ---');

  // In a real app this would be a component's ngOnDestroy Subject or React useEffect cleanup
  const componentDestroyed$ = new Subject();

  const liveDataStream$ = new (require('rxjs').interval)(200).pipe(
    tap(tick => console.log(`[Stream] Tick ${tick} — component still alive`)),
    takeUntil(componentDestroyed$) // auto-unsubscribes when destroy$ emits
  );

  liveDataStream$.subscribe({
    next: (tick) => console.log(`[Component] Rendered tick: ${tick}`),
    complete: () => console.log('[Component] Subscription cleaned up — no leak'),
  });

  // Simulate component unmount after 500ms
  setTimeout(() => {
    console.log('[Component] Unmounting...');
    componentDestroyed$.next(true);
    componentDestroyed$.complete();
  }, 500);
}, 1500);
▶ Output
[HTTP] Attempt #1
[Retry] Attempt 1 after 200ms — Reason: Network timeout on attempt 1
[HTTP] Attempt #2
[Retry] Attempt 2 after 400ms — Reason: Network timeout on attempt 2
[HTTP] Attempt #3
[Cleanup] HTTP stream finalized — run cleanup here
[UI] Displaying: { data: 'User profile fetched successfully' }
[UI] Done

--- takeUntil (component lifecycle) ---
[Stream] Tick 0 — component still alive
[Component] Rendered tick: 0
[Stream] Tick 1 — component still alive
[Component] Rendered tick: 1
[Stream] Tick 2 — component still alive
[Component] Rendered tick: 2
[Component] Unmounting...
[Component] Subscription cleaned up — no leak
⚠️
Interview Gold — catchError Must Return an Observable:A classic mistake is returning a plain value from catchError (e.g. return null). This throws a runtime error because RxJS expects an ObservableInput from catchError's callback. Always return of(fallbackValue), EMPTY, or throwError(() => new Error(...)) — never a raw value.
BehaviourswitchMapmergeMapconcatMapexhaustMap
Concurrent inner subscriptions1 (kills previous)Unlimited1 (queued)1 (blocks new)
Cancels in-flight innerYesNoNoNo
Ordering of resultsLatest onlyRace orderSource order guaranteedSource order (only first while idle)
Best use caseAutocomplete / latest-value-winsParallel API callsSequential uploads / ordered mutationsLogin / payment button (no double submit)
Risk if misusedRace conditions if used for mutationsServer overwhelm / out-of-order responsesBackpressure / UI stalls if queue growsSilent dropped events if UX isn't clear

🎯 Key Takeaways

  • An Observable is a lazy function — it does nothing until subscribe() is called. Returning a teardown function from the producer is what separates memory-safe from leaky custom Observables.
  • Cold Observables create a new execution per subscriber (HTTP calls, timers). Hot Observables share one execution (DOM events, WebSockets). Confusing the two causes duplicate network requests or missed events — shareReplay({ bufferSize: 1, refCount: true }) is the production bridge.
  • switchMap, mergeMap, concatMap and exhaustMap differ only in what they do with overlapping inner subscriptions. Getting this wrong causes race conditions, double form submissions, or stalled UIs — always ask 'what if a new event fires while the previous inner Observable is still alive?'
  • When an Observable errors, it terminates permanently. catchError must return an ObservableInput (not a raw value) to recover the stream. Combine with retry({ delay, count }) for exponential backoff on flaky endpoints, and always pair long-lived subscriptions with takeUntil to avoid memory leaks in SPAs.

⚠ Common Mistakes to Avoid

  • Mistake 1: Nested subscribe() calls instead of higher-order operators — Symptom: deeply nested code, impossible to unsubscribe from inner streams, classic callback hell in reactive disguise — Fix: replace subscribe-inside-subscribe with switchMap, mergeMap or concatMap. The inner Observable should be returned from the operator callback, not subscribed to directly.
  • Mistake 2: Not unsubscribing from long-lived Observables in SPA components — Symptom: memory grows on each navigation, ghost setState or Angular ChangeDetector errors after component is destroyed, intervals running after the page changes — Fix: use takeUntil(destroy$) with a Subject that emits in ngOnDestroy, or use Angular's takeUntilDestroyed() / React's useEffect cleanup return to tear down subscriptions deterministically.
  • Mistake 3: Using shareReplay(N) without refCount: true and wondering why HTTP calls still fire after all subscribers are gone — Symptom: API calls continue in the background, visible in the Network tab even when no component needs the data — Fix: use shareReplay({ bufferSize: 1, refCount: true }) so the shared subscription is torn down when subscriber count reaches zero, preventing phantom requests.

Interview Questions on This Topic

  • QWhat is the difference between a cold and a hot Observable, and how does shareReplay convert one to the other? Can you describe a real scenario where mixing them up caused a bug?
  • QExplain the difference between switchMap, mergeMap, concatMap and exhaustMap. If you were building an autocomplete search and a form submission button, which would you use for each and why?
  • QAn Observable emits values, then errors. You add a catchError that returns of(fallback), but you notice the Observable never emits again after the error — even though you expected it to continue. Why does this happen and how do you fix it?

Frequently Asked Questions

What is the difference between an Observable and a Promise in JavaScript?

A Promise is eager (starts executing immediately), can only emit one value, and cannot be cancelled. An Observable is lazy (executes only on subscribe), can emit zero to infinite values over time, and supports cancellation via unsubscribe(). Observables are also composable — you can pipe operators over them before any data flows.

When should I use a Subject instead of a plain Observable?

Use a Subject when you need to push values imperatively into a stream — for example, triggering a stream from a button click handler or a Redux-style action dispatcher. A plain Observable is better when the source is declarative and self-contained (e.g. wrapping an HTTP call). BehaviorSubject is the go-to when late subscribers need the last emitted value immediately on subscription, like a current-user state store.

Why does RxJS have so many operators? Do I need to learn them all?

RxJS has ~100+ operators because async data has genuinely complex needs — combining streams, buffering, throttling, retrying, multicasting. In production you'll reach for about 15-20 operators 90% of the time: map, filter, tap, switchMap, mergeMap, concatMap, exhaustMap, catchError, retry, takeUntil, debounceTime, distinctUntilChanged, combineLatest, forkJoin, and shareReplay. Master those deeply rather than skimming all 100.

🔥
TheCodeForge Editorial Team Verified Author

Written and reviewed by senior developers with real-world experience across enterprise, startup and open-source projects. Every article on TheCodeForge is written to be clear, accurate and genuinely useful — not just SEO filler.

← PreviousWeb Accessibility — WCAG Basics
Forged with 🔥 at TheCodeForge.io — Where Developers Are Forged