Запись сетевого потока заблокирована

Я работаю над приложением С# (.net 4), которое принимает несколько TCP-соединений от разных клиентов. Существует один прослушиватель tcp, который принимает сокет. Связь ч/б узлов в дуплексе. Данные отправляются с помощью метода Networkstream.Write и считываются с помощью метода Networkstream.read. Для каждого tcp-соединения создается отдельный поток.

Проблема в том, что несколько дней назад мы заметили, что один из клиентов перестал читать данные (из-за бага) на 20 минут. Поскольку соединение не было прервано, на сервере не было исключения (IO). Однако мы заметили, что данные на других клиентах тоже не шли. Через 20 минут этот клиент снова начал получать данные, и вскоре другие клиенты также начали получать данные.

Я знаю, что метод записи сетевого потока является методом блокировки, и мы не используем никаких тайм-аутов. Таким образом, существует вероятность того, что запись заблокирована (описано здесь). Но, как я понял, для каждого tcp-соединения должен быть отдельный буфер записи или есть что-то еще. Может ли блокировка отправки при TCP-соединении влиять на другие TCP-соединения в том же приложении?

Вот псевдокод операции записи. Для каждого соединения существует отдельный процесс исходящей очереди отдельным потоком.

public class TCPServerListener : baseConnection
{

    private readonly int _Port;
    private TcpListener _tcpListener;
    private Thread _thread;
    private List<TcpClientData> _tcpClientDataList = new List<TcpClientData>();
    private long _messageDiscardTimeout;
    private bool LoopForClientConnection = true;

    public TCPServerListener(int port, ThreadPriority threadPriority)
    {
        try
        {
            // init property
        }
        catch (Exception ex)
        {
            // log
        }
    }

    public void SendMessageToAll(int type)
    {
        base.EnqueueMessageToSend(type, _tcpClientDataList);
    }
    public void SendMessageToList(int type, IList<TcpClient> tcpClientList)
    {
        base.EnqueueMessageToSend(type, tcpClientList);
    }
    public void SendMessage(int type, TcpClient tcpClient)
    {
        base.EnqueueMessageToSend(type, tcpClient);
    }



    private void AcceptClientConnections()
    {
        while (LoopForClientConnection)
        {
            try
            {
                Socket socket = _tcpListener.AcceptSocket();
                TcpClientData tcpClientData = new TcpClientData();
                tcpClientData.tcpClientThread = new Thread(new ParameterizedThreadStart(StartAsync));
                tcpClientData.tcpClientThread.Priority = _threadPriority;
                tcpClientData.tcpClientThread.IsBackground = true;
                tcpClientData.tcpClientThread.Name = "CD" + tcpClientData.tcpClientThread.ManagedThreadId;
                tcpClientData.tcpClient = new TcpClient();
                tcpClientData.tcpClient.Client = socket;
                _tcpClientDataList.Add(tcpClientData);
                tcpClientData.tcpClientThread.Start(tcpClientData.tcpClient);
            }
            catch (ThreadAbortException ex)
            {
                //log

            }
            catch (Exception ex)
            {
                //log
            }
        }
    }




    public override void Start()
    {
        base.Start();
        _tcpListener = new TcpListener(System.Net.IPAddress.Any, _Port);

        _thread = new Thread(AcceptClientConnections);
        _thread.Priority = _threadPriority;
        _thread.IsBackground = true;

        _tcpListener.Start();
        _thread.Start();
    }

    public override void Stop()
    {
       // stop listener and terminate threads
    }
}


public class baseConnection
{
    private Thread _InCommingThread;
    private Thread _OutGoingThread;
    protected ThreadPriority _threadPriority;
    protected BlockingCollection<MessageReceived> _InComingMessageQueue = new BlockingCollection<MessageReceived>();
    protected BlockingCollection<MessageToSend> _OutgoingMessageQueue = new BlockingCollection<MessageToSend>();

