Task изнутри: управление потоками в .NET и создание своих планировщиков

c5837716749eecadfa9eb63e25772251.png

Привет, Хабр! Сегодня поговорим о том, как работают задачи в .NET, зачем может понадобиться собственный TaskScheduler и как его реализовать.

Зачем нужен Task?

Начнём с основы. В .NET Task — это удобная абстракция над потоками. Если потоки Thread дают полный контроль над выполнением, то Task управляет ресурсами через пул потоков ThreadPool и помогает строить асинхронные операции.

Простой пример:

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        Console.WriteLine($"Основной поток: {Thread.CurrentThread.ManagedThreadId}");

        await Task.Run(async () =>
        {
            Console.WriteLine($"Task выполняется на потоке: {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(1000); // имитируем работу
        });

        Console.WriteLine("Task завершён.");
    }
}

Основная задача выполняется на одном потоке, а Task.Run запускает дополнительный поток из пула потоков.

Почему это удобно: не нужно вручную управлять потоками, тратить ресурсы на их создание/уничтожение или заботиться о масштабировании.

Но вот загвоздка: иногда нужно больше контроля над задачами. Например:

  • Ограничить число одновременно выполняемых задач.

  • Установить приоритет выполнения.

  • Выполнить задачи в специфическом контексте (например, в UI-потоке).

Для этого .NET позволяет создавать свои TaskScheduler.

Что делает TaskScheduler?

TaskScheduler управляет распределением задач между потоками. По умолчанию используется TaskScheduler.Default, который отправляет задачи в ThreadPool. Однако:

  • Если нужно ограничить количество одновременных задач, стандартный планировщик не подойдёт.

  • Если требуется приоритет задач, придётся создавать кастомный TaskScheduler.

Создаем свой TaskScheduler

Ограничение параллелизма

