RxJS: Reactive Extensions For JavaScript 試用
RxJS is a library for composing asynchronous and event-based programs by using observable sequences. RxJS 為設計用來處理訊息的非同步組合操作。
試用心得與結論
Reactive X (Reactive Extensions) 設計用來處理訊息之非同步處理。其中 RxJS 為JavaScript的版本,各主流程式語言也各有自己的Rx版本(Rx Languages)。 Rx 讓複雜的資料(訊息)流演算過程因指令統一而變得易讀一些;也就是資料流複雜度不高的話會有反效果。資料演算複雜度不會因導入Rx而下降。不過,因為演算過程使用相同的指令集所以他人讀碼時不用再猜這指令的功能為何,故而使得複雜的資料演算變得比較易讀一些,不過依然燒腦。
Reactive Extensions 以 Observer patten 為設計核心,以 stream 運轉資料(訊息)流,並導入大量的 Higher Order Function 指令,使得:
演算步驟更明確,因為(不因人而異)使用相同的指令將步驟分段計算。
資料分流與合併更明確,因為(不因人而異)使用相同的指令將資料(或訊息)分流與合併。
資料(訊息)流可以被多方同時訂閱、也可以分支、合併多方資料(訊息)流。
連續訊息處理,用於UI操作之控制訊息的狀態處理變得很方便,最典型的滑鼠拖拉應用,滑鼠與鍵盤與其他UI介面操作之訊息多方整併等等。
若沒有訂閱(subscribe)則
經試用,可應用於: 一、大批量數據的演算。 二、UI訊息可以被多方訂閱(subscribe)並整併。 三、連續訊息流(stream)處理。
參考
指令入門
用一行碼來解釋的話,
observable.subscribe(observer);
可觀察的.訂閱於(觀察者)
// 觀察者訂閱了可觀察者
// the observer subscribes to the observable.
太抽象了,更正式一些的說明。
Rx 的三個主要原素:Subject, Subscribe, Observer 。其中 Subject is Objservable。主題是可以被觀察/訂閱的。
重新用一行碼來解釋:
subject.subscribe(observer);
// the observer subscribes to the subject
// 觀察者訂閱了主題。
// 某客戶訂閱了雜誌。
在實作上Observable與Subject的異同
相同
Subject 與 Observable 都可以被訂閱
相異
Subject 為訊息輸入接口,可以加入新訊息。
Observable 只能被觀察,無法加入訊息。
Observable 可串接 Observable 讓訊息分流或合併。
Subject 不可串接 Subject,但可轉化成Observable。
即:stream_input → Subject → Observable → Observable → Observable → Subscribe → stream_output
入門用法
處理大量數據
///
/// 處理大量數據
///
import { from } from "rxjs"
import { filter, map, reduce } from "rxjs/operators"
// 模擬大量數據(實務上應是stream而不是array)
const dataList = [1,2,3,4,5,6]
// 處理大量數據
from(dataList).pipe(
filter(x=> x % 2 === 0)
map(x=> x + x)
reduce((acc,x)=>acc+x,0)
).subscribe(x=>{
console.log(x)
})
訊息訂閱與合併
///
/// 訊息訂閱與合併
/// 註:Subject, Observable, Subscribe 變數命名時以'$'結尾
///
import { Subject, of } from "rxjs";
import { mapTo, merge, scan } from "rxjs/operators";
import { useObservable } from "rxjs-hooks";
//# make input event as Rx-Subject used to proceed
const event1$ = new Subject() //<-- 變數命名時以'$'結尾
const observable1$ = event1$.pipe(mapTo(1)) // 轉化訊息
const event2$ = new Subject()
const observable2$ = event2$.pipe(mapTo(-1)) // 轉化訊息
const proceed$ = of(0).pipe(
merge(observable1$, observable2$), // 合併訊息流
scan((acc, v) => acc + v, 0) // 處理訊息:累加
)
// UI畫面
function RxSample4(props) {
const value = useObservable(() => proceed$) // 訂閱 proceed$
return (
<div className={classes.root}>
<p>proceed: {value}</p>
<Button onClick={e => {
// 把訊息送入subject event1$
event1$.next(e)
}}>加一</Button>
<Button onClick={e => {
// 把訊息送入subject event2$
event2$.next(e)
}}>減一</Button>
</div>
);
}
///
/// 此例效果等同累加減1,一個簡單的累加演算,經過RxJS後變得複雜了。此例是反效果。
/// 然而若是複雜的資料流演算,串接數個步驟與資料流動分分合合,那經過RxJS與適當安排語法就會變得易讀與易於維護許多。
///
/// buttonPlus → event1$ → mapTo → observable1$ +
/// |- merge → scan → proceed$ → subscribe
/// buttonMinus → event2$ → mapTo → observable2$ +
繪成資料流圖

