Что такое асинхронный/ожидающий эквивалент сервера ThreadPool?

Я работаю над tcp-сервером, который выглядит примерно так, используя синхронный API и пул потоков:

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}

Предполагая, что моя цель состоит в том, чтобы обрабатывать как можно больше одновременных подключений с наименьшим использованием ресурсов, кажется, что она будет быстро ограничена количеством доступных потоков. Я подозреваю, что с помощью API неблокирующих задач я смогу справиться с гораздо большим количеством задач с меньшими ресурсами.

Мое первое впечатление примерно такое:

async Task Serve(){
  while(true){
    var client = await listener.AcceptTcpClientAsync();
    HandleConnectionAsync(client); //fire and forget?
  }
}

Но мне кажется, что это может вызвать узкие места. Возможно, HandleConnectionAsync займет необычно много времени, чтобы выполнить первое ожидание, и остановит выполнение основного цикла принятия. Будет ли это использовать только один поток, или среда выполнения будет волшебным образом запускать вещи в нескольких потоках по своему усмотрению?

Есть ли способ объединить эти два подхода, чтобы мой сервер использовал ровно столько потоков, сколько ему нужно для количества активно работающих задач, но чтобы он не блокировал потоки без необходимости при операциях ввода-вывода?

Есть ли идиоматический способ максимизировать пропускную способность в такой ситуации?


person captncraig    schedule 09.01.2014    source источник


Ответы (5)


Я бы позволил Framework управлять потоками и не создавал бы никаких дополнительных потоков, если только тесты профилирования не предполагают, что мне это может понадобиться. Особенно, если вызовы внутри HandleConnectionAsync в основном связаны с вводом-выводом.

В любом случае, если вы хотите освободить вызывающий поток (диспетчер) в начале HandleConnectionAsync, есть очень простое решение. Вы можете перейти к новому потоку из ThreadPool с помощью await Yield(). Это работает, если ваш сервер работает в среде выполнения, в которой не установлен контекст синхронизации в исходном потоке (консольное приложение, служба WCF). , что обычно имеет место для TCP-сервера.

Это иллюстрирует следующий пример (исходный код взят из здесь). Обратите внимание, что основной цикл while не создает никаких потоков явно:

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Program
{
    object _lock = new Object(); // sync lock 
    List<Task> _connections = new List<Task>(); // pending connections

    // The core server task
    private async Task StartListener()
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                await task;
        }
    }

    // Register and handle the connection
    private async Task StartHandleConnectionAsync(TcpClient tcpClient)
    {
        // start the new connection task
        var connectionTask = HandleConnectionAsync(tcpClient);

        // add it to the list of pending task 
        lock (_lock)
            _connections.Add(connectionTask);

        // catch all errors of HandleConnectionAsync
        try
        {
            await connectionTask;
            // we may be on another thread after "await"
        }
        catch (Exception ex)
        {
            // log the error
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // remove pending task
            lock (_lock)
                _connections.Remove(connectionTask);
        }
    }

    // Handle new connection
    private async Task HandleConnectionAsync(TcpClient tcpClient)
    {
        await Task.Yield();
        // continue asynchronously on another threads

        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    }

    // The entry point of the console app
    static async Task Main(string[] args)
    {
        Console.WriteLine("Hit Ctrl-C to exit.");
        await new Program().StartListener();
    }
}

В качестве альтернативы код может выглядеть так, как показано ниже, без await Task.Yield(). Обратите внимание: я передаю лямбда async в Task.Run, потому что я все еще хочу использовать асинхронные API внутри HandleConnectionAsync и использовать там await:

// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
    return Task.Run(async () =>
    {
        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    });
}

Обновлено на основе комментария: если это будет библиотечный код, среда выполнения действительно неизвестна и может иметь нестандартный контекст синхронизации. В этом случае я бы предпочел запустить основной цикл сервера в потоке пула (который свободен от любого контекста синхронизации):

private static Task StartListener()
{
    return Task.Run(async () => 
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            if (task.IsFaulted)
                await task;
        }
    });
}

