Task изнутри: управление потоками в .NET и создание своих планировщиков
Привет, Хабр! Сегодня поговорим о том, как работают задачи в .NET, зачем может понадобиться собственный TaskScheduler
и как его реализовать.
Зачем нужен Task?
Начнём с основы. В .NET Task
— это удобная абстракция над потоками. Если потоки Thread
дают полный контроль над выполнением, то Task
управляет ресурсами через пул потоков ThreadPool
и помогает строить асинхронные операции.
Простой пример:
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
Console.WriteLine($"Основной поток: {Thread.CurrentThread.ManagedThreadId}");
await Task.Run(async () =>
{
Console.WriteLine($"Task выполняется на потоке: {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000); // имитируем работу
});
Console.WriteLine("Task завершён.");
}
}
Основная задача выполняется на одном потоке, а Task.Run
запускает дополнительный поток из пула потоков.
Почему это удобно: не нужно вручную управлять потоками, тратить ресурсы на их создание/уничтожение или заботиться о масштабировании.
Но вот загвоздка: иногда нужно больше контроля над задачами. Например:
Ограничить число одновременно выполняемых задач.
Установить приоритет выполнения.
Выполнить задачи в специфическом контексте (например, в UI-потоке).
Для этого .NET позволяет создавать свои TaskScheduler
.
Что делает TaskScheduler?
TaskScheduler
управляет распределением задач между потоками. По умолчанию используется TaskScheduler.Default
, который отправляет задачи в ThreadPool
. Однако:
Если нужно ограничить количество одновременных задач, стандартный планировщик не подойдёт.
Если требуется приоритет задач, придётся создавать кастомный
TaskScheduler
.
Создаем свой TaskScheduler
Ограничение параллелизма
Планировщик с ограничением одновременных задач:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler, IDisposable
{
private readonly int _maxDegreeOfParallelism;
private readonly LinkedList _tasks = new LinkedList();
private int _runningTasks = 0;
private readonly object _lock = new object();
private bool _disposed = false;
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1)
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
protected override IEnumerable GetScheduledTasks()
{
lock (_lock)
{
return _tasks.ToArray();
}
}
protected override void QueueTask(Task task)
{
if (_disposed)
throw new ObjectDisposedException(nameof(LimitedConcurrencyLevelTaskScheduler));
lock (_lock)
{
_tasks.AddLast(task);
TryExecuteTask();
}
}
private void TryExecuteTask()
{
if (_runningTasks >= _maxDegreeOfParallelism || _tasks.Count == 0)
return;
var task = _tasks.First.Value;
_tasks.RemoveFirst();
_runningTasks++;
ThreadPool.UnsafeQueueUserWorkItem(async _ =>
{
try
{
await Task.Yield();
base.TryExecuteTask(task);
}
catch (Exception ex)
{
Console.WriteLine($"Ошибка при выполнении задачи: {ex}");
}
finally
{
lock (_lock)
{
_runningTasks--;
TryExecuteTask();
}
}
}, null);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;
public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;
public void Dispose()
{
if (!_disposed)
{
lock (_lock)
{
_tasks.Clear();
_disposed = true;
}
}
}
}
В коде:
Очередь задач (_tasks
): задачи добавляются в очередь, если уже выполняется максимально разрешённое количество задач.
Управление состоянием (_runningTasks
): счетчик текущих задач увеличивается при начале выполнения и уменьшается при завершении.
Асинхронность: используется Task.Yield()
, чтобы задачи не блокировали поток.
Приоритетизация задач
Добавим возможность управлять приоритетами. Высокоприоритетные задачи должны выполняться первыми.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class PriorityTaskScheduler : TaskScheduler
{
private readonly int _maxDegreeOfParallelism;
private readonly LinkedList _highPriorityTasks = new LinkedList();
private readonly LinkedList _lowPriorityTasks = new LinkedList();
private int _runningTasks = 0;
private readonly object _lock = new object();
public PriorityTaskScheduler(int maxDegreeOfParallelism)
{
_maxDegreeOfParallelism = maxDegreeOfParallelism > 0 ? maxDegreeOfParallelism : throw new ArgumentOutOfRangeException();
}
public void Enqueue(Task task, bool highPriority)
{
lock (_lock)
{
if (highPriority)
_highPriorityTasks.AddLast(task);
else
_lowPriorityTasks.AddLast(task);
TryExecuteTask();
}
}
private void TryExecuteTask()
{
if (_runningTasks >= _maxDegreeOfParallelism)
return;
Task task = null;
lock (_lock)
{
if (_highPriorityTasks.Count > 0)
{
task = _highPriorityTasks.First.Value;
_highPriorityTasks.RemoveFirst();
}
else if (_lowPriorityTasks.Count > 0)
{
task = _lowPriorityTasks.First.Value;
_lowPriorityTasks.RemoveFirst();
}
if (task == null) return;
_runningTasks++;
}
ThreadPool.UnsafeQueueUserWorkItem(async _ =>
{
try
{
await Task.Yield();
base.TryExecuteTask(task);
}
finally
{
lock (_lock)
{
_runningTasks--;
TryExecuteTask();
}
}
}, null);
}
protected override IEnumerable GetScheduledTasks() => throw new NotSupportedException();
protected override void QueueTask(Task task) => Enqueue(task, highPriority: false);
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;
}
Теперь задачи с высоким приоритетом помещаются в отдельную очередь. Если высокоприоритетных задач нет, выполняются задачи из низкоприоритетной очереди.
Использование:
var scheduler = new PriorityTaskScheduler(2);
scheduler.Enqueue(Task.Factory.StartNew(() => Console.WriteLine("Low priority"), scheduler), highPriority: false);
scheduler.Enqueue(Task.Factory.StartNew(() => Console.WriteLine("High priority"), scheduler), highPriority: true);
Тестирование TaskScheduler
Тестирование кастомного TaskScheduler
— это этап, который поможет убедиться в корректности работы. Для этого может быть достаточно простых юнит-тестов. Например, протестируем, что LimitedConcurrencyLevelTaskScheduler
правильно ограничивает число одновременно выполняемых задач.
[Fact]
public async Task SchedulerLimitsConcurrency()
{
// Создаём планировщик, ограничивающий выполнение до 2 задач одновременно
var scheduler = new LimitedConcurrencyLevelTaskScheduler(2);
var taskFactory = new TaskFactory(scheduler);
int runningTasks = 0;
var tasks = Enumerable.Range(0, 10).Select(async _ =>
{
Interlocked.Increment(ref runningTasks);
Assert.True(runningTasks <= 2, "Превышен лимит параллелизма");
await Task.Delay(100); // Симуляция работы
Interlocked.Decrement(ref runningTasks);
});
await Task.WhenAll(tasks);
}
Счетчик runningTasks
увеличивается при запуске задачи и уменьшается при завершении. Через Assert
проверяем, что одновременно выполняется не более 2 задач.
Итоги
Создание собственного TaskScheduler
необходимо в случаях:
Если стандартный
TaskScheduler
не справляется с ограничением задач.Требуется управление приоритетами.
Нужен специфический контекст выполнения (UI, ограниченный пул потоков).
В завершение скажу пару слов об открытых уроках по ASP.NET, которые в ближайшее время пройдут в OTUS:
28 ноября: «SignalR в ASP.NET Core приложениях». Подробнее
11 декабря: «Мониторинг работоспособности ASP.NET Core приложений». Подробнее