NamedPipeServerStream.ReadAsync() не завершается при отмене запросов CancellationToken

Когда поток NamedPipeServer считывает какие-либо данные из канала, он не реагирует на CancellationTokenSource.Cancel()

Почему это?

Как я могу ограничить время ожидания на сервере данных от клиента?

Код для воспроизведения:

static void Main(string[] args)
{
    Server();
    Clinet();
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();
}

private static async Task Server()
{
    using (var cancellationTokenSource = new CancellationTokenSource(1000))
    using (var server = new NamedPipeServerStream("test",
        PipeDirection.InOut,
        1,
        PipeTransmissionMode.Byte,
        PipeOptions.Asynchronous))
    {
        var cancellationToken = cancellationTokenSource.Token;
        await server.WaitForConnectionAsync(cancellationToken);
        await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
        var buffer = new byte[4];
        await server.ReadAsync(buffer, 0, 4, cancellationToken);
        Console.WriteLine("exit server");
    }
}

private static async Task Clinet()
{
    using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
    {
        var buffer = new byte[4];
        client.Connect();
        client.Read(buffer, 0, 4);
        await Task.Delay(5000);
        await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
        Console.WriteLine("client exit");
    }
}

Ожидаемый результат:

exit server
<client throws exception cuz server closed pipe>

Фактический результат:

client exit
exit server

ИЗМЕНИТЬ

Ответ с CancelIo кажется многообещающим, и он позволяет серверу завершить связь при отмене токена отмены. Однако я не понимаю, почему мой «базовый сценарий» перестал работать при использовании ReadPipeAsync.

Вот код, он включает в себя 2 клиентские функции:

  1. Clinet_ShouldWorkFine - хороший клиент, который вовремя читает/пишет
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - клиент слишком медленный, сервер должен прекратить связь

Ожидал:

  1. Clinet_ShouldWorkFine - выполнение завершается без каких-либо исключений
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - сервер закрывает канал, клиент выдает исключение

Действительный:

  1. Clinet_ShouldWorkFine - сервер останавливается при первом вызове ReadPipeAsync, канал закрывается через 1 с, клиент выдает исключение
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - сервер закрывает канал, клиент выдает исключение

Почему Clinet_ShouldWorkFine не работает, когда сервер использует ReadPipeAsync

class Program
{
    static void Main(string[] args) {
        // in this case server should close the pipe cuz client is too slow
        try {
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => {
                Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
            });
            tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
            Task.WhenAll(tasks).Wait();
        }
        catch (Exception ex) {
            Console.WriteLine(ex);
        }

        // in this case server should exchange data with client fine
        try {
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => {
                Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
            });
            tasks[2] = Clinet_ShouldWorkFine();
            Task.WhenAll(tasks).Wait();
        }
        catch (Exception ex) {
            Console.WriteLine(ex);
        }

        Console.WriteLine("press [enter] to exit");
        Console.ReadLine();
    }

    private static async Task Server()
    {
        using (var cancellationTokenSource = new CancellationTokenSource(1000))
        using (var server = new NamedPipeServerStream("test",
            PipeDirection.InOut,
            1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous))
        {
            var cancellationToken = cancellationTokenSource.Token;
            await server.WaitForConnectionAsync(cancellationToken);
            await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
            await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
            var buffer = new byte[4];
            var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            Console.WriteLine("exit server");
        }
    }

    private static async Task Clinet_ShouldWorkFine()
    {
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            Console.WriteLine("client exit");
        }
    }

    private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
    {
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await Task.Delay(5000);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            Console.WriteLine("client exit");
        }
    }
}

public static class AsyncPipeFixer {

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => {
            try { return pipe.EndRead(async); }
            finally { registration.Dispose(); }
        }, cancellationToken);
    }

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            CancelIo(pipe.SafePipeHandle);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);

}

person inwenis    schedule 03.10.2018    source источник
comment
@MrinalKamboj Я использую new CancellationTokenSource(1000), который вызывает .Cancel() по прошествии указанного времени - в данном случае через 1000 мс.   -  person inwenis    schedule 06.10.2018
comment
@MrinalKamboj, где я должен добавить Task.Delay(1000)? Извините, я не понимаю. Примечание: приведенный выше код точно демонстрирует, где я столкнулся с этой проблемой. Я запускаю скрипт Python из C# и общаюсь с ним через канал. Я не могу просто добавить задержки здесь или там, так как знаю, что C# застревает точно на ReadAsync().   -  person inwenis    schedule 06.10.2018