    public void StartAsync(Object oTcpClient)
    {
        TcpClient tcpClient = oTcpClient as TcpClient;
        if (tcpClient == null)
            return;

        using (tcpClient)
        {
            using (NetworkStream stream = tcpClient.GetStream())
            {
                stream.ReadTimeout = Timeout.Infinite;
                stream.WriteTimeout = Timeout.Infinite;

                BinaryReader bodyReader = new BinaryReader(stream);

                while (tcpClient.Connected)
                {
                    try
                    {
                        int messageType = bodyReader.ReadInt32();

                        // checks to verify messages 

                        // enqueue message in incoming queue
                        _InComingMessageQueue.Add(new MessageReceived(messageType, tcpClient));
                    }
                    catch (EndOfStreamException ex)
                    {
                        // log
                        break;
                    }
                    catch (Exception ex)
                    {
                        // log
                        Thread.Sleep(100);
                    }
                }
                //RaiseDisconnected(tcpClient);
            }
        }
    }


    public virtual void Start()
    {
        _InCommingThread = new Thread(HandleInCommingMessnge);
        _InCommingThread.Priority = _threadPriority;
        _InCommingThread.IsBackground = true;
        _InCommingThread.Start();

        _OutGoingThread = new Thread(HandleOutgoingQueue);
        _OutGoingThread.Priority = _threadPriority;
        _OutGoingThread.IsBackground = true;
        _OutGoingThread.Start();
    }


    public virtual void Stop()
    {
       // stop the threads and free up resources
    }

    protected void EnqueueMessageToSend(int type, List<TcpClientData> tcpClientDataList)
    {
        tcpClientDataList.ForEach(x => _OutgoingMessageQueue.Add(new MessageToSend(type, x.tcpClient)));
    }
    protected void EnqueueMessageToSend(int type, IList<TcpClient> tcpClientList)
    {
        foreach (TcpClient tcpClient in tcpClientList)
        {
            _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
        }
    }
    protected void EnqueueMessageToSend(int type, TcpClient tcpClient)
    {
        _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
    }


    private void HandleOutgoingQueue()
    {
        while (true)
        {
            try
            {

                MessageToSend message = _OutgoingMessageQueue.Take();

                if (message.tcpClient.Connected)
                {
                    BinaryWriter writer = new BinaryWriter(message.tcpClient.GetStream());
                    writer.Write(message.type);
                }
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                //_logger.Error(ex.Message, ex);
            }
        }
    }

    private void HandleInCommingMessnge()
    {
        while (true)
        {
            try
            {
                MessageReceived messageReceived = _InComingMessageQueue.Take();

                // handle message
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                // log
                //_logger.Error(ex.Message, ex);
            }
        }
    }

    public class MessageReceived
    {
        public MessageReceived(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }

        public int type;
        public TcpClient tcpClient;
    }

    public class MessageToSend
    {
        public MessageToSend(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }

        public int type;
        public TcpClient tcpClient;
    }

    public class TcpClientData
    {
        public Thread tcpClientThread;
        public TcpClient tcpClient;
    }
}

person Umer Azaz    schedule 10.01.2013    source источник
comment
Он не должен блокироваться, а буферы независимы друг от друга - используют ли потоки какой-то lock вокруг метода записи?   -  person C.Evenhuis    schedule 10.01.2013
comment
Я добавил пример кода, и замков нет. Спасибо.   -  person Umer Azaz    schedule 10.01.2013


Ответы (1)


Вы упомянули, что для каждого соединения создается отдельный поток, но показанный вами код, по-видимому, может удалить сообщение из очереди для любого соединения.

Если этот код выполняется в нескольких потоках, программа заблокируется, как только каждый поток в данный момент попытается отправить сообщение блокирующему соединению. Другая проблема, с которой вы можете столкнуться, если этот цикл выполняется в нескольких потоках, заключается в том, что сообщения могут не поступать в правильном порядке для одного и того же соединения.

person C.Evenhuis    schedule 10.01.2013
comment
Если вы посмотрите на метод HandleOutgoingQueue(), он работает в одном потоке и синхронно обрабатывает исходящие сообщения для каждого подключенного клиента. В то время как одно Write() блокируется, ни одно из других соединений не будет записано. - person C.Evenhuis; 10.01.2013