常用指令
只列清單指令說明請找官網 RxJS API List 說明或google大神。有好幾個指令都很玄妙。 重點不在各個指令如何使用,而是如何有意義的組合RxJS指令。
rxjs
Subject, Observable, from, of, fromEvent, timer, interval, range,
rxjs/operators
tap, filter, map, reduce, scan, merge, startWith, mapTo, flapMap, withLastestFrom, takeUntil, switchMap,
基本應用與關鍵原始碼留存
應用一:大量數據統計計算
import React, { useState, useEffect } from 'react'
import { from } from "rxjs"
import { map, filter, reduce, tap } from "rxjs/operators"
function RxProceed({ dataList }) {
const [result, setResult] = useState(0)
useEffect(() => {
//※ RxJS 運算為同步執行,
const subs$ = from(dataList).pipe(
tap(x => console.log('input', x)), // tracing
filter(x => x % 2 === 0),
tap(x => console.log('filter', x)), // tracing
map(x => x + x),
tap(x => console.log('map', x)), // tracing
reduce((acc, x) => acc + x, 0),
tap(x => console.log('reduce', x)), // tracing
).subscribe(x => {
setResult(x)
});
return () => subs$.unsubscribe();
}, [dataList])
return (
<div>
<h3>RxJS Proceed</h3>
<p style={{ fontSize: '1.5em' }}>
result: {result}
</p>
</div>
)
}
應用二:讓訊息可訂閱
import React from 'react'
import { TextField } from 'widgets/InputFields'
import { Paper } from '@material-ui/core'
import { Subject, Observable , range, of, interval } from "rxjs"
import { map, filter, reduce, startWith, scan, tap, mapTo } from "rxjs/operators"
import { useObservable } from "rxjs-hooks"
/// 將透過Rx-Subject訂閱訊息
const subject$ = new Subject();
const observable$ = subject$.pipe(
startWith('init')
);
export default function RxSample2(props) {
const value = useObservable(() => observable$);
return (
<div>
<h2>RxJS 應用二:讓訊息可訂閱(make events subscribable)</h2>
<p>subscribe event: {value}</p>
<TextField name="foo" value={value}
onChange={(name, newValue) => {
subject$.next(newValue) // 送出訊息到 Rx-Subject$
}} />
<WatchRxSubject />
</div>
)
}
///--------------------------------------------------------
/// 觀看訊息並紀錄最後十筆訊息
function WatchRxSubject(propx) {
/// 觀察訊息,等同訂閱了訊息
const value = useObservable(() =>
observable$.pipe(
scan((msgList, msg) => [msg, ...msgList].slice(0,10), []),
tap(console.log) // tracing
));
return (
<Paper style={{margin:'0.5em', padding:'0.5em'}}>
<h3>觀察 RxJS Subject</h3>
<p>{JSON.stringify(value)}</p>
</Paper>
)
}
應用三:連續訊息運算:拖拉
import React from "react";
import { useEventCallback } from "rxjs-hooks";
import { fromEvent } from "rxjs";
import { map, switchMap, takeUntil, withLatestFrom } from "rxjs/operators";
export default function RxSample3(props) {
const classes = useStyles()
const [onMouseDown, [x, y]] = useEventCallback(
(event$, state$) =>
event$.pipe(
withLatestFrom(state$),
map(([e, prevPos]) => [e.clientX, e.clientY, prevPos]),
switchMap(([startX, startY, prevPos]) => {
return fromEvent(window, "mousemove").pipe(
map(moveEvent => {
return [
moveEvent.clientX - startX + prevPos[0],
moveEvent.clientY - startY + prevPos[1]
];
}),
takeUntil(fromEvent(window, "mouseup"))
);
})
),
[0, 0]
);
return (
<div className={classes.root}>
<h2>應用三:連續訊息運算:拖拉:mouse down→move→move→move→up</h2>
<div>{x},{y}</div>
<div className={classes.box} onMouseDown={onMouseDown} style={{ left: x, top: y }}>
drag me
</div>
</div>
);
}
應用四:連續訊息運算:訊息合併(merge events)
import React, { useState } from "react"
import { Button } from '@material-ui/core'
import { Subject, Observable, of } from "rxjs";
import { mapTo, map, merge, scan } from "rxjs/operators";
import { useObservable } from "rxjs-hooks";
//-----------------------------------------------------
//## make input event as Rx-Subject used to proceed
const event1$ = new Subject()
const observable1$ = event1$.pipe(mapTo(1))
const event2$ = new Subject()
const observable2$ = event2$.pipe(mapTo(-1))
const proceed$ = of(0).pipe(
merge(observable1$, observable2$), // 合併Observable
scan((acc, v) => acc + v, 0)
)
//-----------------------------------------------------
export default function RxSample4(props) {
const value = useObservable(() => proceed$)
//const [count, setCount] = useState(0)
return (
<div className={classes.root}>
<h2>應用四:連續訊息運算:訊息合併(merge events)</h2>
<p style={{ fontSize: '2em' }}>
//with useState : {count}<br/>
with useObsvable: {value}
</p>
<Button className={classes.button}
onClick={(e) => {
//setCount(c => c + 1)
event1$.next(e)
}}>加一
</Button>
<Button className={classes.button}
onClick={(e) => {
//setCount(c => c - 1)
event2$.next(e)
}}>減一
</Button>
</div >
);
}
Last updated