Ответы (4)


У .NET-программистов возникают ужасные проблемы с async/await, когда они пишут такие маленькие тестовые программы. Он плохо сочиняет, это черепахи на всем протяжении. В этой программе отсутствует финальная черепаха, задачи зашли в тупик. Никто не заботится о выполнении продолжений задач, как это обычно происходит (скажем) в приложении с графическим интерфейсом. Чрезвычайно сложно отлаживать.

Сначала внесите небольшое изменение, чтобы тупик был полностью виден:

   int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

Это устраняет неприятный маленький краеугольный случай, метод сервера, который полностью приводит к сообщению «Сервер вышел». Хроническая проблема с классом Task заключается в том, что когда задача завершается или ожидаемый метод завершается синхронно, он пытается запустить продолжение напрямую. Это работает в этой программе. Заставив его получить асинхронный результат, тупик теперь очевиден.


Следующим шагом будет исправление функции Main(), чтобы эти задачи больше не блокировались. Это может выглядеть так:

static void Main(string[] args) {
    try {
        var tasks = new Task[3];
        tasks[0] = Server();
        tasks[1] = tasks[0].ContinueWith(c => {
            Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
        });
        tasks[2] = Clinet();
        Task.WhenAll(tasks).Wait();
    }
    catch (Exception ex) {
        Console.WriteLine(ex);
    }
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();
}

Теперь у нас есть шанс продвинуться вперед и решить проблему отмены. Класс NamedPipeServerStream не реализует сам ReadAsync, он наследует метод от одного из своих базовых классов, Stream. У него есть маленькая крысиная деталь, которая полностью задокументирована. Вы можете увидеть это, только если посмотрите на исходный код фреймворка. Он может обнаружить отмену только тогда, когда отмена произошла до вызова ReadAsync(). Как только чтение началось, оно больше не может видеть отмену. Конечная проблема, которую вы пытаетесь решить.

Это поправимая проблема, но у меня есть смутное представление, почему Microsoft не сделала этого для PipeStreams. Обычный способ принудительного завершения метода BeginRead() — это метод Dispose() объекта, а также единственный способ, которым можно прервать Stream.ReadAsync(). Но есть и другой способ: в Windows можно прервать операцию ввода-вывода с помощью Отмена(). Давайте сделаем это методом расширения:

using System;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.IO.Pipes;
using Microsoft.Win32.SafeHandles;

public static class AsyncPipeFixer {

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => {
            try { return pipe.EndRead(async); }
            finally { registration.Dispose(); }
        }, cancellationToken);
    }

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            CancelIo(pipe.SafePipeHandle);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);

}

И, наконец, настройте сервер, чтобы использовать его:

    int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

Имейте в виду, что этот обходной путь специфичен для Windows, поэтому он не может работать в программе .NETCore, предназначенной для разновидности Unix. Затем рассмотрим более тяжелый молот, вызовите pipe.Close() в методе CancelPipeIo().

person Community    schedule 06.10.2018
comment
Это действительно интересно. Теперь я понимаю, почему сервер был deaf для отмены сообщений. Однако я не знаю, почему использование ReadPipeAsync не работает в моем Happy path. В мой исходный пост добавлен код для демонстрации проблемы с хорошим клиентом. - person inwenis; 07.10.2018
comment
Пожалуйста, будьте более ясны в отношении того, что не работает и что означает счастливый путь. Как написано, клиент не должен работать, вы должны получить сообщение об исключении Pipe is disabled. - person Hans Passant; 08.10.2018
comment
Я добавил правку в исходный пост. Включает в себя 2 клиента. 1. Clinet_ServerShouldEndCommunication_CuzClientIsSlow() представляет собой медленного клиента, у него есть Task.Delay(5000), в этом случае сервер должен прекратить связь, потому что клиент слишком медленный. 2. Clinet_ShouldWorkFine() представляет собой HappyPath т.е. хорошо работающий клиент, сервер должен иметь возможность обмениваться данными с Clinet_ShouldWorkFine() без каких-либо ошибок/исключений. - person inwenis; 08.10.2018
comment
Здесь должно быть AsyncCallback вместо null? Почти все подобные коды именованных каналов, которые я вижу, имеют обратный вызов. Просто интересно, почему это опущено здесь? - person Michael Parker; 08.03.2019
comment
Отличное предложение Ганс. Я использовал CancelIo в своих неуправляемых приложениях, не знаю, почему мне не пришло в голову использовать его в моем управляемом приложении. - person WBuck; 13.09.2019
comment
Я попробовал это и, как и в пересмотренном вопросе ОП выше, обнаружил, что это не работает для меня. Что работает, так это заменить new Task<int> в ReadPipeAsync на Task.Run. Но я не претендую на то, что знаю об этом достаточно, чтобы понять, какой еще хаос я могу вызвать! - person Trygve; 06.02.2020

