안녕하세요😊 오늘은 C#의 System.Threading.Channels를 활용하여 생산자-소비자 패턴을 구현하는 방법을 알아보겠습니다!

🎈 System.Threading.Channels란? System.Threading.Channels는 .NET에서 제공하는 고성능 채널(Channel) 기반의 데이터 전송 라이브러리입니다. 이를 활용하면 비동기적으로 데이터를 안전하게 주고받을 수 있습니다.

📌 주요 특징:

  • 비동기 데이터 전송 ✨: Task 기반의 비동기 프로그래밍 지원
  • 생산자-소비자 패턴 구현 용이 🏭: 여러 개의 생산자와 소비자가 데이터를 주고받을 수 있음
  • 채널의 크기 조절 가능 📏: 유연한 용량 설정과 대기 모드 지원

💡 단일 생산자가 5개의 Task를 생성하고 소비자가 이를 처리하는 패턴 구현 아래 예제에서는 단일 생산자가 5개의 Task를 생성하여 데이터를 추가하고, 소비자가 이를 읽어 처리하는 방식으로 프로그램을 설계하겠습니다!

 

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        // 채널 생성 (용량 20으로 설정)
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(20)
        {
            FullMode = BoundedChannelFullMode.Wait // 채널이 가득 차면 대기
        });
        
        // 취소 토큰 (10초 후 작업 취소용)
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(TimeSpan.FromSeconds(10));
        
        // 생산자 실행 (5개의 Task 등록)
        var producerTask = RunProducer(channel.Writer, cts.Token);
        
        // 소비자 실행
        var consumerTask = RunConsumer(channel.Reader, cts.Token);
        
        try
        {
            await Task.WhenAll(producerTask, consumerTask);
            Console.WriteLine("모든 작업이 정상적으로 완료되었습니다.");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("작업이 취소되었습니다.");
        }
    }
    
    static async Task RunProducer(ChannelWriter<int> writer, CancellationToken token)
    {
        var tasks = new List<Task>();
        var random = new Random();
        
        // 5개의 Task를 실행하여 데이터 생성
        for (int i = 0; i < 5; i++)
        {
            int producerId = i + 1;
            tasks.Add(Task.Run(async () =>
            {
                for (int j = 0; j < 20; j++)
                {
                    token.ThrowIfCancellationRequested();
                    await Task.Delay(random.Next(100, 500), token);
                    int item = j * 100 + producerId;
                    await writer.WriteAsync(item, token);
                    Console.WriteLine($"생산자 {producerId}: 항목 {item} 생산");
                }
            }, token));
        }
        
        await Task.WhenAll(tasks);
        writer.Complete();
    }
    
    static async Task RunConsumer(ChannelReader<int> reader, CancellationToken token)
    {
        var tasks = new List<Task>();
        var random = new Random();
        
        // 5개의 소비자 Task 실행
        for (int i = 0; i < 5; i++)
        {
            int consumerId = i + 1;
            tasks.Add(Task.Run(async () =>
            {
                while (await reader.WaitToReadAsync(token))
                {
                    while (reader.TryRead(out int item))
                    {
                        await Task.Delay(random.Next(200, 800), token);
                        Console.WriteLine($"소비자 {consumerId}: 항목 {item} 처리 완료");
                    }
                }
            }, token));
        }
        
        await Task.WhenAll(tasks);
    }
}

 

 

