skip to content

mergeMap (aka flatMap)

 

mergeMap, as well as other **Map operators, will substitute value on the source stream with a stream of values, returned by inner function. When source stream emits, mergeMap will call inner function to merge yet another inner stream to the resulting stream.

Also try this mergeMap vs exhaustMap vs switchMap vs concatMap head-to-head comparison

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
const { rxObserver, palette } = require('api/v0.3');
const { from, timer, pipe } = require('rxjs');
const { zip, take, map, mergeMap, delayWhen } = require('rxjs/operators');


// our source$ will emit values at 5ms, 10ms, 20ms
const source$ = fromDelayed([ 5, 10, 20 ]).pipe(
    zip(from(palette), Marble) // colorize each item
  );

const mergeMap$ = source$.pipe(
    mergeMap(x => timer(0, 3).pipe(
        take(3),
        colorize(x.color))  // colorize as source$ value
      )
  );

// visualization
source$.subscribe(rxObserver('source$'));
mergeMap$.subscribe(rxObserver('mergeMap( timer(0, 3).take(3) )'));


// helpers
function colorize(color) {
  return pipe(
    map(y => Marble(y, color))
  );
}

// creates a colored Marble
function Marble(value,color) {
  return {
    valueOf: ()=>value
    , color
  };
}

// like .from, but items are delayed by their value
function fromDelayed (arr) {
  return from(arr).pipe(
      delayWhen(x=>timer(x))
    );
}

0mssource$startcomplete55 1010 2020 mergeMap( timer(0, 3).take(3) )startcomplete00 11 00 22 11 22 00 11 22

NOTE: mergeMap is also available via flatMap alias