C# 8 – Асинхронные потоки

C# logoC# 8 принес ряд очень интересных нововведений. Одним из них является поддержка асинхронных потоков (async streams). Давайте посмотрим что это такое, а также разберемся как они создаются и используются.

Асинхронные потоки требуют поддержки не только со стороны C#, но и платформы. Поэтому их использование возможно при наличии поддержки .NET Standard 2.1 и выше. На данный момент это только .NET Core начиная с версии 3.0.

Асинхронные потоки

В C# уже достаточно давно существует интерфейс IEnumerable<T>, который позволяет перебрать последовательность элементов заданного типа T. При этом каждый элемент может быть как сгенерирован заранее, так и создан в момент обращения к нему.

До C# 8 получение очередного элемента могло быть только синхронной операцией. Но не всегда такой подход является оптимальным. Например, если данные должны быть получены от внешнего сервиса или устройства, то лучше использовать асинхронный код.

C# 8 появился новый интерфейс IAsyncEnumerable<T>, который представляет последовательность элементов, каждое значение которой может быть получено асинхронно, и называется асинхронным потоком.

Объявление интерфейса похоже на IEnumerable<T>, но c поправкой на асинхронность:

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken=default);
}

Метод GetAsyncEnumerator(…) создает экземпляр IAsyncEnumerator<T>, который отвечает за создание или получение очередного элемента асинхронного потока.

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }

    ValueTask<bool> MoveNextAsync();
}

Свойство Current содержит последнее полученное значение. MoveNextAsync(…) осуществляет переход к следующему элементу. Если перечисление завершено, то метод вернет значение false.

Стоит обратить внимание, что MoveNextAsync(…) возвращает ValueTask<T>. Данный тип, аналог класса Task<T>, является структурой, а значит её, значение будет расположено в стеке. Это позволяет уменьшить расход памяти при асинхронном переборе больших или даже условно-бесконечных последовательности.

Последний из рассматриваемых интерфейсов IAsyncDisposable позволяет, при необходимости, освободить ресурсы по завершению перебора асинхронного потока.

public interface IAsyncDisposable
{
    ValueTask DisposeAsync();
}

Стоит отметить, что DisposeAsync(…) будет вызван даже в случае выброса исключения (включая срабатывание CancellationToken).

Если посмотреть объявления IEnumerable<T>, IEnumerator<T> и IDisposable, то аналогии будут более чем очевидны.

Использование асинхронного потока

Для перебора значений асинхронного потока используется конструкция foreach c добавлением ключевого слова await:

await foreach (T [переменная] in [асинхронный поток])
{
    …
}

, где T – это тип значений. Например:

await foreach (string item in source)
    Console.WriteLine(item);

Дополнительно можно:

  • задать токен отмены операции (cancellation token) при помощи вызова метода WithCancellation(…);
  • настроить работу с контекстом при помощи обращения к ConfigureAwait(…).

Метод WithCancellation(…) является унифицированным способом передачи токена отмены операции в асинхронный поток. Он позволяет передать заданное значение в IAsyncEnumerable<T>.GetAsyncEnumerator(…). При этом реализация IAsyncEnumerable<T> может быть создана любым способом, рассмотренным ниже, за исключением "утиной типизации" (duck typing).

Добавим вызовы рассматриваемых методов в пример, который был приведен выше:

await foreach (string item in source.WithCancellation(ct).ConfigureAwait(false))
    Console.WriteLine(item);

Способы реализации асинхронного потока

Асинхронный метод

Самый простой вариант реализации асинхронных потоков – создание асинхронного метода, использующего yield return для возврата значений. По сути, это аналог yield return в синхронном методе.

Метод, который реализует асинхронный поток, должен соблюдать следующие правила:

  • Объявлен с использованием модификатора async.
  • Возвращать IAsyncEnumerable<T>.
  • Содержать yield return для возврата очередного значения.

При компиляции, аналогично IEnumerable<T>, будет автоматически сгенерировал класс, реализующий IAsyncEnumerable<T>.  Здесь стоит отметить два важных момента.

  1. Автоматическая реализация IAsyncDisposable будет пустой. Поэтому, при работе с неуправляемыми ресурсами необходимо вовремя и корректно их освобождать.
  2. Очень желательно поддержать использование метода WithCancellation(…) для создаваемой реализации. Для этого в созданный метод нужно добавить параметр типа CancellationToken c значением по умолчанию default и атрибутом [EnumeratorCancellation]. При выполнении кода в него будет передано значение токена отмены операции из вызова WithCancellation(…).

Описанный вариант поддержки CancellationToken не является единственным или строго обязательным. Но его использование является хорошей практикой при работе с IAsyncEnumerable<T>, т.к. позволяет использовать метод WithCancellation(…) для передачи токена вне зависимости от способа реализации самого асинхронного потока. Более того, код может просто не знать, что за реализация скрывается за IAsyncEnumerable<T>.

Перейдем к примеру. Предположим есть метод GetResutsAsync(…) позволяющий получить данные от некоего сервиса, который представлен методом GetServiceDataAsync(…). Задача – асинхронно перебрать последовательность из 10 результатов.

public async IAsyncEnumerable<int> GetResultsAsync(CancellationToken ct = default)
{
    for (int i = 0; i < 10; i++) {
        int result = await GetServiceDataAsync(ct)
            .ConfigureAwait(false);

        yield return result;
    }
}

