Comet, IPC, MassTransit
System.IO.Pipelines и Named Pipes — это разные технологии в .NET, предназначенные для различных задач ввода-вывода.
Основные различия
System.IO.Pipelines — это высокопроизводительная абстракция для обработки потоков данных внутри одного процесса. Она оптимизирована для сценариев с большим объёмом данных (парсинг, сетевые протоколы, сериализация), минимизируя копирование памяти через буферизацию и асинхронную обработку.
System.IO.Pipes (включая NamedPipeServerStream/NamedPipeClientStream) — это механизм межпроцессного взаимодействия (IPC). Named Pipes используются для обмена данными между разными процессами на одной машине или по сети.
Сравнение
Практический пример
csharp// Pipelines - внутрипроцессная обработка var pipe = new Pipe(); await pipe.Writer.WriteAsync(data); ReadResult result = await pipe.Reader.ReadAsync(); // Named Pipes - между процессами var server = new NamedPipeServerStream("MyPipe"); server.WaitForConnection(); await server.WriteAsync(data); // Другой процесс читает
Pipelines не заменяют Named Pipes, а дополняют их для высоконагруженных сценариев обработки данных.
System.IO.Pipelines и Channels — это разные абстракции для внутрипроцессного взаимодействия в .NET, хотя оба работают с потоками данных.
Основные различия
System.IO.Pipelines предназначена для высокопроизводительной обработки байтовых потоков (IO-bound задачи). Она оптимизирует работу с сырыми данными из сети, файлов или сокетов, минимизируя аллокации памяти и копирование через буферы и PipeReader/PipeWriter.
System.Threading.Channels — это обобщённые каналы для объектов (object-bound producer-consumer). Они потокобезопасны и подходят для передачи готовых объектов (Channel<T>), блокирующих чтение/запись с bounded/unbounded режимами.
Сравнение
Практические сценарии
csharp// Pipelines - парсинг байтов из сокета var pipe = new Pipe(); await FillPipeAsync(socket, pipe.Writer); // IO из сети await ParseLinesAsync(pipe.Reader); // Парсинг без копирования // Channels - очередь объектов между задачами var channel = Channel.CreateBounded<string>(10); channel.Writer.WriteAsync("message"); // Producer await foreach (var msg in channel.Reader.ReadAllAsync()) // Consumer
Они дополняют друг друга: Pipelines для низкоуровневого IO, Channels для высокоуровневой координации объектов. Часто используются вместе.
Channels — это рекомендуемая абстракция для взаимодействия между двумя BackgroundService в .NET, так как они потокобезопасны и идеальны для producer-consumer паттерна между сервисами.
Pipelines подходят только если данные — это сырые байты с высокой нагрузкой (например, парсинг из сети), но для типичных задач (сообщения, объекты) Channels проще и эффективнее.
Почему Channels
Полностью thread-safe для записи из одного сервиса и чтения из другого.
Встроенная поддержка backpressure через bounded capacity.
Официальный пример MS для BackgroundService использует именно Channel для очередей задач.
Пример реализации
csharp// Общий Channel (регистрируется как Singleton) services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>(); // ProducerService public class ProducerService : BackgroundService { private readonly IBackgroundTaskQueue _queue; protected override async Task ExecuteAsync(CancellationToken ct) { while (!ct.IsCancellationRequested) { await _queue.QueueBackgroundWorkItemAsync(async token => { // Обработка данных await Task.Delay(1000, token); }); await Task.Delay(5000, ct); } } } // ConsumerService public class ConsumerService : BackgroundService { private readonly IBackgroundTaskQueue _queue; protected override async Task ExecuteAsync(CancellationToken ct) { while (!ct.IsCancellationRequested) { var workItem = await _queue.DequeueAsync(ct); await workItem(ct); // Выполнение задачи от Producer } } } // BackgroundTaskQueue (Channel) public interface IBackgroundTaskQueue { ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem); ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken ct); } public class BackgroundTaskQueue : IBackgroundTaskQueue { private readonly Channel<Func<CancellationToken, ValueTask>> _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>( new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.Wait }); public ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem) => _queue.Writer.WriteAsync(workItem); public ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken ct) => _queue.Reader.ReadAsync(ct); }
Регистрируйте сервисы в Program.cs: services.AddHostedService<ProducerService>(); services.AddHostedService<ConsumerService>();. Это стандартный паттерн для decoupling фоновых сервисов.
Полный пример BackgroundService с NamedPipeServerStream для Aspire
1. RouterService.csproj
xml<Project Sdk="Microsoft.NET.Sdk.Web"> <PropertyGroup> <TargetFramework>net10.0</TargetFramework> <Nullable>enable</Nullable> <ImplicitUsings>enable</ImplicitUsings> </PropertyGroup> <ItemGroup> <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.ServiceDiscovery" Version="8.0.0" /> <PackageReference Include="Aspire.Hosting" Version="8.0.0" /> </ItemGroup> </Project>
2. Program.cs (RouterService)
csharpusing Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; var builder = DistributedApplication.CreateBuilder(args); // Регистрация BackgroundService builder.Services.AddHostedService<NamedPipeRouterService>(); var app = builder.Build(); await app.RunAsync();
3. NamedPipeRouterService.cs (BackgroundService)
csharpusing System.IO.Pipes; using System.Text; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; public class NamedPipeRouterService : BackgroundService { private readonly ILogger<NamedPipeRouterService> _logger; private readonly CancellationTokenSource _cts = new(); public NamedPipeRouterService(ILogger<NamedPipeRouterService> logger) { _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("🚀 NamedPipe Router запущен на pipe: AspireRouterPipe"); while (!stoppingToken.IsCancellationRequested) { try { // Создаем сервер с поддержкой множественных клиентов var pipeServer = new NamedPipeServerStream( "AspireRouterPipe", PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances, // До 254 клиентов PipeTransmissionMode.Byte, PipeOptions.Asynchronous | PipeOptions.WriteThrough); _logger.LogInformation("⏳ Ожидание подключения клиента..."); await pipeServer.WaitForConnectionAsync(stoppingToken); _logger.LogDebug("✅ Клиент подключен: {ClientId}", pipeServer.SafePipeHandle.DangerousGetHandle()); // Обработка клиента в отдельной задаче _ = Task.Run(() => HandleClientAsync(pipeServer, stoppingToken), stoppingToken); } catch (OperationCanceledException) { break; } catch (Exception ex) { _logger.LogError(ex, "❌ Ошибка сервера NamedPipe"); await Task.Delay(1000, stoppingToken); } } } private async Task HandleClientAsync(NamedPipeServerStream pipeServer, CancellationToken ct) { var buffer = new byte[4096]; try { while (pipeServer.IsConnected && !ct.IsCancellationRequested) { // Чтение сообщения от клиента int bytesRead = await pipeServer.ReadAsync(buffer, 0, buffer.Length, ct); if (bytesRead == 0) break; string message = Encoding.UTF8.GetString(buffer, 0, bytesRead); _logger.LogInformation("📨 Получено: {Message}", message); // Простая маршрутизация (эхо обратно) var response = $"Echo: {message}"; var responseBytes = Encoding.UTF8.GetBytes(response); await pipeServer.WriteAsync(responseBytes, 0, responseBytes.Length, ct); _logger.LogInformation("📤 Отправлено: {Response}", response); } } catch (Exception ex) { _logger.LogError(ex, "❌ Ошибка обработки клиента"); } finally { pipeServer.Dispose(); _logger.LogInformation("🔌 Клиент отключен"); } } public override async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("🛑 Остановка NamedPipe Router..."); _cts.Cancel(); await base.StopAsync(cancellationToken); } }
4. AppHost (AppHost.csproj)
csharpvar builder = DistributedApplication.CreateBuilder(args); builder.AddProject<Projects.RouterService>("router"); builder.Build().Run();
5. Клиент (другое Aspire-приложение)
csharp// NamedPipeClientService public async Task SendMessageAsync(string message) { using var client = new NamedPipeClientStream(".", "AspireRouterPipe", PipeDirection.InOut); await client.ConnectAsync(5000); // Timeout 5s var data = Encoding.UTF8.GetBytes(message); await client.WriteAsync(data); // Чтение ответа var buffer = new byte[4096]; int bytes = await client.ReadAsync(buffer); var response = Encoding.UTF8.GetString(buffer, 0, bytes); Console.WriteLine($"Ответ от роутера: {response}"); }
Запуск в Aspire
bashdotnet run --project AppHost
Результат:
text🚀 NamedPipe Router запущен на pipe: AspireRouterPipe ⏳ Ожидание подключения клиента... ✅ Клиент подключен 📨 Получено: Hello from Worker1 📤 Отправлено: Echo: Hello from Worker1 🔌 Клиент отключен
Этот код готов к production: async, cancellation-aware, logging, multiple clients. Добавьте ConcurrentDictionary для маршрутизации между клиентами!
Полная интеграция MassTransit в .NET Aspire (IPC)
MassTransit + Aspire = идеальный IPC для микросервисов. Aspire автоматически настраивает брокеры (RabbitMQ/PostgreSQL).
Структура решения
textAspireSolution/ ├── Contracts/ # Общие контракты сообщений ├── ApiService/ # Publisher ├── WorkerService/ # Consumer ├── AppHost/ # Оркестратор └── ServiceDefaults/
1. Contracts (Message Contracts)
csharp// Contracts.csproj <Project Sdk="Microsoft.NET.Sdk"> <PropertyGroup> <TargetFramework>net10.0</TargetFramework> </PropertyGroup> </Project> // OrderCreated.cs public record OrderCreated( Guid OrderId, string CustomerId, decimal Amount, DateTime CreatedAt);
2. ServiceDefaults (общая настройка)
csharp// ServiceDefaults.csproj <Project Sdk="Microsoft.NET.Sdk"> <ItemGroup> <PackageReference Include="Aspire" Version="8.2.0" /> </ItemGroup> </Project> // Program.cs public static class Extensions { public static IServiceCollection AddServiceDefaults(this IServiceCollection services) { services.AddServiceDiscovery(); return services; } }
3. ApiService (Publisher)
csharp// ApiService.csproj <PackageReference Include="MassTransit.RabbitMQ.AspNetCore" Version="8.2.5" /> <ProjectReference Include="..\Contracts\Contracts.csproj" /> <ProjectReference Include="..\ServiceDefaults\ServiceDefaults.csproj" /> // Program.cs var builder = WebApplication.CreateBuilder(args); builder.AddServiceDefaults(); // MassTransit с RabbitMQ из Aspire builder.Services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { cfg.Host("rabbitmq"); // Имя ресурса из Aspire }); }); var app = builder.Build(); app.MapPost("/orders", async (IPublishEndpoint publish, OrderCreated order) => { await publish.Publish(order); return Results.Ok(); }); app.Run();
4. WorkerService (Consumer)
csharp// WorkerService.csproj <PackageReference Include="MassTransit.RabbitMQ" Version="8.2.5" /> <ProjectReference Include="..\Contracts\Contracts.csproj" /> <ProjectReference Include="..\ServiceDefaults\ServiceDefaults.csproj" /> // Program.cs var builder = Host.CreateApplicationBuilder(args); builder.AddServiceDefaults(); builder.Services.AddMassTransit(x => { x.AddConsumer<OrderConsumer>(); x.UsingRabbitMq((context, cfg) => { cfg.Host("rabbitmq"); cfg.ReceiveEndpoint("order-queue", e => { e.ConfigureConsumer<OrderConsumer>(context); }); }); }); var host = builder.Build(); await host.RunAsync();
csharp// OrderConsumer.cs public class OrderConsumer : IConsumer<OrderCreated> { private readonly ILogger<OrderConsumer> _logger; public OrderConsumer(ILogger<OrderConsumer> logger) { _logger = logger; } public Task Consume(ConsumeContext<OrderCreated> context) { _logger.LogInformation("✅ Заказ обработан: OrderId={OrderId}, Customer={CustomerId}", context.Message.OrderId, context.Message.CustomerId); return Task.CompletedTask; } }
5. AppHost (Оркестрация)
csharp// AppHost.csproj <PackageReference Include="Aspire.Hosting.RabbitMQ" Version="8.2.0" /> <PackageReference Include="Aspire.Hosting.PostgreSql" Version="8.2.0" /> // Program.cs var builder = DistributedApplication.CreateBuilder(args); // RabbitMQ для MassTransit var rabbitmq = builder.AddRabbitMQ("rabbitmq") .WithJsonSerializer() .WithManagementPort(); // RabbitMQ Dashboard // PostgreSQL (альтернатива для SQL Transport) var postgres = builder.AddPostgres("postgres") .WithDataVolume() .WithTcpPortMapping(); // Проекты builder.AddProject<Projects.ApiService>("api") .WithReference(rabbitmq) .WithExternalHttpEndpoints(); builder.AddProject<Projects.WorkerService>("worker") .WithReference(rabbitmq); builder.Build().Run();
6. Запуск и тест
bashdotnet run --project AppHost
Dashboard Aspire: https://localhost:18888
RabbitMQ UI: http://localhost:15672 (user:guest/pass:guest)
API: POST https://localhost:5001/orders
json{ "orderId": "guid", "customerId": "client123", "amount": 100.50 }
SQL Transport (без RabbitMQ)
csharp// В ApiService/WorkerService builder.Services.AddMassTransit(x => { x.UsingSqlServer((context, cfg) => { cfg.ConnectionString = postgres.GetConnectionString(); cfg.ReceiveEndpoint("order-queue", e => { }); }); });
Результат: Масштабируемый IPC с retry, dead-letter, monitoring из коробки. Aspire управляет инфраструктурой автоматически!
MassTransit и Named Pipes — разные уровни IPC в Aspire: MassTransit для распределённых систем, Named Pipes для локального высокопроизводительного обмена.
Сравнение
Когда использовать
Named Pipes в Aspire:
textЛокальный кластер (1 машина) - Мониторинг процессов - Реал-тайм дашборды - Внутри VM/container
Производительность: 100MB+/s, без брокера.
MassTransit в Aspire:
textРаспределённый кластер - Микросервисы - Event Sourcing - Сагi-оркестрация
Функции: partitioning, filtering, healthchecks.
Aspire Dashboard сравнение
Рекомендация по сценарию
textДля ваших BackgroundService в Aspire: 1. **Named Pipes** — если: - Локальная машина - Реал-тайм (<10ms) - Простая "Звезда" 2. **MassTransit** — если: - Много контейнеров - Нужны retry/DLQ - Будущий кластер
Гибрид: Named Pipes для локального + MassTransit для cross-host IPC. Aspire делает MassTransit проще, но Named Pipes быстрее в 10x для local!
Комментариев нет:
Отправить комментарий