Таким образом, все дочерние задачи, созданные внутри StartListener, не будут затронуты контекстом синхронизации клиентского кода. Таким образом, мне не пришлось бы нигде явно вызывать Task.ConfigureAwait(false).

Обновлено в 2020 году, кто-то только что задал хороший вопрос за пределами сайта:

Мне было интересно, в чем причина использования блокировки здесь? Это не обязательно для обработки исключений. Я понимаю, что блокировка используется, потому что List не является потокобезопасным, поэтому реальный вопрос заключается в том, зачем добавлять задачи в список (и нести затраты на блокировку под нагрузкой).

Так как Task.Run отлично отслеживает запущенные задачи, я думаю, что в этом конкретном примере блокировка бесполезна, однако вы помещаете ее туда, потому что в реальной программе наличие задач в списке позволяет нам для например, повторять текущие задачи и завершать задачи без ошибок, если программа получает сигнал завершения от операционной системы.

Действительно, в реальном сценарии мы почти всегда хотим отслеживать задачи, которые мы начинаем с Task.Run (или любых других объектов Task, которые находятся «в полете»), по нескольким причинам:

  • Для отслеживания исключений задач, которые в противном случае могут быть проглочены без уведомления, если они останутся незамеченными в другом месте.
  • Чтобы иметь возможность асинхронно ожидать завершения всех ожидающих задач (например, рассмотрите возможность использования кнопки запуска/остановки пользовательского интерфейса или обработки запроса на запуск/остановку внутри безголовой службы Windows).
  • Чтобы иметь возможность контролировать (и регулировать/ограничивать) количество задач, которые мы позволяем выполнять одновременно.

Существуют лучшие механизмы для обработки реальных параллельных рабочих процессов (например, библиотека потоков данных TPL), но я специально включил сюда список задач и блокировку, даже в этом простом примере. Может показаться заманчивым использовать подход «выстрелил и забыл», но это почти никогда не бывает хорошей идеей. По моему собственному опыту, когда я действительно хотел запустить и забыть, я использовал для этого методы async void (проверьте это) .