ReadAsync Сначала проверьте отмену, затем начните чтение, если токен отменен, он не действует.

добавить следующую строку

отменаToken.Register(server.Disconnect);

using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
    PipeDirection.InOut,
    1,
    PipeTransmissionMode.Byte,
    PipeOptions.Asynchronous))
{
    var cancellationToken = cancellationTokenSource.Token;
    cancellationToken.Register(server.Disconnect);
    await server.WaitForConnectionAsync(cancellationToken);
    await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
    var buffer = new byte[4];
    await server.ReadAsync(buffer, 0, 4, cancellationToken);
    Console.WriteLine("exit server");
}
person Milad    schedule 06.10.2018
comment
Как Sever.Disconnect помогает здесь при вызове отмены. ОП ищет Исключение, которого не происходит - person Mrinal Kamboj; 06.10.2018
comment
если канал сервера закрыт, метод Clinet() выдаст исключение. как и ожидаемый результат. - person Milad; 06.10.2018
comment
Подлинное исключение должно исходить от сервера, клиент не должен создавать исключение на основе простого события, такого как Disconnect. - person Mrinal Kamboj; 06.10.2018
comment
@Милад, спасибо за это. На самом деле в настоящее время я использую cancellationToken.Register(server.Disconnect); в качестве обходного пути, и он действительно обрабатывает первоначальный случай, который я опубликовал правильно. Однако это не работает, когда клиент не читает данные из канала. В этом случае сервер ждет WriteAsync, и когда происходит server.Disconnect, WriteAsync выдает PipeClosedException или подобное. - person inwenis; 07.10.2018

Я просто смотрю на ваш код и, возможно, смотрю на него свежим взглядом...

Насколько я могу судить, как в вашем исходном, так и в более сложных сценариях... вы передаете уже отмененный токен отмены, что довольно непредсказуемо, как другие реализуют (если таковые имеются) исключения, выбрасываемые в методах...

Используйте свойство IsCancellationRequested, чтобы проверить, отменен ли токен, и не передавать отмененные токены.

Вот пример добавления этого в ваш код из исходного вопроса (вы можете сделать то же самое для своего более позднего метода ReadPipeAsync.

var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);

if(!cancellationToken.IsCancellationRequested)
{
    await server.WriteAsync(new byte[] { 1, 2, 3, 4 }, 0, 4, cancellationToken);
}

if(!cancellationToken.IsCancellationRequested)
{
    var buffer = new byte[4];
    await server.ReadAsync(buffer, 0, 4, cancellationToken);
}

Console.WriteLine("exit server");

приведенный выше код приведет к

exit server
client exit

который, я думаю, был и вашим очень оригинальным вопросом...

person Svek    schedule 12.10.2018

Ответ Ханса Пассана идеален... почти. Единственная проблема заключается в том, что CancelIo() отменяет запрос, сделанный из того же потока. Это не сработает, если задача будет возобновлена ​​в другом потоке. К сожалению, у меня недостаточно очков репутации, чтобы напрямую прокомментировать его ответ, поэтому я отвечаю отдельно.

Поэтому последнюю часть его примера кода следует переписать следующим образом:

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            CancelIoEx(pipe.SafePipeHandle);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll")]
    private static extern bool CancelIoEx(SafePipeHandle handle, IntPtr _ = default);

Обратите внимание, что CancelIoEx() доступен в Vista/Server 2008 и более поздних версиях, а CancelIo() также доступен в Windows XP.

person Vadim Zhukov    schedule 09.06.2021