Limitación de tareas asíncronas


Me gustaría ejecutar un montón de tareas asincrónicas, con un límite en cuántas tareas pueden estar pendientes de finalización en un momento dado.

Digamos que tiene 1000 URLs, y solo desea tener 50 solicitudes abiertas a la vez; pero tan pronto como una solicitud se completa, abre una conexión a la siguiente URL de la lista. De esa manera, siempre hay exactamente 50 conexiones abiertas a la vez, hasta que la lista de URL se agote.

También quiero utilizar un número dado de hilos si posible.

, se me ocurrió un método de extensión, ThrottleTasksAsync que hace lo que quiero. ¿Existe ya una solución más sencilla? Yo asumiría que este es un escenario común.

Uso:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

Aquí está el código:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

El método utiliza BlockingCollection y SemaphoreSlim para hacerlo funcionar. El throttler se ejecuta en un subproceso, y todas las tareas asincrónicas se ejecutan en el otro subproceso. Para lograr paralelismo, agregué un parámetro MaxDegreeOfParallelism que se pasa a un bucle Parallel.ForEach re-propuesto como un bucle while.

La versión antigua era:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

Pero, el grupo de subprocesos se agota rápidamente, y no se puede hacer async/await.

Bono: Para evitar el problema en BlockingCollection donde se lanza una excepción en Take() cuando se llama CompleteAdding(), estoy usando la sobrecarga TryTake con un tiempo de espera. Si no utilizo el tiempo de espera en TryTake, frustraría el propósito de usar un BlockingCollection ya que TryTake no se bloqueará. Hay una manera mejor? Idealmente, habría un TakeAsync método.

Author: Josh Wyant, 2014-03-19

3 answers

Como se sugiere, use Flujo de datos TPL.

A TransformBlock<TInput, TOutput> puede ser lo que estás buscando.

Se define un MaxDegreeOfParallelism para limitar cuántas cadenas se pueden transformar (es decir, cuántas URL se pueden descargar) en paralelo. Luego publicas URL en el bloque, y cuando termines le dices al bloque que has terminado de agregar elementos y recuperas las respuestas.

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

Nota: El TransformBlock almacena en búfer tanto su entrada como su salida. ¿Por qué, entonces, necesitamos vincularlo a un BufferBlock?

Porque el TransformBlock no se completará hasta que todos los elementos (HttpResponse) se hayan consumido, y await downloader.Completion se colgará. En su lugar, dejamos que el downloader reenvíe toda su salida a un bloque de búfer dedicado, luego esperamos que el downloader se complete e inspeccionamos el bloque de búfer.

 50
Author: dcastro,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-02-10 19:42:31

Digamos que tienes 1000 URLs, y solo quieres tener 50 solicitudes abiertas en una vez; pero tan pronto como se completa una solicitud, se abre una conexión a la siguiente URL de la lista. De esa manera, siempre hay exactamente 50 las conexiones se abren a la vez, hasta que la lista de URL se agota.

La siguiente solución simple ha surgido muchas veces aquí en SO. No usa código de bloqueo y no crea hilos explícitamente, por lo que escala muy bien:

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

La cosa es, el procesamiento de los datos descargados debe hacerse en una tubería diferente, con un nivel de paralelismo diferente, especialmente si se trata de un procesamiento vinculado a la CPU.

Por ejemplo, es probable que desee tener 4 subprocesos simultáneamente haciendo el procesamiento de datos (el número de núcleos de CPU), y hasta 50 solicitudes pendientes de más datos (que no utilizan subprocesos en absoluto). AFAICT, esto no es lo que su código está haciendo actualmente.

Ahí es donde TPL Dataflow o Rx pueden sea útil como una solución preferida. Sin embargo, es ciertamente posible implementar algo como esto con TPL simple. Nota, el único código de bloqueo aquí es el que hace el procesamiento de datos real dentro de Task.Run:

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}
 34
Author: noseratio,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-10-10 19:39:25

Como se solicitó, aquí está el código con el que terminé yendo.

El trabajo se configura en una configuración master-detail, y cada master se procesa como un lote. Cada unidad de trabajo está en cola de esta manera:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

Los maestros se almacenan uno a la vez para guardar el trabajo para otros procesos externos. Los detalles de cada maestro se envían para el trabajo a través de la masterTransform TransformManyBlock. También se crea un BatchedJoinBlock para recopilar los detalles en un lote.

El trabajo real se realiza en el detailTransform TransformBlock, asincrónicamente, 150 a la vez. BoundedCapacity se establece en 300 para asegurar que demasiados Maestros no se almacenen en búfer al principio de la cadena, mientras que también deja espacio para que se pongan en cola suficientes registros de detalle para permitir que se procesen 150 registros a la vez. El bloque envía un object a sus destinos, porque se filtra a través de los enlaces dependiendo de si se trata de un Detail o Exception.

El batchAction ActionBlock recopila la salida de todos los lotes y realiza actualizaciones masivas de la base de datos, error registro, etc. para cada lote.

Habrá varios BatchedJoinBlock s, uno para cada maestro. Dado que cada ISourceBlock se genera secuencialmente y cada lote solo acepta el número de registros de detalles asociados con un maestro, los lotes se procesarán en orden. Cada bloque solo genera un grupo y se desvincula al completarse. Solo el último bloque de lotes propaga su finalización al ActionBlock final.

La red de flujo de datos:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
 3
Author: Josh Wyant,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-04-08 19:31:07