Saya menerapkan produsen-konsumen di aplikasi inti asp.net menggunakan layanan yang dihosting. Saya bisa membuatnya bekerja ke titik di mana konsumen memproses item dari _recordProcessingChannel.ReadAllAsync() secara sinkron.

Saya mencoba membagi hasil _recordProcessingChannel.ReadAllAsync() menjadi beberapa tugas paralel. Misalnya: Saya memiliki 10.000 item yang dibaca dari saluran dan saya ingin membagi pekerjaan ini menjadi 4 tugas terpisah dan memproses 2500 item per ICMService.

Konsumen:

await foreach (var record in _recordProcessingChannel.ReadAllAsync())
{

    using var scope = _serviceProvider.CreateScope();
    var processor = scope.ServiceProvider.GetRequiredService<ICMService>();

    processor.UploadRecord(record);
                
}

Pembaca:

public IAsyncEnumerable<RecordData> ReadAllAsync(CancellationToken ct = default) => _channel.Reader.ReadAllAsync(ct);

Terima kasih sebelumnya atas bantuan yang diberikan

2
Kris Bog 7 Juli 2020, 11:38

1 menjawab

Jawaban Terbaik

Anda dapat memulai sejumlah tugas pemrosesan yang diperlukan dan menggunakan BlockingCollection agar enqueue berfungsi. Sesuatu seperti ini:

// my dummy async enumerable
public async IAsyncEnumerable<int> ReadAllAsync()
{
    for (int i = 0; i < 3; i++)
    {
        yield return i*3 + 1;
        yield return i*3 + 2;
        yield return i*3 + 3;
        await Task.Delay(200);
    }
    yield return 777;
}
var collection = new BlockingCollection<int>();
// start "processors"
var tasks = Enumerable.Range(0, 4)
    .Select(i => 
        Task.Run(() =>
        {
            while (!collection.IsCompleted)
            {           
                int? data = null;
                try
                {
                    data = collection.Take();
                }
                catch (InvalidOperationException) { }

                if (data != null)
                {
                    // simulate processing 
                    Thread.Sleep(400);
                    Console.WriteLine(data.Value);
                }
            }
            Console.WriteLine("No more items to take.");
        }))
    .ToArray();

await foreach (var record in ReadAllAsync())
{
    collection.Add(record);
}
collection.CompleteAdding(); // signal that enqueuing has finished

await Task.WhenAll(tasks);

Ini dapat ditingkatkan dengan memperkenalkan beberapa pensinyalan asinkron (dengan SemaphoreSlim.WaitAsync atau AsyncManualResetEvent.WaitAsync< /a> misalnya) sehingga utas konsumen tidak akan menggunakan CPU saat menunggu item baru. Sebagai contoh:

var collection = new ConcurrentQueue<int>();
var semaphore = new SemaphoreSlim(0, 4);
var cts = new CancellationTokenSource(); // to signal that queueing is completed
var tasks = Enumerable.Range(0, 4)
    .Select(i => 
        Task.Run(async () =>
        {
            while (true)
            {
                if (cts.Token.IsCancellationRequested && !collection.Any())
                {
                    Console.WriteLine("No more items to take.");
                    break;
                }
                else if (!cts.Token.IsCancellationRequested)
                {
                    try
                    {
                        await semaphore.WaitAsync(cts.Token);
                    }
                    catch (OperationCanceledException)
                    {
                        //ignore
                    }
                }

                if(collection.TryDequeue(out var data))
                {
                    //simulate work
                    Thread.Sleep(400);
                    Console.WriteLine(data);
                }                   
            }
        }))
    .ToArray();

await foreach (var record in ReadAllAsync())
{
    collection.Enqueue(record);
    semaphore.Release();
}
cts.Cancel(); // addition completed.
await Task.WhenAll(tasks);
Console.WriteLine("end");
1
Guru Stron 7 Juli 2020, 13:15