Одна из новых возможностей C# 8 – асинхронные потоки. Рассмотрим на примере как её использование может улучшить уже существующий код. |
Для примера возьмем следующую достаточно типовую задачу: необходимо считывать и заданным образом обрабатывать данные. Принципиальная деталь – хранилище данных возвращает записи постранично (как, например, это делают некоторые хранилища в Azure).
Хранилище данных
Чтобы не делать пример зависимым от какого-либо конкретного хранилища данных, создадим класс PagedDataSet, эмулирующий описанное выше поведение. Он будет возвращать до 5 страниц, каждая их которых будет содержать по 3 записи:
public class PagedDataSet
{
private const int _totalPages = 5;
private const int _pageSize = 3;
private int _currentPage = 0;
public PagedDataSet(string filter)
{
}
public bool HasMoreResults => this._currentPage + 1 <= _totalPages;
public async Task<IReadOnlyCollection<int>> GetNextPageAsync()
{
if (!this.HasData)
return new int[0];
IEnumerable<int> result = Enumerable.Range(this._currentPage * _pageSize + 1, _pageSize);
await Task.Delay(100); // эмулируем асинхронную работу
this._currentPage++;
Console.WriteLine($"Загружена страница {this._currentPage}.");
return result.ToArray();
}
}
Параметр filter здесь добавлен в конструктор чтобы показать, что данные выбираются по определенному условию. Однако, для простоты примера, реализация фильтра и его значения в вызовах отсутствуют.
Будем считать, что данный код находиться в сторонней библиотеке (например, в SDK) и возможности его изменить нет.
Обычная реализация
Прежде всего стоит отметить, что постраничное чтение данных это деталь реализации самого хранилища данных. А значит, эту подробность лучше оставить в слое доступа к данным. Поэтому начнем реализацию с класса доступа к данным (DAL):
public class ValuesDataAccess
{
public async Task<IReadOnlyCollection<int>> GetAsync(string filter)
{
var dataSource = new PagedDataSet(filter);
var result = new List<int>();
while (dataSource.HasMoreResults)
result.AddRange(await dataSource.GetNextPageAsync());
return result;
}
}
Здесь все просто – метод GetAsync(…) считывает из хранилища все данные, удовлетворяющие заданному фильтру, сохраняет их в виде списка и передает вызывающий стороне.
Теперь перейдем к обработчику данных:
public class DataProcessor
{
public async Task ProcessAsync()
{
int counter = 0;
int maxItems = 4;
var dataAccess = new ValuesDataAccess();
foreach (int x in await dataAccess.GetAsync(filter: "").ConfigureAwait(false)) {
Console.WriteLine($"Обработка записи {x}.");
counter++;
if (maxItems <= counter) {
Console.WriteLine("Завершение обработки...");
break;
}
}
}
}
Метод ProcessAnyc(…) эмулирует обработку полученных данных. При этом он остановит её, если число обработанных записей будет больше maxItems.
Если запустить пример, то в консоль будет выведен следующий результат:
Загружена страница 1.
Загружена страница 2.
Загружена страница 3.
Загружена страница 4.
Загружена страница 5.
Обработка записи 1.
Обработка записи 2.
Обработка записи 3.
Обработка записи 4.
Завершение обработки...
Как и ожидалось, данная реализация сначала загрузила все необходимые данные из хранилища (5 страниц). После чего начала обработку, завершим ее после 4 записи. Можно заметить, что достаточно бы было загрузить всего 2 страницы.
Разумеется, данные код можно оптимизировать, сделав и обработку данных также постраничной. Однако, как было отмечено выше, это приведет к тому, что реализация бизнес логики будет зависеть от деталей реализации конкретного хранилища данных.
Реализация с асинхронными потоками
Использование асинхронных потоков позволяет улучшить код, не раскрывая детали реализации хранилища. Начнем изменения с слоя данных. Вместо перепишем класс ValuesDataAccess так, чтобы метод GetAsync(…) вместо коллекции возвращал асинхронный поток:
public class ValuesDataAccess
{
public async IAsyncEnumerable<int> GetAsync(string filter)
{
var dataSource = new PagedDataSet(filter);
while (dataSource.HasMoreResults) {
var pageData = await dataSource.GetNextPageAsync();
foreach (int value in pageData)
yield return value;
}
}
}
Теперь GetAsync(…) будет читать по одной странице с записями и отдавать данные с нее по одному значению. И только после того, как данные на текущей странице закончатся, будет прочитана следующая страница.
Так же необходимо внести минимальные изменения в класс DataProcessor для работы с асинхронным потоком:
public class DataProcessor
{
public async Task ProcessAsync()
{
int counter = 0;
int maxItems = 4;
var dataAccess = new ValuesDataAccess();
await foreach (int x in dataAccess.GetAsync(filter: "").ConfigureAwait(false)) {
Console.WriteLine($"Обработка записи {x}.");
counter++;
if (maxItems <= counter) {
Console.WriteLine("Завершение обработки...");
break;
}
}
}
}
Как можно заметить, изменилась только одна строка – foreach стал асинхронным.
Запустим новый пример:
Загружена страница 1.
Обработка записи 1.
Обработка записи 2.
Обработка записи 3.
Загружена страница 2.
Обработка записи 4.
Завершение обработки...
Теперь данные загружаются только по необходимости. Как следствие – меньше обращений к хранилищу и меньший расход памяти по сравнению с исходным примером. При этом, DataProcessor ничего по прежнему не знает про то, как устроено хранилище и что данные загружаются постранично.