RxJSでObservableを結合する(merge, forkJoin, concat, combineLatest)

(2017-05-09)

RxJSでRxをはじめる - sambaiz.net

merge

2つのstreamの両方の値がemitされる。

Rx.Observable.merge(
  stream1,
  stream2
).subscribe(
  data => console.log(`merge ${data}`),
  err => console.log(`merge ${err}`)
);

forkJoin

completeしたときの最後の値を配列としてemitする。 非同期で一つ値をemitするようなstreamで、Promise.allのようなことをしたいときはこれ。

Rx.Observable.forkJoin(
  stream1,
  stream2
).subscribe(
  data => console.log(`      forkJoin: ${data}`),
  err => console.log(`      forkJoin: ${err}`)
)

concat

前のstreamがcompleteしたら次のstreamの値がemitされる。

Rx.Observable.concat(
  stream1,
  stream2
).subscribe(
  data => console.log(`  concat ${data}`),
  err => console.log(`  concat ${err}`)
);

combineLatest

stream自体を結合するのではなく値を結合する。 この例だと、streamでemitされた値がa、stream2で最後のemitされた値がbになる。 combineする値がない場合はemitされない。

stream1.combineLatest(stream2, (a, b) => a + b).subscribe(
  data => console.log(`    combineLatest ${data}`),
  err => console.log(`    combineLatest ${err}`)
);

同時に実行したときの結果

const stream1 = Rx.Observable.interval(100).map(v => `stream 1-${v+1}`).take(3);
const stream2 = Rx.Observable.interval(100).map(v => `stream 2-${v+1}`).take(3).delay(150);

stream1とstream2

merge stream 1-1
  concat stream 1-1
merge stream 1-2
  concat stream 1-2
merge stream 2-1 <- mergeではstream1はcompleteしていないが、stream2がemitされる
merge stream 1-3
  concat stream 1-3
    combineLatest stream 1-3stream 2-1 <- stream2の値がemitされたのでcombineする
merge stream 2-2
    combineLatest stream 1-3stream 2-2
      forkJoin: stream 1-3,stream 2-3 <- stream1とstream2がcompleteしたのでforkJoinでemitされる
    combineLatest stream 1-3stream 2-3
merge stream 2-3
  concat stream 2-1 <- concatではstream1がcompleteしたので、stream2がemitされる
  concat stream 2-2
  concat stream 2-3