Итерация IAsyncEnumerable в функции, возвращающей IAsyncEnumerable с отменой

Как сказано в названии, я должен выполнять следующие функции:

public async IAsyncEnumerable<Job> GetByPipeline(int pipelineId,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await foreach (var job in context.Jobs.Where(job => job.Pipeline.Id == pipelineId)
        .AsAsyncEnumerable()
        .WithCancellation(cancellationToken)
        .ConfigureAwait(false))
    {
        yield return job;
    }
}

У меня проблемы с осознанием того, куда идет жетон отмены, и неприятное ощущение, что я использую его в слишком многих местах.

Что на самом деле происходит здесь, когда вы разбираете все причудливые асинхронные штуки? И есть ли лучшие способы написать эту функцию?


person Rick de Water    schedule 07.11.2019    source источник
comment
AsAsyncEnumerable() уже возвращает IAsyncEnumerable<Job>. Вам не нужен остальной код, просто верните его, т.е. return context.Jobs.Where(job => job.Pipeline.Id == pipelineId) .AsAsyncEnumerable()   -  person Panagiotis Kanavos    schedule 08.11.2019
comment
@PanagiotisKanavos Однако он не поддерживает CancellationToken, а результат WithCancellation не может быть преобразован в IAsyncEnumerable   -  person Rick de Water    schedule 09.11.2019
comment
Эти вызовы имеют смысл только на сайте вызывающего абонента. Вам придется добавить их снова, когда вы попытаетесь перебрать IAsyncEnumerable   -  person Panagiotis Kanavos    schedule 11.11.2019
comment
Кроме того, в данном случае ни то, ни другое не имеет особого эффекта. Вы не можете отменить запрос, даже если отмените итерацию, в то время как ConfigureAwait(false) передает решение о том, где возобновить выполнение, вызывающей стороне.   -  person Panagiotis Kanavos    schedule 11.11.2019


Ответы (2)


Для начала этот метод можно свести к:

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
{
    return context.Jobs
                  .Where(job => job.Pipeline.Id == pipelineId)
                  .AsAsyncEnumerable();
}

или даже

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
    => context.Jobs
              .Where(job => job.Pipeline.Id == pipelineId)
              .AsAsyncEnumerable();

Этот метод ничего не делает с job, поэтому повторять его не нужно.

Отмена

Что, если метод действительно использовал job, где следует использовать токен отмены?

Давайте немного очистим метод. Эквивалент:

public async IAsyncEnumerable<Job> GetByPipeline(
      int pipelineId, 
      [EnumeratorCancellation] CancellationToken ct = default)
{
    //Just a query, doesn't execute anything
    var query =context.Jobs.Where(job => job.Pipeline.Id == pipelineId);

    //Executes the query and returns the *results* as soon as they arrive in an async stream
    var jobStream=query.AsAsyncEnumerable();

    //Process the results from the async stream as they arrive
    await foreach (var job in jobStream.WithCancellation(ct).ConfigureAwait(false))
    {
        //Does *that* need cancelling?
        DoSometingExpensive(job);
    }
}

IQueryable query ничего не запускает, он представляет запрос. Не требует отмены.

AsAsyncEnumerable(), AsEnumerable(), ToList() и т. Д. выполнить запрос и вернуть результат. ToList() и т. Д. Потребляют все результаты, в то время как As...Enumerable() методы дают результаты только по запросу. Запрос не может быть отменен, методы As_Enumerable() ничего не вернут, пока не будут запрошены, поэтому они не нуждаются в отмене.

await foreach будет перебирать весь асинхронный поток, поэтому, если мы хотим иметь возможность его прервать, нам действительно необходимо передать токен отмены.

Наконец, нужна ли DoSometingExpensive(job); отмена? Неужели это так дорого, что мы хотим избавиться от него, если это займет слишком много времени? Или мы можем дождаться завершения, прежде чем выйти из цикла? Если требуется отмена, ему также понадобится CancellationToken.

ConfigureAwait

Наконец, ConfigureAwait(false) не участвует в отмене и может вообще не понадобиться. Без него после каждого await выполнения выполняется возврат к исходному контексту синхронизации. В настольном приложении это означало поток пользовательского интерфейса. Это то, что позволяет нам изменять пользовательский интерфейс в обработчике асинхронных событий.

Если GetByPipeline работает в настольном приложении и хочет изменить пользовательский интерфейс, ему придется удалить ConfugureAwait:

await foreach (var job in jobStream.WithCancellation(ct))
{
        //Update the UI
        toolStripProgressBar.Increment(1);
        toolStripStatusLabel.Text=job.Name;
        //Do the actual job
        DoSometingExpensive(job);
}

С ConfigureAwait(false) выполнение продолжается в потоке пула потоков, и мы не можем прикоснуться к пользовательскому интерфейсу.

Код библиотеки не должен влиять на возобновление выполнения, поэтому большинство библиотек используют ConfigureAwait(false) и оставляют окончательное решение разработчику пользовательского интерфейса.

Если GetByPipeline - библиотечный метод, используйте ConfigureAwait(false).

person Panagiotis Kanavos    schedule 08.11.2019

Представьте, что где-то глубоко внутри Entity Framework находится метод GetJobs, который извлекает Job объекты из базы данных:

private static async IAsyncEnumerable<Job> GetJobs(DbDataReader dataReader,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    while (await dataReader.ReadAsync(cancellationToken))
    {
        yield return new Job()
        {
            Id = (int)dataReader["Id"],
            Data = (byte[])dataReader["Data"]
        };
    }
}

Теперь представьте, что свойство Data содержит огромный массив байтов с данными, соответствующими Job. Получение массива каждого Job может занять некоторое нетривиальное время. В этом случае разрыва цикла между итерациями будет недостаточно, потому что будет заметная задержка между вызовом метода Cancel и повышением OperationCanceledException. Вот почему метод DbDataReader.ReadAsync требуется CancellationToken, чтобы запрос можно было немедленно отменить.

Теперь проблема состоит в том, как передать CancellationToken, переданный клиентским кодом, методу GetJobs, когда свойство, подобное context.Jobs, уже используется. Решением является расширение WithCancellation. , который хранит токен и передает его глубже, методу, принимающему аргумент, украшенный атрибутом _ 15_.

Значит в вашем случае вы все сделали правильно. Вы включили аргумент cancellationToken в метод возврата IAsyncEnumerable, что является рекомендуемой практикой. Таким образом, последующие WithCancellation связанные с вашим GetByPipeline методом не будут потрачены впустую. Затем вы добавили WithCancellation после AsAsyncEnumerable внутри вашего метода, что тоже правильно. В противном случае CancellationToken не достигнет своего конечного пункта назначения, метода GetJobs.

person Theodor Zoulias    schedule 08.11.2019