person noseratio    schedule 09.01.2014
comment
Очень интересно. Если я не могу гарантировать среду, в которой он будет работать (это в библиотеке), похоже, что документация для Task.Yield говорит, что не следует рассчитывать на правильное переключение SynchronizatonContext обратно. Task.Run может быть моим самым безопасным вариантом. - person captncraig; 10.01.2014
comment
@captncraig, в документации подразумевается, что его нельзя использовать для длительной обработки в потоке пользовательского интерфейса (например, в тесном цикле с await Task.Yeild()). Это потому, что син. контекст потока пользовательского интерфейса использует PostMessage для этого глубоко внутри, что может взять на себя другие сообщения ввода пользователя, такие как мышь и клавиатура, и заблокировать пользовательский интерфейс. Все это неприменимо к контекстно-свободной среде, где Task.Yield() просто использует ThreadPool.QueueUserWorkItem, являясь удобным ожидаемым ярлыком. Дополнительная информация: stackoverflow.com/q/20319769/1768303 - person noseratio; 10.01.2014
comment
@Noseratio Не могли бы вы расширить использование Task.Run внутри вашего кода? как я понимаю, это в первую очередь операция ввода-вывода, ожидающая получения данных TCPClient. Почему должен запускаться новый поток ThreadPool для цикла while (true)? - person Yuval Itzchakov; 06.06.2014
comment
@YuvalItzchakov, вас беспокоит второй или третий фрагмент кода? - person noseratio; 06.06.2014
comment
Третий - StartListener - person Yuval Itzchakov; 06.06.2014
comment
@YuvalItzchakov, это нужно только для того, чтобы избавиться от любого контекста синхронизации, который клиентский код может иметь в вызывающем потоке. Это одноразовая вещь, и ее можно заменить на await tcpListener.AcceptTcpClientAsync().ConfigureAwait(false). - person noseratio; 06.06.2014
comment
Разве это не пустая трата времени, чтобы раскрутить новую тему только для того, чтобы стряхнуть SynchronizationContext? - person Yuval Itzchakov; 06.06.2014
comment
@YuvalItzchakov, я не думаю, что в данном случае это пустая трата времени. Во-первых, он будет запускать поток new только тогда, когда все потоки ThreadPool заняты, что не так, поскольку прослушиватель только что запущен. Во-вторых, этот поток возвращается в пул в точке 1-го await, а затем быстро повторно используется внутри HandleConnectionAsync при подключении следующего клиента (из-за await Task.Yield() в 1-м фрагменте или Task.Run во 2-м). Так что это не пустая трата, и это позволяет мне не беспокоиться о синхронизации. контексте, особенно если я использую сторонний код, который может не иметь ConfigureAwait. - person noseratio; 06.06.2014
comment
@YuvalItzchakov, если вам также интересно, почему там await Task.Yield()/Task.Run, у меня тоже есть объяснение этому :) - person noseratio; 06.06.2014
comment
@Noseratio Я был бы рад, если бы вы тоже поделились этим объяснением :) - person Yuval Itzchakov; 06.06.2014
comment
@YuvalItzchakov, в этом простом примере HandleConnectionAsync выполняет только неблокирующий асинхронный ввод-вывод, поэтому await Task.Yield() (или Task.Run) является избыточным. Я по-прежнему помещаю его туда, потому что если внутри HandleConnectionAsync реального приложения есть какая-либо блокирующая или связанная с процессором активность, она заблокирует прослушиватель. Другие клиенты не смогут подключиться, пока HandleConnectionAsync не вернется (или пока 1-й await внутри него). OP на самом деле хотел использовать некоторые синхронные API, как стоит вопрос. - person noseratio; 06.06.2014
comment
Здорово. Спасибо за объяснение! - person Yuval Itzchakov; 06.06.2014
comment
В какой степени этот код будет масштабироваться (скажем) до 10 000+ одновременных клиентов? - person agnsaft; 28.09.2017
comment
@agnsaft, мне тоже было бы интересно это узнать) Я полагаю, что он должен хорошо масштабироваться, потому что он не блокирует потоки, но я не делал никакого профилирования. - person noseratio; 28.09.2017

В существующих ответах правильно предлагается использовать Task.Run(() => HandleConnection(client));, но не объясняется, почему.

Вот почему: вы обеспокоены тем, что HandleConnectionAsync может потребоваться некоторое время, чтобы выполнить первое ожидание. Если вы придерживаетесь асинхронного ввода-вывода (как и следует в данном случае), это означает, что HandleConnectionAsync выполняет работу, связанную с ЦП, без какой-либо блокировки. Это идеальный случай для пула потоков. Это сделано для запуска короткой, неблокирующей работы ЦП.

И вы правы, что цикл приема будет дросселироваться HandleConnectionAsync, прежде чем вернуться (возможно, потому, что в нем есть значительная работа, связанная с процессором). Этого следует избегать, если вам нужна высокая частота новых подключений.

Если вы уверены, что нет значительной работы по регулированию цикла, вы можете сохранить дополнительный пул потоков Task и не делать этого.

В качестве альтернативы вы можете одновременно запускать несколько приемов. Замените await Serve(); на (например):

var serverTasks =
    Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Serve());
await Task.WhenAll(serverTasks);

Это устраняет проблемы масштабируемости. Обратите внимание, что await проглотит все ошибки, кроме одной здесь.

person usr    schedule 09.01.2014

