Rx.NET 練習紀錄
Reactive Extensions, ReactiveX, RxJS, rx.js
引言
Rx.NET
就是 .NET 版的 RxJS
。 理論基礎全部都一樣。然在實作過程『語形』因語言而有所差異,『語法』、『語意』是一樣的。 Rx.NET 語形以 Linq 為基礎需要一些與 RxJS 指令的對映想像,不過還滿直覺易猜的就不做對映表了。
要點回顧
參考之前 RxJS 練習紀錄。
RxJS: Reactive Extensions For JavaScript 試用RxJS is a library for composing asynchronous and event-based programs by using observable sequences. RxJS 為設計用來處理訊息的非同步組合操作。Rx 的三個主要原素:Subject, Subscribe, Observer 。其中 Subject is Objservable。主題是可以被觀察/被訂閱的。
用程式碼解釋
subject.subscribe(observer);
主題.訂閱於(觀察者)
// 觀察者訂閱了主題。
// 某客戶訂閱了雜誌。
// The observer subscribes to the subject.
// 抽象化 =>
observable.subscribe(observer);
可觀察的.訂閱於(觀察者)
// 觀察者訂閱了可觀察者。
// The observer subscribes to the observable.
重點不在指令而在目的。重點在訊息(資料)流的需求能否滿足。滿足需求的 Rx 串連指令可能有多種解法。
最簡單的一條龍訊息(資料)流:
stream_input → Subject → Observable → Observable → Observable → Subscribe → stream_output
若不知 Rx.NET 指令怎麼下可以參考 RxJS 是如何下的,再對映相同指令就行了。
先總結
Rx 適合訊息(資料)流複雜運算的應用,基於使用相同的高階指令提高可閱讀性;然若是簡單的訊息流導入 Rx 會造成反效果讓可讀性變差。
開發環境
平台: .NET8 IDE: Visual Studio 2022 骨架: NET8 Blazor WebAssembly Rx庫: Rx.NET 第6版
以下為練習紀錄
只顯示關鍵程式碼。
merge & scan
將合併二條訊息流(主題),一個『+1』一個『-1』再串流加總。
送出『+1』
@using System.Reactive.Subjects
...略...
<button @onclick=HandleClick>+1</button>
@code {
[CascadingParameter] _RXDEMO02 BizState { get; set; } = default!;
Subject<int> PlusSubject => BizState.plusSubject;
void HandleClick()
{
PlusSubject.OnNext(+1); // <--- 送出『+1』
}
}
送出『-1』
@using System.Reactive.Subjects
...略...
<button @onclick=HandleClick>-1</button>
@code {
[CascadingParameter] _RXDEMO02 BizState { get; set; } = default!;
Subject<int> MinusSubject => BizState.minusSubject;
void HandleClick()
{
MinusSubject.OnNext(-1); // <--- 送出『+1』
}
}
串流加總
@using System.Reactive
@using System.Reactive.Linq
@using System.Reactive.Subjects
...略...
@code {
[CascadingParameter] _RXDEMO02 BizState { get; set; } = default!;
Subject<int> PlusSubject => BizState.plusSubject;
Subject<int> MinusSubject => BizState.minusSubject;
//# State
MyObserver<int> plusObserver = new("plus");
MyObserver<int> minusObserver = new("minus");
int accValue = 0;
MyObserver<int> accObserver = new("acc");
protected override void OnInitialized()
{
...略...
// merge & scan: 合併二條訊息流(主題),再串流加總。
IObservable<int> accumulate = Observable.Merge(PlusSubject, MinusSubject)
.Scan(0, (int acc, int cur) => acc + cur);
accumulate.Subscribe(accObserver); //--- 再訂閱至『累加』串流
accumulate.Subscribe(acc =>
{
accValue = acc; //--- 現在的『累加』結果。
InvokeAsync(StateHasChanged);
});
}
}
沒圖沒真象之一

event 間隔延遲 - 拉長 click 間的間隔時間
不是固定延遲 event 送達時間,是把 event 間隔時間刻意拉長1秒。
下面關鍵的碼不到十行寫了三天啊!
@using System.Reactive
@using System.Reactive.Linq
@using System.Reactive.Subjects
@implements IDisposable
<h3>RxDemo03 - 拉長 click 間的間隔時間。</h3>
...略...
<button @onclick=HandleClick>Click me</button>
...略...
@code {
MyObserver<string> clickObserver = new("click");
MyObserver<string> bufferedObserver = new("buffered");
MyObserver<string> resultObserver = new("result");
IObservable<long> interval = Observable.Interval(TimeSpan.FromSeconds(1d));
Subject<string> clickSubject = new();
protected override void OnInitialized()
{
// init
...略...
// 訂閱
clickSubject.Subscribe(clickObserver);
// 分批延遲間隔: click → Buffer → buffered_click
var buffered = clickSubject.Buffer(TimeSpan.FromSeconds(1d)).Where(b => b.Count > 0);
buffered.Select(bufferedItems => $"b[{bufferedItems.Count}]").Subscribe(bufferedObserver);
// 依序合併: Concat(delayed_click)
// buffered_click → Zip(interval) → Select → delayed_click
Observable.Concat(buffered.Select(bufferedItems => bufferedItems.ToObservable().Zip(interval)))
.Select(x => x.First)
.Subscribe(resultObserver);
// 平行合併,等同 merge。順序可能會變化。
//buffered.SelectMany(bufferedItems => bufferedItems.ToObservable().Zip(interval).Select(z => z.First))
// .Subscribe(resultObserver);
}
int clickIdx = 1;
void HandleClick()
{
clickSubject.OnNext($"c{clickIdx++}");
}
}
沒圖沒真象之二

完整原始碼
參考文件
Last updated