Производитель/Потребитель, проблема с буфером потока

Я пытаюсь написать буферный менеджер, который управляет 3 потоками. Типичное использование будет с медленным производителем и быстрым потребителем. Идея трех буферов заключается в том, что производитель ВСЕГДА имеет буфер для записи, а потребитель ВСЕГДА получает самые последние произведенные данные.

Теперь у меня уже есть это, и это работает.

namespace YariIfStream
{

    /// <summary>
    /// A class that manages three buffers used for IF data streams
    /// </summary>
    public class YariIFStream
    {
        private Stream writebuf; ///<value>The stream used for writing</value>
        private Stream readbuf; ///<value>The stream used for reading</value>
        private Stream swapbuf; ///<value>The stream used for swapping</value>
        private bool firsttime; ///<value>Boolean used for checking if it is the first time a writebuffers is asked</value>
        private Object sync; ///<value>Object used for syncing</value>

        /// <summary>
        /// Initializes a new instance of the Yari.YariIFStream class with expandable buffers
        /// </summary>
        public YariIFStream()
        {
            sync = new Object();
            eerste = true;

            writebuf = new MemoryStream();
            readbuf = new MemoryStream();
            swapbuf = new MemoryStream();
        }

        /// <summary>
        /// Returns the stream with the buffer with new data ready to be read
        /// </summary>
        /// <returns>Stream</returns>
        public Stream GetReadBuffer()
        {
            lock (sync)
            {
                Monitor.Wait(sync);
                Stream tempbuf = swapbuf;
                swapbuf = readbuf;
                readbuf = tempbuf;
            }
            return readbuf;
        }

        /// <summary>
        /// Returns the stream with the buffer ready to be written with data
        /// </summary>
        /// <returns>Stream</returns>
        public Stream GetWriteBuffer()
        {
            lock (sync)
            {
                Stream tempbuf = swapbuf;
                swapbuf = writebuf;
                writebuf = tempbuf;
                if (!firsttime)
                {
                    Monitor.Pulse(sync);
                }
                else
                {
                    firsttime = false;

                }
            }
            //Thread.Sleep(1);
            return writebuf;
        }

    }
}

Первая проверка используется, потому что при первом запросе буфера записи он не может отправить импульс потребителю, потому что буфер все еще должен быть записан данными. Когда буфер записи запрашивается во второй раз, мы можем быть уверены, что предыдущий буфер содержит данные.

У меня есть два потока, один производитель и один потребитель. Это мой вывод:

prod: uv_hjd`alv   cons: N/<]g[)8fV
prod: N/<]g[)8fV   cons: 5Ud*tJ-Qkv
prod: 5Ud*tJ-Qkv   cons: 4Lx&Z7qqjA
prod: 4Lx&Z7qqjA   cons: kjUuVyCa.B
prod: kjUuVyCa.B

Теперь все в порядке, потребитель отстает на один, так и должно быть. Как видите, я теряю свою первую строку данных, что является моей главной проблемой.

Другие проблемы таковы:

  • если я удалю первую проверку, она работает. Но по моему не должно...
  • если я добавлю Thread.Sleep(1); в GetWriteBuffer() это также работает. Что-то я не понимаю.

Заранее спасибо за любое просветление.


person Xtreme_Machine    schedule 26.07.2010    source источник


Ответы (1)


Я исправил свою проблему. Я заменил все экземпляры Stream на byte[]. Теперь он работает нормально. Не знаю, почему Stream не работает, не хочу тратить больше времени на выяснение этого.

Вот новый код для тех, кто сталкивается с той же проблемой.

/// <summary>
/// This namespace provides a crossthread-, concurrentproof buffer manager. 
/// </summary>
namespace YariIfStream
{

    /// <summary>
    /// A class that manages three buffers used for IF data streams
    /// </summary>
    public class YariIFStream
    {
        private byte[] writebuf; ///<value>The buffer used for writing</value>
        private byte[] readbuf; ///<value>The buffer used for reading</value>
        private byte[] swapbuf; ///<value>The buffer used for swapping</value>
        private bool firsttime; ///<value>Boolean used for checking if it is the first time a writebuffers is asked</value>
        private Object sync; ///<value>Object used for syncing</value>

        /// <summary>
        /// Initializes a new instance of the Yari.YariIFStream class with expandable buffers with a initial capacity as specified
        /// </summary>
        /// <param name="capacity">Initial capacity of the buffers</param>
        public YariIFStream(int capacity)
        {
            sync = new Object();
            firsttime = true;

            writebuf = new byte[capacity];
            readbuf = new byte[capacity];
            swapbuf = new byte[capacity];
        }

        /// <summary>
        /// Returns the buffer with new data ready to be read
        /// </summary>
        /// <returns>byte[]</returns>
        public byte[] GetReadBuffer()
        {
            byte[] tempbuf;
            lock (sync)
            {
                Monitor.Wait(sync);
                tempbuf = swapbuf;
                swapbuf = readbuf;
            }
            readbuf = tempbuf;

            return readbuf;
        }

        /// <summary>
        /// Returns the buffer ready to be written with data
        /// </summary>
        /// <returns>byte[]</returns>
        public byte[] GetWriteBuffer()
        {
            byte[] tempbuf;
            lock (sync)
            {
                tempbuf = swapbuf;
                swapbuf = writebuf;

                writebuf = tempbuf;

                if (!firsttime)
                {
                    Monitor.Pulse(sync);
                }
                else
                {
                    firsttime = false;
                }
            }
            return writebuf;
        }
    }
}
person Xtreme_Machine    schedule 28.07.2010
comment
Почему 4 буфера? Для подкачки вам нужен только буфер чтения, буфер записи и временный буфер. - person eat_a_lemon; 26.03.2011
comment
Нет синхронизации между Pulse и Wait. Класс Monitor не поддерживает состояние, указывающее, что был вызван метод Pulse. Таким образом, если вы вызываете Pulse, когда нет ожидающих потоков, следующий поток, который вызывает Wait, блокируется, как если бы Pulse никогда не вызывался. - person eat_a_lemon; 26.03.2011