MassTransit 試用紀錄
Message Bus, MessageBus 應用準備。
引言
Message Bus, MessageBus 應用準備。
參考文章
MassTransit - Getting Started with the In-Memory Transport
Build Clean Messaging in .NET with MassTransit
開發環境
IDE: Visual Studio 2022
執行平台: .NET6
系統骨架: Blazor Server App
關鍵原碼紀錄
首先要有訊息交換合約/DTO
/// <summary>
/// the contract of MassTransitPractiseService
/// </summary>
public record HelloMessage
{
public string Value { get; init; }
}
Queue 之 Producer
using MassTransit;
/// MassTransit 練習
/// 注意:此物件在背景執行。
/// <see cref="[MassTransit-第一個練習In Memory](https://masstransit.io/quick-starts/in-memory)"/>
public class MyMessageProducer : BackgroundService
{
readonly ILogger<MassTransitMessageProducer> _logger;
readonly IBus _bus;
public MassTransitMessageProducer(ILogger<MassTransitMessageProducer> logger, IBus bus)
{
_logger = logger;
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await Task.Yield(); // 讓其它執行緒先過去,避免主機卡死?
// 送出訊息
await _bus.Publish(new HelloMessage { Value = $"現在時間 {DateTimeOffset.Now}" }, stoppingToken);
await Task.Delay(1000, stoppingToken);
}
}
}
Queue 之 Consumer
using MassTransit;
using BlazorComponentBus;
/// ※注意:此物件也是在背景執行。
public class MyMessageConsumer : IConsumer<HelloMessage>
{
readonly ILogger<MassTransitMessageConsumer> _logger;
readonly ComponentBus _busSvc; // 另一種 message bus 以實作 notification。
public MassTransitMessageConsumer(ILogger<MassTransitMessageConsumer> logger, ComponentBus busSvc)
{
_logger = logger;
_busSvc = busSvc;
}
public async Task Consume(ConsumeContext<HelloMessage> context)
{
_logger.LogInformation($"Received message: {context.Message.Value}");
/// ※ message 轉車到前端
/// 因為此段碼也在背景執行緒執行所以不能直接轉送到前端。
/// 本例再透過 BlazorComponentBus 元件轉送到前端以實作 notification。
/// BlazorComponentBus 元件不在本例說明。
await _busSvc.Publish<HelloMessage>(context.Message);
}
}
註冊以執行
using MassTransit;
var builder = WebApplication.CreateBuilder(args);
/// 註冊:DI Service
[...略...]
builder.Services.AddMassTransit(c =>
{
c.SetKebabCaseEndpointNameFormatter();
// By default, sagas are in-memory, but should be changed to a durable
// sega repository.
c.SetInMemorySagaRepositoryProvider();
var asm = typeof(Program).Assembly;
c.AddConsumers(asm); // 把實作IConsumer介面的MyMessageConsumer類別註冊進去。
c.AddSagaStateMachines(asm);
c.AddSagas(asm);
c.AddActivities(asm);
c.UsingInMemory((ctx, cfg) =>
{
cfg.ConfigureEndpoints(ctx);
});
});
/// 註冊:背景工作 hosted
builder.Services.AddHostedService<MyMessageProducer>();
var app = builder.Build();
[...略...]
app.Run();
(EOF)
Last updated