Создание наблюдателя асинхронных ресурсов в С# (ресурс очереди сервис-брокера)

Отчасти в качестве упражнения по изучению асинхронности я бы попытался создать класс ServiceBrokerWatcher. Идея почти такая же, как у FileSystemWatcher — наблюдайте за ресурсом и вызывайте событие, когда что-то происходит. Я надеялся сделать это с помощью async, а не фактически создавать поток, потому что природа зверя означает, что большую часть времени он просто ожидает оператора SQL waitfor (receive ...). Это казалось идеальным использованием асинхронности.

Я написал код, который «работает», когда я отправляю сообщение через брокера, класс замечает это и запускает соответствующее событие. Я думал, что это было супер аккуратно.

Но я подозреваю, что где-то в корне ошибся в своем понимании того, что происходит, потому что, когда я пытаюсь остановить наблюдателя, он ведет себя не так, как я ожидал.

Сначала краткий обзор компонентов, а затем собственно код:

У меня есть хранимая процедура, которая выдает waitfor (receive...) и возвращает набор результатов клиенту при получении сообщения.

Существует Dictionary<string, EventHandler>, который сопоставляет имена типов сообщений (в результирующем наборе) с соответствующим обработчиком событий. Для простоты у меня есть только один тип сообщения в примере.

Класс наблюдателя имеет асинхронный метод, который зацикливается «навсегда» (до тех пор, пока не будет запрошена отмена), который содержит выполнение процедуры и инициирование событий.

Так в чем проблема? Что ж, я попытался разместить свой класс в простом приложении winforms, и когда я нажимаю кнопку для вызова метода StopListening() (см. ниже), выполнение не отменяется сразу, как я думал. Строка listener?.Wait(10000) на самом деле будет ждать 10 секунд (или столько, сколько я установил тайм-аут). Если я посмотрю, что происходит с профилировщиком SQL, я увижу, что событие внимания отправляется «сразу же», но функция все равно не завершается.

Я добавил комментарии к коду, начинающиеся с "!" где я подозреваю, что я что-то неправильно понял.

Итак, главный вопрос: почему мой метод ListenAsync не "соблюдает" мой запрос на отмену?

Кроме того, правильно ли я думаю, что эта программа (большую часть времени) потребляет только один поток? Я сделал что-нибудь опасное?

Далее следует код, я попытался сократить его настолько, насколько мог:

// class members //////////////////////
private readonly SqlConnection sqlConnection;
private CancellationTokenSource cts;
private readonly CancellationToken ct;
private Task listener;
private readonly Dictionary<string, EventHandler> map;

public void StartListening()
{
    if (listener == null)
    {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        // !I suspect assigning the result of the method to a Task is wrong somehow...
        listener = ListenAsync(ct); 
    }
}

public void StopListening()
{
    try
    {
        cts.Cancel(); 
        listener?.Wait(10000); // !waits the whole 10 seconds for some reason
    } catch (Exception) { 
        // trap the exception sql will raise when execution is cancelled
    } finally
    {
        listener = null;
    }
}

private async Task ListenAsync(CancellationToken ct)
{
    using (SqlCommand cmd = new SqlCommand("events.dequeue_target", sqlConnection))
    using (CancellationTokenRegistration ctr = ct.Register(cmd.Cancel)) // !necessary?
    {
        cmd.CommandTimeout = 0;
        while (!ct.IsCancellationRequested)
        {
            var events = new List<string>();    
            using (var rdr = await cmd.ExecuteReaderAsync(ct))
            {
                while (rdr.Read())
                {
                    events.Add(rdr.GetString(rdr.GetOrdinal("message_type_name")));
                }
            }
            foreach (var handler in events.Join(map, e => e, m => m.Key, (e, m) => m.Value))
            {
                if (handler != null && !ct.IsCancellationRequested)
                {
                    handler(this, null);
                }
            }
        }
    }
}

person allmhuran    schedule 26.10.2018    source источник
comment
Приложение: если я запускаю профилировщик запросов, я вижу, что событие внимания попадает в SQL Server почти сразу же, как только я вызываю StopListening(), так что эта часть, кажется, работает, но listenerTask все еще не завершается. Я чувствую, что мог пропустить что-то совершенно очевидное здесь...   -  person allmhuran    schedule 26.10.2018
comment
Позвоните ct.ThrowIfCancellationRequested() вместо проверки ct.IsCancellationRequested в ListenAsync   -  person Collin Dauphinee    schedule 26.10.2018
comment
Спасибо, Коллин, но я, должно быть, делаю что-то неправильно. Я пытался добавить ct.ThrowIfCancellationRequested() в несколько разных мест функции (в начале и в конце цикла while), но поведение было таким же.   -  person allmhuran    schedule 26.10.2018


Ответы (2)


