MassTransit 試用紀錄

Message Bus, MessageBus 應用準備。

引言

Message Bus, MessageBus 應用準備。

參考文章

MassTransit - Getting Started with the In-Memory Transport

MassTransit - Getting Started with the In-Memory Transport

Build Clean Messaging in .NET with MassTransit

Build Clean Messaging in .NET with MassTransit

開發環境

  • IDE: Visual Studio 2022

  • 執行平台: .NET 6

  • 系統骨架: Blazor Server App

關鍵原始碼紀錄

首先要有訊息交換合約/DTO

/// <summary>
/// Message contract (DTO) exchanged over the MassTransit bus in this sample:
/// published by the producer and handled by the consumer.
/// </summary>
public record HelloMessage
{
  /// <summary>
  /// Text payload of the message. Defaults to an empty string so the
  /// non-nullable property is never observed as <c>null</c>.
  /// </summary>
  public string Value { get; init; } = string.Empty;
}

Queue 之 Producer

MyMessageProducer.cs
using MassTransit;

/// <summary>
/// MassTransit practice: a hosted background service that publishes a
/// <see cref="HelloMessage"/> carrying the current time, then waits one second,
/// repeating until the host requests shutdown.
/// </summary>
/// <remarks>
/// Note: this object runs in the background.
/// Based on the
/// <see href="https://masstransit.io/quick-starts/in-memory">MassTransit in-memory quick start</see>.
/// </remarks>
public class MyMessageProducer : BackgroundService
{
  private readonly ILogger<MyMessageProducer> _logger;
  private readonly IBus _bus;

  /// <summary>Creates the producer with its logger and the MassTransit bus.</summary>
  /// <param name="logger">Logger for this service.</param>
  /// <param name="bus">Bus used to publish <see cref="HelloMessage"/> instances.</param>
  public MyMessageProducer(ILogger<MyMessageProducer> logger, IBus bus)
  {
    _logger = logger;
    _bus = bus;
  }

  /// <summary>
  /// Publishes one message, waits one second, and repeats until
  /// <paramref name="stoppingToken"/> is cancelled.
  /// </summary>
  /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
  protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  {
    // Yield once up front so the rest of this method does not run synchronously
    // on the caller; presumably this keeps host startup from being blocked.
    // One yield is enough: every iteration below already awaits.
    await Task.Yield();

    while (!stoppingToken.IsCancellationRequested)
    {
      // Publish the message.
      await _bus.Publish(new HelloMessage { Value = $"現在時間 {DateTimeOffset.Now}" }, stoppingToken);
      await Task.Delay(1000, stoppingToken);
    }
  }
}

Queue 之 Consumer

MyMessageConsumer.cs
using MassTransit;
using BlazorComponentBus;

/// <summary>
/// MassTransit consumer for <see cref="HelloMessage"/>: logs the payload and
/// republishes the message on a <see cref="ComponentBus"/>.
/// </summary>
/// <remarks>Note: this object also runs in the background.</remarks>
public class MyMessageConsumer : IConsumer<HelloMessage>
{
  private readonly ILogger<MyMessageConsumer> _logger;
  private readonly ComponentBus _busSvc; // A second message bus, used to implement notifications.

  /// <summary>Creates the consumer with its logger and the component bus.</summary>
  /// <param name="logger">Logger for this consumer.</param>
  /// <param name="busSvc">Bus the received message is forwarded to.</param>
  public MyMessageConsumer(ILogger<MyMessageConsumer> logger, ComponentBus busSvc)
  {
    _logger = logger;
    _busSvc = busSvc;
  }

  /// <summary>Handles one received <see cref="HelloMessage"/>.</summary>
  /// <param name="context">Consume context carrying the message.</param>
  public async Task Consume(ConsumeContext<HelloMessage> context)
  {
    // Use a message template instead of an interpolated string so the value is
    // captured as a structured property; the rendered text is unchanged.
    _logger.LogInformation("Received message: {Value}", context.Message.Value);

    // Relay the message towards the front end.
    // Per the original author's note, this code runs on a background thread, so
    // it cannot push to the front end directly. This sample forwards through
    // BlazorComponentBus to implement the notification; that component is not
    // covered here.
    await _busSvc.Publish<HelloMessage>(context.Message);
  }
}

註冊以執行

Program.cs
using MassTransit;

var builder = WebApplication.CreateBuilder(args);

// Registration: DI services (other registrations omitted in this excerpt).
[...略...]

// Registration: MassTransit with the in-memory transport.
builder.Services.AddMassTransit(c =>
{
  c.SetKebabCaseEndpointNameFormatter();

  // By default, sagas are in-memory, but should be changed to a durable
  // saga repository.
  c.SetInMemorySagaRepositoryProvider();

  var asm = typeof(Program).Assembly;
  c.AddConsumers(asm); // Registers MyMessageConsumer, the class implementing the IConsumer interface.
  c.AddSagaStateMachines(asm);
  c.AddSagas(asm);
  c.AddActivities(asm);

  c.UsingInMemory((ctx, cfg) =>
  {
    cfg.ConfigureEndpoints(ctx);
  });
});

// Registration: hosted background work (the message producer).
builder.Services.AddHostedService<MyMessageProducer>();

var app = builder.Build();

[...略...]

app.Run();

(EOF)

Last updated