Планировщик с ограничением одновременных задач:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler, IDisposable
{
    private readonly int _maxDegreeOfParallelism;
    private readonly LinkedList _tasks = new LinkedList();
    private int _runningTasks = 0;
    private readonly object _lock = new object();
    private bool _disposed = false;

    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    protected override IEnumerable GetScheduledTasks()
    {
        lock (_lock)
        {
            return _tasks.ToArray();
        }
    }

    protected override void QueueTask(Task task)
    {
        if (_disposed)
            throw new ObjectDisposedException(nameof(LimitedConcurrencyLevelTaskScheduler));

        lock (_lock)
        {
            _tasks.AddLast(task);
            TryExecuteTask();
        }
    }

    private void TryExecuteTask()
    {
        if (_runningTasks >= _maxDegreeOfParallelism || _tasks.Count == 0)
            return;

        var task = _tasks.First.Value;
        _tasks.RemoveFirst();
        _runningTasks++;

        ThreadPool.UnsafeQueueUserWorkItem(async _ =>
        {
            try
            {
                await Task.Yield();
                base.TryExecuteTask(task);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Ошибка при выполнении задачи: {ex}");
            }
            finally
            {
                lock (_lock)
                {
                    _runningTasks--;
                    TryExecuteTask();
                }
            }
        }, null);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;

    public void Dispose()
    {
        if (!_disposed)
        {
            lock (_lock)
            {
                _tasks.Clear();
                _disposed = true;
            }
        }
    }
}

В коде:

Очередь задач (_tasks): задачи добавляются в очередь, если уже выполняется максимально разрешённое количество задач.

Управление состоянием (_runningTasks): счетчик текущих задач увеличивается при начале выполнения и уменьшается при завершении.

Асинхронность: используется Task.Yield(), чтобы задачи не блокировали поток.

Приоритетизация задач

Добавим возможность управлять приоритетами. Высокоприоритетные задачи должны выполняться первыми.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class PriorityTaskScheduler : TaskScheduler
{
    private readonly int _maxDegreeOfParallelism;
    private readonly LinkedList _highPriorityTasks = new LinkedList();
    private readonly LinkedList _lowPriorityTasks = new LinkedList();
    private int _runningTasks = 0;
    private readonly object _lock = new object();

    public PriorityTaskScheduler(int maxDegreeOfParallelism)
    {
        _maxDegreeOfParallelism = maxDegreeOfParallelism > 0 ? maxDegreeOfParallelism : throw new ArgumentOutOfRangeException();
    }

    public void Enqueue(Task task, bool highPriority)
    {
        lock (_lock)
        {
            if (highPriority)
                _highPriorityTasks.AddLast(task);
            else
                _lowPriorityTasks.AddLast(task);

            TryExecuteTask();
        }
    }

    private void TryExecuteTask()
    {
        if (_runningTasks >= _maxDegreeOfParallelism)
            return;

        Task task = null;
        lock (_lock)
        {
            if (_highPriorityTasks.Count > 0)
            {
                task = _highPriorityTasks.First.Value;
                _highPriorityTasks.RemoveFirst();
            }
            else if (_lowPriorityTasks.Count > 0)
            {
                task = _lowPriorityTasks.First.Value;
                _lowPriorityTasks.RemoveFirst();
            }

            if (task == null) return;
            _runningTasks++;
        }

        ThreadPool.UnsafeQueueUserWorkItem(async _ =>
        {
            try
            {
                await Task.Yield();
                base.TryExecuteTask(task);
            }
            finally
            {
                lock (_lock)
                {
                    _runningTasks--;
                    TryExecuteTask();
                }
            }
        }, null);
    }

    protected override IEnumerable GetScheduledTasks() => throw new NotSupportedException();

    protected override void QueueTask(Task task) => Enqueue(task, highPriority: false);

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;
}

Теперь задачи с высоким приоритетом помещаются в отдельную очередь. Если высокоприоритетных задач нет, выполняются задачи из низкоприоритетной очереди.

Использование:

var scheduler = new PriorityTaskScheduler(2);

scheduler.Enqueue(Task.Factory.StartNew(() => Console.WriteLine("Low priority"), scheduler), highPriority: false);
scheduler.Enqueue(Task.Factory.StartNew(() => Console.WriteLine("High priority"), scheduler), highPriority: true);

Тестирование TaskScheduler

Тестирование кастомного TaskScheduler — это этап, который поможет убедиться в корректности работы. Для этого может быть достаточно простых юнит-тестов. Например, протестируем, что LimitedConcurrencyLevelTaskScheduler правильно ограничивает число одновременно выполняемых задач.

[Fact]
public async Task SchedulerLimitsConcurrency()
{
    // Создаём планировщик, ограничивающий выполнение до 2 задач одновременно
    var scheduler = new LimitedConcurrencyLevelTaskScheduler(2);
    var taskFactory = new TaskFactory(scheduler);
    int runningTasks = 0;

    var tasks = Enumerable.Range(0, 10).Select(async _ =>
    {
        Interlocked.Increment(ref runningTasks);
        Assert.True(runningTasks <= 2, "Превышен лимит параллелизма");
        await Task.Delay(100); // Симуляция работы
        Interlocked.Decrement(ref runningTasks);
    });

    await Task.WhenAll(tasks);
}

Счетчик runningTasks увеличивается при запуске задачи и уменьшается при завершении. Через Assert проверяем, что одновременно выполняется не более 2 задач.

Итоги

Создание собственного TaskScheduler необходимо в случаях:

  • Если стандартный TaskScheduler не справляется с ограничением задач.

  • Требуется управление приоритетами.

  • Нужен специфический контекст выполнения (UI, ограниченный пул потоков).

В завершение скажу пару слов об открытых уроках по ASP.NET, которые в ближайшее время пройдут в OTUS:

  • 28 ноября: «SignalR в ASP.NET Core приложениях». Подробнее

  • 11 декабря: «Мониторинг работоспособности ASP.NET Core приложений». Подробнее

© Habrahabr.ru