Вы не показываете, как вы привязали его к приложению WinForms, но если вы используете обычные методы void button1click, вы можете столкнуться с эта проблема.

Таким образом, ваш код будет нормально работать в консольном приложении (это происходит, когда я его пытаюсь), но блокируется при вызове через поток пользовательского интерфейса.

Я бы предложил изменить класс вашего контроллера, чтобы открыть методы запуска и остановки async и вызвать их, например:

    private async void btStart_Click(object sender, EventArgs e)
    {
        await controller.StartListeningAsync();
    }

    private async void btStop_Click(object sender, EventArgs e)
    {
        await controller.StopListeningAsync();
    }
person Peter Wishart    schedule 26.10.2018
comment
Спасибо, Питер. Да, я использовал простые обработчики пустых кнопок, которые создаются автоматически, когда вы просто дважды щелкаете элемент управления в области дизайна. Приложение уже реагировало на нажатие кнопки запуска. Я могу сделать этот вызов асинхронным, но он ведет себя так же — события в очереди запускают обработчик, который добавляет строку текста в текстовое поле. Программу еще можно таскать и так далее. Однако я не уверен, что бы я await использовал в методе StopListening(). Там нет никаких ожиданий, просто cts.Cancel(), а затем я даю задаче возможность выйти. - person allmhuran; 26.10.2018
comment
При дальнейшем изучении кажется, что только метод остановки должен быть асинхронным. Программа по-прежнему правильно работает с исходным определением StartListening(). Кроме того, в методе StopListening() мне не нужно добавлять .ConfigureAwait(false). Я могу просто сделать cts.Cancel(); await listener;. Но я до сих пор не совсем понимаю, что вызывает 10-секундное ожидание в исходном определении. Какие два процесса конкурируют за какие ресурсы в тупике? Если одним из них является сам метод ListenAsync(), почему он также не вызывает взаимоблокировку при вызове StartListening()? - person allmhuran; 26.10.2018
comment
Насколько мне известно, тупик возникает из-за того, что поток пользовательского интерфейса блокирует ожидание завершения задачи, задаче требуется асинхронный метод для запуска продолжения, прежде чем она сможет завершиться, и задача захватила контекст потока пользовательского интерфейса для планирования завершения. Чего не может быть, потому что поток пользовательского интерфейса заблокирован. - person Peter Wishart; 26.10.2018
comment
StartListening() не затрагивается, потому что на самом деле ничего не ожидает, так что да, это может быть просто обычный метод void. Или он может вернуть Task.CompletedTask и быть ожидаемым, не являясь на самом деле async, просто для симметрии. - person Peter Wishart; 26.10.2018

У Питера был правильный ответ. Несколько минут я был в замешательстве по поводу того, что зашло в тупик, но потом меня ударили по лбу. Это продолжение ListenAsync после отмены ExecuteReaderAsync, поскольку это просто задача, а не отдельный поток. Ведь в этом была вся суть!

Затем я подумал... Хорошо, а что, если я скажу асинхронной части ListenAsync(), что ей не нужен поток пользовательского интерфейса. Я позвоню ExecuteReaderAsync(ct) с .ConfigureAwait(false)! Ага! Теперь методы класса больше не должны быть асинхронными, потому что в StopListening() я могу просто listener.Wait(10000), ожидание продолжит выполнение задачи внутри другого потока, а потребитель ни о чем не догадывается. О, мальчик, такой умный.

Но нет, я не могу этого сделать. По крайней мере, не в приложении веб-форм. Если я это сделаю, текстовое поле не обновится. И причина этого кажется достаточно ясной: внутренности ListenAsync вызывают обработчик событий, и этот обработчик событий является функцией, которая хочет обновить текст в текстовом поле, что, несомненно, должно происходить в потоке пользовательского интерфейса. Таким образом, он не блокируется, но также не может обновлять пользовательский интерфейс. Если я устанавливаю точку останова в обработчике, который хочет обновить пользовательский интерфейс, строка кода срабатывает, но пользовательский интерфейс нельзя изменить.

Так что, в конце концов, кажется, что единственное решение в этом случае — это действительно «полностью асинхронизировать». Или в этом случае вверх!

Я надеялся, что мне не придется этого делать. Тот факт, что внутренности моего Watcher используют асинхронные методологии, а не просто порождают поток, является, на мой взгляд, «деталью реализации», о которой вызывающая сторона не должна заботиться. Но FileSystemWatcher имеет точно такую ​​же проблему (необходимость control.Invoke, если вы хотите обновить графический интерфейс на основе события наблюдателя), так что это не так уж плохо. Если бы я был потребителем, которому нужно было выбирать между использованием асинхронности и Invoke, я бы выбрал асинхронность!

person allmhuran    schedule 26.10.2018