Andrey on .NET | Использование асинхронных потоков на практике

Использование асинхронных потоков на практике

C# logoОдна из новых возможностей C# 8 – асинхронные потоки. Рассмотрим на примере как её использование может улучшить уже существующий код.

Для примера возьмем следующую достаточно типовую задачу: необходимо считывать и заданным образом обрабатывать данные. Принципиальная деталь – хранилище данных возвращает записи постранично (как, например, это делают некоторые хранилища в Azure).

Хранилище данных

Чтобы не делать пример зависимым от какого-либо конкретного хранилища данных, создадим класс PagedDataSet, эмулирующий описанное выше поведение. Он будет возвращать до 5 страниц, каждая их которых будет содержать по 3 записи:

public class PagedDataSet
{
    private const int _totalPages = 5;
    private const int _pageSize = 3;

    private int _currentPage = 0;

    public PagedDataSet(string filter)
    {
    }

    public bool HasMoreResults => this._currentPage + 1 <= _totalPages;

    public async Task<IReadOnlyCollection<int>> GetNextPageAsync()
    {
        if (!this.HasData)
            return new int[0];

        IEnumerable<int> result = Enumerable.Range(this._currentPage * _pageSize + 1, _pageSize);
        await Task.Delay(100); // эмулируем асинхронную работу
        this._currentPage++;

        Console.WriteLine($"Загружена страница {this._currentPage}.");

        return result.ToArray();
    }
}

Параметр filter здесь добавлен в конструктор чтобы показать, что данные выбираются по определенному условию. Однако, для простоты примера, реализация фильтра и его значения в вызовах отсутствуют.

Будем считать, что данный код находиться в сторонней библиотеке (например, в SDK) и возможности его изменить нет.

Обычная реализация

Прежде всего стоит отметить, что постраничное чтение данных это деталь реализации самого хранилища данных. А значит, эту подробность лучше оставить в слое доступа к данным.  Поэтому начнем реализацию с класса доступа к данным (DAL):

public class ValuesDataAccess
{
    public async Task<IReadOnlyCollection<int>> GetAsync(string filter)
    {
        var dataSource = new PagedDataSet(filter);
        var result = new List<int>();

        while (dataSource.HasMoreResults)
            result.AddRange(await dataSource.GetNextPageAsync());

        return result;
    }
}

Здесь все просто – метод GetAsync(…) считывает из хранилища все данные, удовлетворяющие заданному фильтру, сохраняет их в виде списка и передает вызывающий стороне.

Теперь перейдем к обработчику данных:

public class DataProcessor
{
    public async Task ProcessAsync()
    {
        int counter = 0;
        int maxItems = 4;

        var dataAccess = new ValuesDataAccess();

        foreach (int x in await dataAccess.GetAsync(filter: "").ConfigureAwait(false)) {
            Console.WriteLine($"Обработка записи {x}.");
            counter++;

            if (maxItems <= counter) {
                Console.WriteLine("Завершение обработки...");
                break;
            }
        }
    }
}

Метод ProcessAnyc(…) эмулирует обработку полученных данных. При этом он остановит её, если число обработанных записей будет больше maxItems.

Если запустить пример, то в консоль будет выведен следующий результат:

Загружена страница 1.
Загружена страница 2.
Загружена страница 3.
Загружена страница 4.
Загружена страница 5.
Обработка записи 1.
Обработка записи 2.
Обработка записи 3.
Обработка записи 4.
Завершение обработки...

Как и ожидалось, данная реализация сначала загрузила все необходимые данные из хранилища (5 страниц). После чего начала обработку, завершим ее после 4 записи. Можно заметить, что достаточно бы было загрузить всего 2 страницы.

Разумеется, данные код можно оптимизировать, сделав и обработку данных также постраничной. Однако, как было отмечено выше, это приведет к тому, что реализация бизнес логики будет зависеть от деталей реализации конкретного хранилища данных.

Реализация с асинхронными потоками

Использование асинхронных потоков позволяет улучшить код, не раскрывая детали реализации хранилища. Начнем изменения с слоя данных. Вместо перепишем класс ValuesDataAccess так, чтобы метод GetAsync(…) вместо коллекции возвращал асинхронный поток:

public class ValuesDataAccess
{
    public async IAsyncEnumerable<int> GetAsync(string filter)
    {
        var dataSource = new PagedDataSet(filter);

        while (dataSource.HasMoreResults) {
            var pageData = await dataSource.GetNextPageAsync();

            foreach (int value in pageData)
                yield return value;
        }
    }
}

Теперь GetAsync(…) будет читать по одной странице с записями и отдавать данные с нее по одному значению. И только после того, как данные на текущей странице закончатся, будет прочитана следующая страница.

Так же необходимо внести минимальные изменения в класс DataProcessor для работы с асинхронным потоком:

public class DataProcessor
{
    public async Task ProcessAsync()
    {
        int counter = 0;
        int maxItems = 4;

        var dataAccess = new ValuesDataAccess();

        await foreach (int x in dataAccess.GetAsync(filter: "").ConfigureAwait(false)) {
            Console.WriteLine($"Обработка записи {x}.");
            counter++;

            if (maxItems <= counter) {
                Console.WriteLine("Завершение обработки...");
                break;
            }
        }
    }
}

Как можно заметить, изменилась только одна строка – foreach стал асинхронным.

Запустим новый пример:

Загружена страница 1.
Обработка записи 1.
Обработка записи 2.
Обработка записи 3.
Загружена страница 2.
Обработка записи 4.
Завершение обработки...

Теперь данные загружаются только по необходимости. Как следствие – меньше обращений к хранилищу и меньший расход памяти по сравнению с исходным примером. При этом, DataProcessor ничего по прежнему не знает про то, как устроено хранилище и что данные загружаются постранично.

Добавить комментарий