🔹 코드 설명

  1. 채널 생성 🏗️
    • Channel.CreateBounded<int>(new BoundedChannelOptions(20))을 사용해 용량 20인 채널을 생성합니다.
    • FullMode = BoundedChannelFullMode.Wait를 설정하여 채널이 가득 찼을 때 대기하도록 합니다.
  2. 생산자(Producer) 구현 🚀
    • RunProducer에서 5개의 Task를 생성하여 데이터를 채널에 추가합니다.
    • 각 생산자는 20개의 항목을 랜덤한 시간 간격을 두고 생성합니다.
  3. 소비자(Consumer) 구현 📦
    • RunConsumer에서 5개의 Task를 생성하여 데이터를 병렬로 읽어 처리합니다.
    • 데이터가 존재하면 TryRead를 통해 데이터를 읽어 처리합니다.
  4. 취소 및 예외 처리 🚦
    • CancellationTokenSource를 사용해 10초 후 자동 취소하도록 설정합니다.
    • OperationCanceledException을 처리하여 작업이 정상적으로 종료되도록 합니다.

결과 예시

생산자 1: 항목 101 생산
생산자 2: 항목 201 생산
소비자 3: 항목 101 처리 완료
소비자 1: 항목 201 처리 완료
...
모든 작업이 정상적으로 완료되었습니다.

이제 하나의 생산자가 5개의 Task를 등록하여 데이터를 생성하고 소비자가 5개의 Task를 동시에 처리하는 방식으로 개선되었습니다! 🎉


데드락은 두 개 이상의 프로세스가 서로 상대방이 점유하고 있는 자원을 기다리며 무한정 대기하는 상태를 말합니다. 즉, 각 프로세스가 자신이 필요로 하는 자원을 얻기 위해 다른 프로세스가 해제되기를 기다리는데, 다른 프로세스 역시 자원을 해제하지 못하고 있는 상황입니다. 이런 상황에서는 프로세스들이 계속 대기 상태에 빠지게 되어 더 이상 진행할 수 없게 됩니다.

 

1. 소스

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static readonly object lockA = new object();
    private static readonly object lockB = new object();

    static async Task Main(string[] args)
    {
        var task1 = Task.Run(() => TaskA());
        var task2 = Task.Run(() => TaskB());

        await Task.WhenAll(task1, task2);

        Console.WriteLine("Tasks completed");
    }

    private static void TaskA()
    {
        lock (lockA)
        {
            Console.WriteLine("TaskA acquired lockA");
            Thread.Sleep(100); // Simulate work

            lock (lockB)
            {
                Console.WriteLine("TaskA acquired lockB");
                // Simulate work
            }
        }
    }

    private static void TaskB()
    {
        lock (lockB)
        {
            Console.WriteLine("TaskB acquired lockB");
            Thread.Sleep(100); // Simulate work

            lock (lockA)
            {
                Console.WriteLine("TaskB acquired lockA");
                // Simulate work
            }
        }
    }
}

 

2. 결과

TaskB acquired lockB
TaskA acquired lockA

 

3. 분석

이 코드는 두 개의 작업(TaskA와 TaskB)이 서로 다른 순서로 두 개의 잠금(lockA와 lockB)을 얻으려고 할 때 데드락(교착 상태)이 발생할 수 있음을 보여줍니다.

TaskA는 먼저 lockA를 얻은 다음 lockB를 얻으려고 합니다. 반면에 TaskB는 먼저 lockB를 얻은 다음 lockA를 얻으려고 합니다.

이렇게 서로 다른 순서로 잠금을 시도하기 때문에, TaskA는 lockA를 잡고 lockB를 기다리는 상황에 놓이고, 동시에 TaskB는 lockB를 잡고 lockA를 기다리는 상황에 놓입니다. 이로 인해 두 작업 모두 더 이상 진행할 수 없는 교착 상태에 빠지게 됩니다.

실제로 이 코드를 실행하면, 프로그램이 데드락에 빠져 "Tasks completed" 메시지가 출력되지 않고, 무한히 대기 상태에 빠져 있는 것을 볼 수 있습니다. 이를 통해 데드락 상황이 발생했음을 확인할 수 있습니다.

데드락을 피하기 위해서는 모든 스레드가 동일한 순서로 잠금을 얻도록 코드를 작성해야 합니다. 또는, Mutex와 같은 고급 동기화 메커니즘을 사용하는 것이 좋습니다.

+ Recent posts