안녕하세요😊 오늘은 C#의 System.Threading.Channels를 활용하여 생산자-소비자 패턴을 구현하는 방법을 알아보겠습니다!
🎈 System.Threading.Channels란? System.Threading.Channels는 .NET에서 제공하는 고성능 채널(Channel) 기반의 데이터 전송 라이브러리입니다. 이를 활용하면 비동기적으로 데이터를 안전하게 주고받을 수 있습니다.
📌 주요 특징:
- 비동기 데이터 전송 ✨: Task 기반의 비동기 프로그래밍 지원
- 생산자-소비자 패턴 구현 용이 🏭: 여러 개의 생산자와 소비자가 데이터를 주고받을 수 있음
- 채널의 크기 조절 가능 📏: 유연한 용량 설정과 대기 모드 지원
💡 단일 생산자가 5개의 Task를 생성하고 소비자가 이를 처리하는 패턴 구현 아래 예제에서는 단일 생산자가 5개의 Task를 생성하여 데이터를 추가하고, 소비자가 이를 읽어 처리하는 방식으로 프로그램을 설계하겠습니다!
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
// 채널 생성 (용량 20으로 설정)
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(20)
{
FullMode = BoundedChannelFullMode.Wait // 채널이 가득 차면 대기
});
// 취소 토큰 (10초 후 작업 취소용)
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(10));
// 생산자 실행 (5개의 Task 등록)
var producerTask = RunProducer(channel.Writer, cts.Token);
// 소비자 실행
var consumerTask = RunConsumer(channel.Reader, cts.Token);
try
{
await Task.WhenAll(producerTask, consumerTask);
Console.WriteLine("모든 작업이 정상적으로 완료되었습니다.");
}
catch (OperationCanceledException)
{
Console.WriteLine("작업이 취소되었습니다.");
}
}
static async Task RunProducer(ChannelWriter<int> writer, CancellationToken token)
{
var tasks = new List<Task>();
var random = new Random();
// 5개의 Task를 실행하여 데이터 생성
for (int i = 0; i < 5; i++)
{
int producerId = i + 1;
tasks.Add(Task.Run(async () =>
{
for (int j = 0; j < 20; j++)
{
token.ThrowIfCancellationRequested();
await Task.Delay(random.Next(100, 500), token);
int item = j * 100 + producerId;
await writer.WriteAsync(item, token);
Console.WriteLine($"생산자 {producerId}: 항목 {item} 생산");
}
}, token));
}
await Task.WhenAll(tasks);
writer.Complete();
}
static async Task RunConsumer(ChannelReader<int> reader, CancellationToken token)
{
var tasks = new List<Task>();
var random = new Random();
// 5개의 소비자 Task 실행
for (int i = 0; i < 5; i++)
{
int consumerId = i + 1;
tasks.Add(Task.Run(async () =>
{
while (await reader.WaitToReadAsync(token))
{
while (reader.TryRead(out int item))
{
await Task.Delay(random.Next(200, 800), token);
Console.WriteLine($"소비자 {consumerId}: 항목 {item} 처리 완료");
}
}
}, token));
}
await Task.WhenAll(tasks);
}
}
🔹 코드 설명
- 채널 생성 🏗️
- Channel.CreateBounded<int>(new BoundedChannelOptions(20))을 사용해 용량 20인 채널을 생성합니다.
- FullMode = BoundedChannelFullMode.Wait를 설정하여 채널이 가득 찼을 때 대기하도록 합니다.
- 생산자(Producer) 구현 🚀
- RunProducer에서 5개의 Task를 생성하여 데이터를 채널에 추가합니다.
- 각 생산자는 20개의 항목을 랜덤한 시간 간격을 두고 생성합니다.
- 소비자(Consumer) 구현 📦
- RunConsumer에서 5개의 Task를 생성하여 데이터를 병렬로 읽어 처리합니다.
- 데이터가 존재하면 TryRead를 통해 데이터를 읽어 처리합니다.
- 취소 및 예외 처리 🚦
- CancellationTokenSource를 사용해 10초 후 자동 취소하도록 설정합니다.
- OperationCanceledException을 처리하여 작업이 정상적으로 종료되도록 합니다.
✅ 결과 예시
생산자 1: 항목 101 생산
생산자 2: 항목 201 생산
소비자 3: 항목 101 처리 완료
소비자 1: 항목 201 처리 완료
...
모든 작업이 정상적으로 완료되었습니다.
이제 하나의 생산자가 5개의 Task를 등록하여 데이터를 생성하고 소비자가 5개의 Task를 동시에 처리하는 방식으로 개선되었습니다! 🎉
'Language > C#' 카테고리의 다른 글
[C#] .NET Core에서 ActionFilterAttribute 활용하기 (0) | 2025.03.07 |
---|---|
[C#] .NET에서 Dependency Injection(DI) 기초 가이드 (0) | 2025.02.26 |
[C#] C# 클로저(Closure) 쉽게 이해하기 (0) | 2025.02.07 |
[C#] DotNetty로 TCP 소켓 통신 구현: Unity와 게임 서버 간 패킷 교환하기 (0) | 2025.01.03 |
[C#] protobuf-net 프로토콜 (0) | 2025.01.02 |