Rx.NET 練習紀錄

Reactive Extensions, ReactiveX, RxJS, rx.js

引言

Rx.NET 就是 .NET 版的 RxJS。 理論基礎全部都一樣。然在實作過程『語形』因語言而有所差異,『語法』、『語意』是一樣的。 Rx.NET 語形以 Linq 為基礎需要一些與 RxJS 指令的對映想像,不過還滿直覺易猜的就不做對映表了。

要點回顧

參考之前 RxJS 練習紀錄。

RxJS: Reactive Extensions For JavaScript 試用RxJS is a library for composing asynchronous and event-based programs by using observable sequences. RxJS 為設計用來處理訊息的非同步組合操作。

Rx 的三個主要原素:Subject, Subscribe, Observer 。其中 Subject is Objservable。主題是可以被觀察/被訂閱的。

用程式碼解釋

subject.subscribe(observer);
主題.訂閱於(觀察者)
// 觀察者訂閱了主題。
// 某客戶訂閱了雜誌。
// The observer subscribes to the subject.

// 抽象化 =>
observable.subscribe(observer);
可觀察的.訂閱於(觀察者)
// 觀察者訂閱了可觀察者。
// The observer subscribes to the observable.

重點不在指令而在目的。重點在訊息(資料)流的需求能否滿足。滿足需求的 Rx 串連指令可能有多種解法。

最簡單的一條龍訊息(資料)流:

stream_input → Subject → Observable → Observable → Observable → Subscribe → stream_output

若不知 Rx.NET 指令怎麼下可以參考 RxJS 是如何下的,再對映相同指令就行了。

先總結

Rx 適合訊息(資料)流複雜運算的應用,基於使用相同的高階指令提高可閱讀性;然若是簡單的訊息流導入 Rx 會造成反效果讓可讀性變差。

開發環境

平台: .NET8 IDE: Visual Studio 2022 骨架: NET8 Blazor WebAssembly Rx庫: Rx.NET 第6版

以下為練習紀錄

只顯示關鍵程式碼。

merge & scan

將合併二條訊息流(主題),一個『+1』一個『-1』再串流加總。

送出『+1』

PlusProducer.razor
@using System.Reactive.Subjects

...略...
  <button @onclick=HandleClick>+1</button>

@code {
  [CascadingParameter] _RXDEMO02 BizState { get; set; } = default!;
  Subject<int> PlusSubject => BizState.plusSubject;

  void HandleClick()
  {
    PlusSubject.OnNext(+1); // <--- 送出『+1』
  }
}

送出『-1』

MinusProducer.razor
@using System.Reactive.Subjects

...略...
  <button @onclick=HandleClick>-1</button>
  
@code {
  [CascadingParameter] _RXDEMO02 BizState { get; set; } = default!;
  Subject<int> MinusSubject => BizState.minusSubject;

  void HandleClick()
  {
    MinusSubject.OnNext(-1); // <--- 送出『+1』
  }
}

串流加總

Consumer.razor
@using System.Reactive
@using System.Reactive.Linq
@using System.Reactive.Subjects

...略...

@code {
  [CascadingParameter] _RXDEMO02 BizState { get; set; } = default!;
  Subject<int> PlusSubject => BizState.plusSubject;
  Subject<int> MinusSubject => BizState.minusSubject;

  //# State
  MyObserver<int> plusObserver = new("plus");
  MyObserver<int> minusObserver = new("minus");
  int accValue = 0;
  MyObserver<int> accObserver = new("acc");

  protected override void OnInitialized()
  {
    ...略...

    // merge & scan: 合併二條訊息流(主題),再串流加總。
    IObservable<int> accumulate = Observable.Merge(PlusSubject, MinusSubject)
                                            .Scan(0, (int acc, int cur) => acc + cur);

    accumulate.Subscribe(accObserver); //--- 再訂閱至『累加』串流
    accumulate.Subscribe(acc =>    
    {
      accValue = acc; //--- 現在的『累加』結果。
      InvokeAsync(StateHasChanged);
    });
  }
}

沒圖沒真象之一

累加總

event 間隔延遲 - 拉長 click 間的間隔時間

不是固定延遲 event 送達時間,是把 event 間隔時間刻意拉長1秒。

下面關鍵的碼不到十行寫了三天啊!

@using System.Reactive
@using System.Reactive.Linq
@using System.Reactive.Subjects
@implements IDisposable

<h3>RxDemo03 - 拉長 click 間的間隔時間。</h3>
...略...
<button @onclick=HandleClick>Click me</button>
...略...

@code {
  MyObserver<string> clickObserver = new("click");
  MyObserver<string> bufferedObserver = new("buffered");
  MyObserver<string> resultObserver = new("result");

  IObservable<long> interval = Observable.Interval(TimeSpan.FromSeconds(1d));
  Subject<string> clickSubject = new();

  protected override void OnInitialized()
  {
    // init
    ...略...

    // 訂閱
    clickSubject.Subscribe(clickObserver);
    
    // 分批延遲間隔: click → Buffer → buffered_click 
    var buffered = clickSubject.Buffer(TimeSpan.FromSeconds(1d)).Where(b => b.Count > 0);
    buffered.Select(bufferedItems => $"b[{bufferedItems.Count}]").Subscribe(bufferedObserver);

    // 依序合併: Concat(delayed_click)
    // buffered_click → Zip(interval) → Select → delayed_click    
    Observable.Concat(buffered.Select(bufferedItems => bufferedItems.ToObservable().Zip(interval)))
              .Select(x => x.First)
              .Subscribe(resultObserver);

    // 平行合併,等同 merge。順序可能會變化。
    //buffered.SelectMany(bufferedItems => bufferedItems.ToObservable().Zip(interval).Select(z => z.First))
    //        .Subscribe(resultObserver);
  }

  int clickIdx = 1;
  void HandleClick()
  {
    clickSubject.OnNext($"c{clickIdx++}");
  }
}

沒圖沒真象之二

完整原始碼

參考文件

Last updated