// Эмулируем асинхронное получение данных
private async Task<int> GetServiceDataAsync(CancellationToken ct)
{
    ct.ThrowIfCancellationRequested();

    await Task.Delay(100, ct).ConfigureAwait(false);
    return new Random().Next(100);
}

…

// Пример использования метода GetResultsAsync(…)
await foreach (int r in GetResultsAsync().WithCancellation(cancellationToken).ConfigureAwait(false))
    Console.WriteLine(r);

В реальном приложении, вместо GetServiceDataAsync(…) может быть метод, считывающий данные с диска, получающий их от некоего API и т.д.

Реализация интерфейса IAsyncEnumerable<T>

Другим походом создания асинхронного потока является непосредственная реализация IAsyncEnumerable<T>. Этот вариант удобен при работе с неуправляемыми ресурсами, т.к. позволяет явно реализовать IAsyncDisposable для их освобождения. При этом DisposeAsync(…) будет вызван как при успешном завершении перечисления, так и в при выбросе исключения (включая OperationCanceledException).

CancellationToken из метода WithCancellation(…) будет передан в вызов IAsyncEnumerable<T>.GetAsyncEnumerator(..).

Перепишем код из первого примера:

public class CustomAsyncDataProvider : IAsyncEnumerable<int>
{
    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken)
    {
        return new CustomAsyncDataEnumerator(cancellationToken);
    }

    private class CustomAsyncDataEnumerator : IAsyncEnumerator<int>
    {
        private readonly CancellationToken _ct;
        private readonly Random _rnd;
        private int _index;

        public int Current { get; private set; }

        public CustomAsyncDataEnumerator(CancellationToken ct)
        {
            this._rnd = new Random();
            this.Current = this._rnd.Next(100);
            this._ct = ct;
            this._index = 0;
        }

        public async ValueTask<bool> MoveNextAsync()
        {
            if (10 < this._index)
                return false;

            this._ct.ThrowIfCancellationRequested();

            // Эмулируем асинхронное получение значения
            await Task.Delay(200, this._ct).ConfigureAwait(false);
            this.Current = this._rnd.Next(100);

            this._index++;

            return true;
        }

        public ValueTask DisposeAsync()
        {
            Console.WriteLine("CustomAsyncDataEnumerator > DisposeAsync");
            return new ValueTask(Task.CompletedTask);
        }
    }
}

Поскольку в данном примере нет неуправляемых ресурсов, то DisposeAsync(…) просто выводит сообщение в консоль, чтобы было можно убедиться в обращении к данному методу.

Использование данного класса аналогично прядущему варианту:

var aStream = new CustomAsyncDataProvider();

await foreach (int value in aStream.WithCancellation(token).ConfigureAwait(false))
    Console.WriteLine($"Value: {value:D3}");

Утиная типизация (duck typing)

Асинхронный вариант foreach, как и синхронный, поддерживает "утиную типизацию" (duck typing). Это означает, что может работать с любым классом, у которого

  • есть реализованный GetAsyncEnumerator() без параметров и возвращающий произвольный тип, который в свою очередь содержит:
    • свойство Current;
    • метод ValueTask<bool> MoveNextAsync();
    • опциональный метод ValueTask DisposeAsync() для освобождения ресурсов при завершении перечисления или выбросе исключения.

От способов реализации интерфейса, рассмотренных выше, есть следующие принципиальные отличия:

  • Экземпляр такого класса не реализует IAsyncEnumerable<T>, а значит
    • не может быть передан в качестве параметра указанного типа в какой-либо метод.
    • не может использоваться совместно с WithCancellation(…) и ConfigureAwait(…).
  • GetAsyncEnumerator() должен быть без параметров.
  • Тип результата определяется типом свойства Current.

Еще раз перепишем пример, но теперь уже с использованием “утиной типизации”. Для передачи токена отмены будет использоваться конструктор класса.

public class DuckTypingAsyncDataProvider
{
    private readonly CancellationToken _ct;

    public DuckTypingAsyncDataProvider(CancellationToken ct)
    {
        this._ct = ct;
    }

    public CustomAsyncDataEnumerator GetAsyncEnumerator()
    {
        return new CustomAsyncDataEnumerator(this._ct);
    }
}

public class CustomAsyncDataEnumerator
{
    private readonly CancellationToken _ct;
    private readonly Random _rnd;
    private int _index;

    public int Current { get; private set; }

    public CustomAsyncDataEnumerator(CancellationToken ct)
    {
        this._rnd = new Random();
        this.Current = this._rnd.Next(100);
        this._ct = ct;
        this._index = 0;
    }

    public async ValueTask<bool> MoveNextAsync()
    {
        if (10 < this._index)
            return false;

        this._ct.ThrowIfCancellationRequested();

        // Эмулируем асинхронное получение значения
        await Task.Delay(200, this._ct).ConfigureAwait(false);
        this.Current = this._rnd.Next(100);

        this._index++;

        return true;
    }

    public ValueTask DisposeAsync()
    {
        Console.WriteLine("CustomAsyncDataEnumerator > DisposeAsync");
        return new ValueTask(Task.CompletedTask);
    }
}

Использование созданного класса в качестве асинхронного потока аналогично предыдущим вариантам, за исключением возможности использовать дополнительные методы:

var duckTypingAsyncStream = new DuckTypingAsyncDataProvider(token);

await foreach (int value in duckTypingAsyncStream)
    Console.WriteLine($"Value: {value:D3}");

Добавить комментарий