RxJSでRxをはじめる

(2017-03-18)

https://github.com/ReactiveX/rxjs

Rx(ReactiveX)とは

非同期処理をうまく扱えるようにするライブラリ。いろんな言語で実装されている。 非同期処理の結果はObservableなStreamに流される。 ObservableはIteratableのように扱うことができる。

RxはObserver pattern を拡張したもの。 Observer patternというのは、Subjectが、Observeしている全てのObserverに対して通知を送るデザインパターン。 C#などのeventのそれ。

C#のdelegateとevent - sambaiz.net

試してみる

inputのkeyupイベントのObservableを作成し、それをsubscribe()して出力している。

<html>
<head>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
</head>
<body>

<input type="text" id="input" />

<script>

const inputForm = document.querySelector('#input');

const keyups = Rx.Observable.fromEvent(inputForm, 'keyup');

keyups.subscribe(
  data => console.log(data),
  err => console.log(err)
);

</script>

</body>
</html>

入力するとこんなのが出力される。

KeyboardEvent {isTrusted: true, key: "a", code: "KeyA", location: 0, ctrlKey: false…}
KeyboardEvent {isTrusted: true, key: "b", code: "KeyB", location: 0, ctrlKey: false…}
KeyboardEvent {isTrusted: true, key: "c", code: "KeyC", location: 0, ctrlKey: false…}

Observable

create

next()でObservableに値をemitし、complete()で終了させる。 error()でエラーをemitするとそれ以後の値はemitされない。

Rx.Observable.create(function (observer) {
    observer.next("AAAAA");
    observer.next("BBBBB");
    observer.next("CCCCC");
    observer.complete();
}).subscribe(
  data => console.log(data),
  err => {},
  () => console.log("completed")
);
AAAA
BBBB
CCCC
completed

from

配列などのIteratableをObservableに変換する。

Rx.Observable.from([1,2,3]).subscribe(
  data => console.log(data),
  err => {},
  () => console.log("completed")
);
1
2
3
completed

fromEvent

上で使ったやつ。

Rx.Observable.fromEvent(document.querySelector('#input'), 'keyup').subscribe(
  data => console.log(data),
  err => {},
  () => console.log("completed")
);
KeyboardEvent {isTrusted: true, key: "a", code: "KeyA", location: 0, ctrlKey: false…}

fromPromise

PromiseもObservableに変換できる。

Rx.Observable.fromPromise(Promise.resolve("ok")).subscribe(
  data => console.log(data),
  err => {},
  () => console.log("completed")
);
ok
completed

interval

一定時間ごとにemitし続ける。

Rx.Observable.interval(1000).subscribe(
  data => console.log(data),
  err => {},
  () => console.log("completed")
);
0
1
2
3

RxJSでObservableを結合する(merge, forkJoin, concat, combineLatest) - sambaiz.net

Operator

Observableのメソッド。新しいObservableを作って返す。

上で試したkeyupのObservableにいろいろやってみる。

pluck

nestされたプロパティを指定する。この例だと.target.value

const keyups = Rx.Observable.fromEvent(inputForm, 'keyup')
  .pluck('target', 'value');
h
ho
hog
hoge

filter

フィルタリングする。この例だと長さが2より大きいものだけがemitされる。

const keyups = Rx.Observable.fromEvent(inputForm, 'keyup')
  .pluck('target', 'value')
  .filter(text => text.length > 2 );
hog
hoge

map

map。この例だとvalue: ${text}のフォーマットでemitされる。

const keyups = Rx.Observable.fromEvent(inputForm, 'keyup')
  .pluck('target', 'value')
  .filter(text => text.length > 2 )
  .map(text => `value: ${text}`);
value: hog
value: hoge

reduce

reduce。emitされるのはcompleteされたときなので、takeUntil()で 渡したObservableが何かemitしたときにcompleteさせるようにしている。

const keyups = Rx.Observable.fromEvent(inputForm, 'keyup')
  .takeUntil(Rx.Observable.interval(5000))
  .pluck('target', 'value')
  .filter(text => text.length > 2 )
  .map(text => `value: ${text}`)
  .reduce((acc, curr) => `${acc} ${curr}`, "");

keyups.subscribe(
  data => console.log(data),
  err => console.log(err),
  () => console.log("completed")
);
 value: aaa value: aaaa value: aaaaa value: aaaaaa value: aaaaaaa value: aaaaaaaa value: aaaaaaaaa value: aaaaaaaaaa value: aaaaaaaaaaa
 completed

Subject

Observerでもあり、Observableでもあるブリッジのようなもの。

これまでのObservableはSubscribeされるまでemitしない”Cold”なものだったが、 SubjectはそんなObservableをSubscribeし、それをトリガーにemitするので、 “Cold”なObservableを常にemitし得る”Hot”なものに変えることができる。

// ColdなObservable
const cold = Rx.Observable.from([1,2,3]);

// Coldだと、いつから、何回読んでも同じ値が得られる

// 1, 2, 3, completed
cold.subscribe(
  data => console.log(data),
  err => {},
  () => console.log("completed")
);

// 1, 2, 3, completed
cold.subscribe(
  data => console.log(data), 
  err => {},
  () => console.log("completed")
);

(Publish)Subject

Subscribeした時点からemitされたアイテムをemitする。それまでにemitされたアイテムはしない。

const subject = new Rx.Subject(); 

subject.subscribe(
  data => console.log(`1: ${data}`),
  err => {},
  () => console.log("1: completed")
);

subject.next("AAA") // 1: AAA

subject.subscribe(
  data => console.log(`2: ${data}`),
  err => {},
  () => console.log("2: completed")
);

subject.next("BBB"); 

subject.complete(); 
1: AAA
1: BBB
2: BBB
1: completed
2: completed

AsyncSubject

complete時に最後にemitされた値だけをemitする。

const subject = new Rx.AsyncSubject();

subject.subscribe(
  data => console.log(`1: ${data}`),
  err => {},
  () => console.log("1: completed")
);

subject.next("AAA"); 
subject.next("BBB");

subject.complete();

subject.subscribe(
  data => console.log(`2: ${data}`),
  err => {},
  () => console.log("2: completed")
);
1: BBB
1: completed
2: BBB
2: completed

BehaviorSubject

Subscribeしたとき、最近のアイテムをemitする。あとはSubjectと同じ。

const subject = new Rx.BehaviorSubject("ZZZ")

subject.subscribe(
  data => console.log(`1: ${data}`),
  err => {},
  () => console.log("1: completed")
);

subject.next("AAA");
subject.next("BBB");

subject.subscribe(
  data => console.log(`2: ${data}`),
  err => {},
  () => console.log("2: completed")
);

subject.next("CCC"); 

subject.complete(); 
1: ZZZ
1: AAA
1: BBB
2: BBB
1: CCC
2: CCC
1: completed
2: completed

ReplaySubject

いつSubscribeしてもbufferにある全てのアイテムをemitする。

const subject = new Rx.ReplaySubject(2) // buffer size = 2

subject.subscribe(
  data => console.log(`1: ${data}`),
  err => {},
  () => console.log("1: completed")
);

subject.next("AAA");
subject.next("BBB");
subject.next("CCC");
subject.next("DDD");

subject.subscribe(
  data => console.log(`2: ${data}`),
  err => {},
  () => console.log("2: completed")
);

subject.complete(); 

buffer size = 2 なので2がSubscribeしたときにはAAAとBBBはもうない。

1: AAA
1: BBB
1: CCC
1: DDD
2: CCC
2: DDD
1: completed
2: completed

参考

歌舞伎座tech発表資料 RxJSの中を追う