안녕하세요😊 오늘은 C#의 System.Threading.Channels를 활용하여 생산자-소비자 패턴을 구현하는 방법을 알아보겠습니다!

🎈 System.Threading.Channels란? System.Threading.Channels는 .NET에서 제공하는 고성능 채널(Channel) 기반의 데이터 전송 라이브러리입니다. 이를 활용하면 비동기적으로 데이터를 안전하게 주고받을 수 있습니다.

📌 주요 특징:

  • 비동기 데이터 전송 ✨: Task 기반의 비동기 프로그래밍 지원
  • 생산자-소비자 패턴 구현 용이 🏭: 여러 개의 생산자와 소비자가 데이터를 주고받을 수 있음
  • 채널의 크기 조절 가능 📏: 유연한 용량 설정과 대기 모드 지원

💡 단일 생산자가 5개의 Task를 생성하고 소비자가 이를 처리하는 패턴 구현 아래 예제에서는 단일 생산자가 5개의 Task를 생성하여 데이터를 추가하고, 소비자가 이를 읽어 처리하는 방식으로 프로그램을 설계하겠습니다!

 

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        // 채널 생성 (용량 20으로 설정)
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(20)
        {
            FullMode = BoundedChannelFullMode.Wait // 채널이 가득 차면 대기
        });
        
        // 취소 토큰 (10초 후 작업 취소용)
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(TimeSpan.FromSeconds(10));
        
        // 생산자 실행 (5개의 Task 등록)
        var producerTask = RunProducer(channel.Writer, cts.Token);
        
        // 소비자 실행
        var consumerTask = RunConsumer(channel.Reader, cts.Token);
        
        try
        {
            await Task.WhenAll(producerTask, consumerTask);
            Console.WriteLine("모든 작업이 정상적으로 완료되었습니다.");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("작업이 취소되었습니다.");
        }
    }
    
    static async Task RunProducer(ChannelWriter<int> writer, CancellationToken token)
    {
        var tasks = new List<Task>();
        var random = new Random();
        
        // 5개의 Task를 실행하여 데이터 생성
        for (int i = 0; i < 5; i++)
        {
            int producerId = i + 1;
            tasks.Add(Task.Run(async () =>
            {
                for (int j = 0; j < 20; j++)
                {
                    token.ThrowIfCancellationRequested();
                    await Task.Delay(random.Next(100, 500), token);
                    int item = j * 100 + producerId;
                    await writer.WriteAsync(item, token);
                    Console.WriteLine($"생산자 {producerId}: 항목 {item} 생산");
                }
            }, token));
        }
        
        await Task.WhenAll(tasks);
        writer.Complete();
    }
    
    static async Task RunConsumer(ChannelReader<int> reader, CancellationToken token)
    {
        var tasks = new List<Task>();
        var random = new Random();
        
        // 5개의 소비자 Task 실행
        for (int i = 0; i < 5; i++)
        {
            int consumerId = i + 1;
            tasks.Add(Task.Run(async () =>
            {
                while (await reader.WaitToReadAsync(token))
                {
                    while (reader.TryRead(out int item))
                    {
                        await Task.Delay(random.Next(200, 800), token);
                        Console.WriteLine($"소비자 {consumerId}: 항목 {item} 처리 완료");
                    }
                }
            }, token));
        }
        
        await Task.WhenAll(tasks);
    }
}

 

 

🔹 코드 설명

  1. 채널 생성 🏗️
    • Channel.CreateBounded<int>(new BoundedChannelOptions(20))을 사용해 용량 20인 채널을 생성합니다.
    • FullMode = BoundedChannelFullMode.Wait를 설정하여 채널이 가득 찼을 때 대기하도록 합니다.
  2. 생산자(Producer) 구현 🚀
    • RunProducer에서 5개의 Task를 생성하여 데이터를 채널에 추가합니다.
    • 각 생산자는 20개의 항목을 랜덤한 시간 간격을 두고 생성합니다.
  3. 소비자(Consumer) 구현 📦
    • RunConsumer에서 5개의 Task를 생성하여 데이터를 병렬로 읽어 처리합니다.
    • 데이터가 존재하면 TryRead를 통해 데이터를 읽어 처리합니다.
  4. 취소 및 예외 처리 🚦
    • CancellationTokenSource를 사용해 10초 후 자동 취소하도록 설정합니다.
    • OperationCanceledException을 처리하여 작업이 정상적으로 종료되도록 합니다.

결과 예시

생산자 1: 항목 101 생산
생산자 2: 항목 201 생산
소비자 3: 항목 101 처리 완료
소비자 1: 항목 201 처리 완료
...
모든 작업이 정상적으로 완료되었습니다.

이제 하나의 생산자가 5개의 Task를 등록하여 데이터를 생성하고 소비자가 5개의 Task를 동시에 처리하는 방식으로 개선되었습니다! 🎉

+ Recent posts