Пытаться

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    Task.Run(() => this.HandleConnection(client));
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}
person Aron    schedule 09.01.2014
comment
Как это отвечает на первоначальный вопрос? - person Leri; 09.01.2014
comment
Разве это не порождает новый поток для каждого соединения, как это делает моя первоначальная реализация? - person captncraig; 09.01.2014
comment
Я думал, вы хотите сделать это с помощью Tasks... в его нынешнем виде вы ничего не можете сделать с вашим сервером пула потоков, что может ограничить количество ваших потоков. Вам необходимо использовать неблокирующие API для каждого из ваших вызовов ввода-вывода. - person Aron; 09.01.2014
comment
Разве это не порождает новый поток для каждого номера соединения. Он порождает новую Задачу, но одна Задача не равна одному Потоку. Один поток может обрабатывать более одной задачи, а новые потоки создаются по запросу. - person DasKrümelmonster; 09.01.2014
comment
Task.Run — это определенно не запуск нового потока. Task.Run будет использовать существующий поток из ThreadPool, а Thread.Start запустит новый поток, который никогда не будет использоваться повторно. - person Panagiotis Kanavos; 09.01.2014
comment
Чтобы было ясно, OP также напрямую использует пул потоков. Если неверно говорить, что Task.Run() порождает новые потоки, также неверно говорить о попытке OP. Игнорирование возвращенного Task по существу эквивалентно коду OP. - person Marc L.; 15.04.2020

Согласно http://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnType не следует использовать возвращаемый тип void, так как он не может перехватывать исключения. Как вы указали, вам нужны задачи «запустить и забыть», поэтому я пришел к выводу, что вы всегда должны возвращать Task (как сказал Microsoft), но вы должны поймать ошибку, используя:

TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted);

Пример, который я использовал в качестве доказательства, приведен ниже:

public static void Main()
{
    Awaitable()
        .ContinueWith(
            i =>
                {
                    foreach (var exception in i.Exception.InnerExceptions)
                    {
                        Console.WriteLine(exception.Message);
                    }
                },
            TaskContinuationOptions.OnlyOnFaulted);
    Console.WriteLine("This needs to come out before my exception");
    Console.ReadLine();
}

public static async Task Awaitable()
{
    await Task.Delay(3000);
    throw new Exception("Hey I can catch these pesky things");
}
person Luke    schedule 25.09.2014

Есть ли причина, по которой вам нужно принимать асинхронные соединения? Я имею в виду, дает ли вам какое-либо значение ожидание любого подключения клиента? Единственной причиной для этого будет то, что в ожидании соединения на сервере будет выполняться какая-то другая работа. Если есть, вы, вероятно, могли бы сделать что-то вроде этого:

    public async void Serve()
    {
        while (true)
        {
            var client = await _listener.AcceptTcpClientAsync();
            Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning);
        }
    }

Таким образом, принятие освобождает текущий поток, оставляя возможность для выполнения других действий, а обработка выполняется в новом потоке. Единственными накладными расходами будет создание нового потока для обработки клиента, прежде чем он сразу вернется к принятию нового соединения.

Изменить: только что понял, что это почти тот же код, который вы написали. Думаю, мне нужно еще раз прочитать ваш вопрос, чтобы лучше понять, что вы на самом деле спрашиваете: S

Редактировать2:

Есть ли способ объединить эти два подхода, чтобы мой сервер использовал ровно столько потоков, сколько ему нужно для количества активно работающих задач, но чтобы он не блокировал потоки без необходимости при операциях ввода-вывода?

Думаю, мое решение действительно отвечает на этот вопрос. Хотя так ли это необходимо?

Edit3: Task.Factory.StartNew() фактически создает новый поток.

person EMB    schedule 09.01.2014
comment
Прием соединения асинхронно добавляет ценности, поскольку не тратит поток на то, чтобы ничего не делать, кроме ожидания соединения. В большинстве случаев ожидание соединения занимает гораздо больше времени, чем фактическая обработка запроса. Кроме того, Task.StartNew не создает новый поток, а использует поток из ThreadPool. - person Panagiotis Kanavos; 09.01.2014
comment
Конечно, это добавляет ценности, если есть другая работа, поэтому мой вопрос в моем первоначальном ответе. Я действительно не вижу выигрыша, если нет другой работы, если только нет огромной разницы между заблокированным потоком и приостановленным потоком. Намек на TaskCreationOptions.LongRunning должен запускать это в новом потоке, но вы были правы насчет этого. - person EMB; 09.01.2014