Перестановка буферов в потоках с одним писателем и несколькими считывателями

История

Существует поток записи, периодически собирающий данные откуда-то (в режиме реального времени, но это не имеет большого значения в вопросе). Многие читатели читают эти данные. Обычное решение для этого — с двумя блокировками чтения-записи и двумя буферами, подобными этому:

Writer (case 1):
acquire lock 0                        
loop
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

Or

Writer (case 2):
acquire lock 0                        
loop
    acquire other lock
    free this lock
    swap buffers
    write to current buffer
    wait for next period

Проблема

В обоих методах, если операция получить другую блокировку завершается неудачно, замена не выполняется, и модуль записи перезаписывает свои предыдущие данные (поскольку модуль записи работает в режиме реального времени, он не может ждать чтения). Так что в этом случае , все читатели потеряют этот фрейм данных.

Впрочем, это не такая уж большая проблема, считыватели — это мой собственный код, и они короткие, поэтому с двойным буфером эта проблема решена, а если бы была проблема, я мог бы сделать тройной буфер (или больше).

Проблема в задержке, которую я хочу минимизировать. Представьте случай 1:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up, and again writes to buffer0

В **здесь** другие читатели теоретически могли бы прочитать данные buffer0, если бы только писатель мог выполнить обмен после того, как читатель закончит работу, а не ждать следующего периода. В данном случае произошло то, что только из-за того, что один считыватель немного опоздал, все читатели пропустили один кадр данных, а проблемы можно было бы полностью избежать.

Случай 2 аналогичен:

writer writes to buffer0                reader is idle
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)
|
|                                       reader starts reading buffer1
writer wakes up                         |
it can't acquire lock1                  because reader is still reading buffer1
overwrites buffer0

Я пробовал смешивать решения, поэтому писатель пытается поменять местами буферы сразу после записи, а если это невозможно, сразу после пробуждения в следующем периоде. Что-то вроде этого:

Writer (case 3):
acquire lock 0                        
loop
    if last buffer swap failed
        acquire other lock
        free this lock
        swap buffers
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

Теперь проблема с задержкой сохраняется:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up
swaps buffers
writes to buffer1

Опять же, в **этом месте** все читатели могут начать чтение buffer0, что является небольшой задержкой после того, как buffer0 было написано, но вместо этого им приходится ждать следующего периода записи.

Вопрос

Вопрос в том, как мне справиться с этим? Если я хочу, чтобы писатель выполнялся точно в желаемый период, ему нужно дождаться периода, используя функцию RTAI, и я не могу сделать это так, как

Writer (case 4):
acquire lock 0                        
loop
    write to current buffer
    loop a few times or until the buffer has been swapped
        sleep a little
        acquire other lock
        free this lock
        swap buffers
    wait for next period

Это вносит джиттер. потому что несколько раз могут оказаться длиннее, чем ожидание следующего периода, поэтому писатель может пропустить начало своего периода.

Просто чтобы быть более ясным, вот что я хочу сделать:

writer writes to buffer0                reader is reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      As soon as all readers finish reading,
|                                         the buffer is swapped
|                                       readers start reading buffer0
writer wakes up                         |
writes to buffer1

Что я уже нашел

Я нашел read-copy-update, который, насколько я понял, продолжает выделять память для буферов и освобождает их до тех пор, пока читатели не закончат с ними работу, что для меня невозможно по многим причинам. Во-первых, потоки распределяются между ядром и пользовательским пространством. Во-вторых, с RTAI вы не можете выделять память в потоке реального времени (потому что тогда ваш поток будет вызывать системные вызовы Linux и, следовательно, нарушать работу в реальном времени! (Не говоря уже о том, что использование собственной реализации RCU в Linux бесполезно из-за по тем же причинам)

Я также подумал о дополнительном потоке, который на более высокой частоте пытается поменять местами буферы, но это не кажется хорошей идеей. Во-первых, ему самому нужно будет синхронизироваться с писателем, а во-вторых, многие из этих писателей-читателей работают в разных частях параллельно, и один дополнительный поток для каждого писателя кажется слишком большим. Один поток для всех писателей кажется очень сложным в плане синхронизации с каждым писателем.


person Shahbaz    schedule 18.10.2011    source источник
comment
Почему модуль записи должен получить блокировку для нового буфера, прежде чем он сбросит блокировку для старого? Если ему не удается получить новую блокировку вовремя, это в основном означает, что читатели не могут идти в ногу, поэтому вы в конечном итоге потеряете данные, что бы вы ни делали, поэтому выбросьте кадр в писателе.   -  person richvdh    schedule 18.10.2011
comment
@richvdh Писателю необходимо получить блокировку перед удалением предыдущей, потому что, если она отбрасывает предыдущую, а другую блокировку нельзя взять, то писатель не может записывать ни в один из буферов, что имеет катастрофические последствия. Писатель должен быть всегда в рабочем состоянии и не блокироваться.   -  person Shahbaz    schedule 18.10.2011
comment
Дело не в том, что читатель не может идти в ногу, он будет успевать, но его вызовы могут быть не согласованы с писателем, поэтому может случиться так, что читатель начнет чтение прямо перед тем, как писатель захочет поменять местами буферы. В этой ситуации фактически текущее решение состоит в том, чтобы отбросить фрейм. Очевидно, что это нежелательно.   -  person Shahbaz    schedule 18.10.2011
comment
Кажется, теперь я вижу. Как говорит janneb, похоже, что тройная буферизация должна решить именно эту проблему.   -  person richvdh    schedule 18.10.2011
comment
@richvdh, см. мой комментарий к его ответу.   -  person Shahbaz    schedule 18.10.2011


Ответы (5)


Какой API вы используете для блокировки чтения-записи? У вас есть временная блокировка, например pthread_rwlock_timedwrlock? Если да, я думаю, что это решение вашей проблемы, как в следующем коде:

void *buf[2];

void
writer ()
{
  int lock = 0, next = 1;

  write_lock (lock);
  while (1)
    {
      abs_time tm = now() + period;

      fill (buf [lock]);
      if (timed_write_lock (next, tm))
        {
          unlock (lock);
          lock = next;
          next = (next + 1) & 1;
        }
      wait_period (tm);
    }
}


void
reader ()
{
  int lock = 0;
  while (1)
    {
      reade_lock (lock);
      process (buf [lock]);
      unlock (lock);
      lock = (lock + 1) & 1;
    }
}

Что здесь происходит, так это то, что для записывающего устройства на самом деле не имеет значения, ждет ли он блокировки или следующего периода, если он обязательно проснется до того, как наступит следующий период. Абсолютный тайм-аут обеспечивает это.

person chill    schedule 10.11.2011
comment
Да, у меня есть временная блокировка. Верно-верно! Это работает! Хотя мне нужно установить запас на tm для возможных ошибок, но он делает то, что мне нужно, почти идеально. Большое спасибо! :) - person Shahbaz; 10.11.2011

Разве это не именно та проблема, которую должна решить тройная буферизация. Итак, у вас есть 3 буфера, назовем их write1, write2 и read. Поток записи чередуется между записью в записи 1 и запись 2, гарантируя, что они никогда не блокируются и что последний полный кадр всегда доступен. Затем в потоках чтения в какой-то подходящий момент (скажем, непосредственно перед или после чтения кадра) буфер чтения меняется местами с доступным буфером записи.

Хотя это гарантировало бы, что писатели никогда не блокируются (переключение буфера может быть выполнено очень быстро, просто перевернув два указателя, возможно, даже с помощью CAS atomic вместо блокировки), все еще остается проблема, связанная с тем, что читатели должны ждать, пока другие читатели закончат с буфером чтения перед переворачиванием. Я полагаю, что это можно было бы решить немного в стиле RCU с помощью пула буферов чтения, где можно перевернуть доступный.

person janneb    schedule 18.10.2011
comment
Читатели ждут друг друга - не проблема. По завершении они могут просто перейти к последнему обновленному буферу. Однако именно здесь тройной буфер также может дать сбой. Если считыватель номер один читает буфер1, а считыватель номер два читает буфер2, то модуль записи не может переключиться с буфера0. Хотя я должен признать, что это маловероятно, я не могу рисковать, потому что программа работает в режиме реального времени и не может полагаться на случайность. - person Shahbaz; 18.10.2011
comment
@Shahbaz: В схеме тройной буферизации, которую я описал выше в первом абзаце, этого не может произойти, поскольку всегда есть два буфера записи. Вместо этого читатели будут блокировать друг друга. - person janneb; 18.10.2011
comment
Ах да, я думал о том же, но о другом. Я добавил +1 к вашему ответу, потому что он помогает мне думать о способе, но все равно не решает проблему. - person Shahbaz; 18.10.2011

  • Использовать очередь (связанный список FIFO)
  • Модуль записи в реальном времени всегда будет добавлять (ставить в очередь) в конец очереди
  • Читатели всегда будут удалять (извлекать из очереди) из начала очереди
  • Читатели будут блокироваться, если очередь пуста

изменить, чтобы избежать динамического размещения

Я бы, вероятно, использовал циклическую очередь... Я бы использовал встроенные атомарные операции __sync. http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html#Atomic-Builtins

  • Круговая очередь (массив FIFO 2d)
    • ex: byte[][] Array = new byte[MAX_SIZE][BUFFER_SIZE];
    • Start and End index pointers
  • Writer перезаписывает буфер в массиве[End][]
    • Writer can increment Start if it ends up looping all the way around
  • Читатель получает буфер из Array[Start][]
    • Reader blocks if Start == End
person Louis Ricci    schedule 18.10.2011
comment
Боюсь, как я упоминал в тексте вопроса, это абсолютно невозможно, потому что я не могу динамически распределять память в потоке реального времени с использованием RTAI. - person Shahbaz; 18.10.2011
comment
В этом случае вам нужно будет использовать круговую очередь как двумерный массив. я обновлю свой ответ - person Louis Ricci; 18.10.2011
comment
Чем круговая очередь отличается от его текущего подхода с двойной буферизацией (т. е. очереди с двумя элементами)? Я не думаю, что это ответ на вопрос... - person Nemo; 18.10.2011
comment
@Nemo - вы можете реализовать циклическую очередь, чтобы блокировать (синхронизировать) только читателей (несколько). Писателю (в единственном числе) будет разрешено двигаться без блокировки. Если писатель каждый раз доходит до точки, где он перекрывает читатели, он будет подталкивать указатель чтения вперед (используя атомарную операцию), по существу отбрасывая (отбрасывая) самый старый буфер. - person Louis Ricci; 18.10.2011
comment
@LastCoder, как упомянул Немо, вы предлагаете именно то, что я уже сделал. Раньше это был двойной буфер, но теперь я расширил его до нескольких буферов. Однако в целом этот метод не годится, потому что вы не можете увеличить количество читателей, не создавая проблем. - person Shahbaz; 19.10.2011

Если вы не хотите, чтобы писатель ждал, возможно, ему не следует устанавливать блокировку, которую может удерживать кто-то другой. Тем не менее, я бы заставил его выполнить какую-то синхронизацию, чтобы убедиться, что то, что он пишет, действительно записано - как правило, большинство вызовов синхронизации вызывают выполнение инструкции очистки памяти или барьера, но детали будут зависеть от модели памяти. вашего процессора и реализации вашего пакета потоков.

Я бы посмотрел, нет ли вокруг какого-либо другого примитива синхронизации, который лучше подходит, но если дело дойдет до драки, я бы заблокировал и разблокировал блокировку записи, которую никто другой никогда не использует.

Затем читатели должны быть готовы время от времени пропускать что-то и должны уметь определять, когда они что-то пропустили. Я бы связал флаг достоверности и счетчик длинной последовательности с каждым буфером, а писатель сделал бы что-то вроде «очистить флаг достоверности, увеличить счетчик последовательности, синхронизировать, записать в буфер, увеличить счетчик последовательности, установить флаг достоверности, синхронизировать». Если читатель считывает количество последовательностей, синхронизируется, видит, что флаг достоверности верен, считывает данные, синхронизируется и повторно считывает тот же самый счетчик последовательностей, то, возможно, есть некоторая надежда, что он не получил искаженных данных.

Если вы собираетесь это сделать, я бы проверил это исчерпывающе. Мне это кажется правдоподобным, но может не работать с вашей конкретной реализацией всего, от компилятора до модели памяти.

Еще одна идея или способ проверить это — добавить контрольную сумму в свой буфер и записать ее в последнюю очередь.

См. также поиск по алгоритмам без блокировки, таким как http://www.rossbencina.com/code/lockfree

Чтобы пойти с этим, вы, вероятно, хотите, чтобы писатель подавал сигнал спящим читателям. Для этого вы можете использовать семафоры Posix - например. пусть читатель попросит писателя вызвать sem_post() для определенного семафора, когда он достигнет заданного порядкового номера или когда буфер станет действительным.

person mcdowella    schedule 18.10.2011
comment
То, что я делаю, похоже на вашу первую идею, но проще. Представьте себе множество буферов, записывающее устройство при каждой записи переключается в буфер, который не читает ни один читатель. Блокирует его. На нем пишет. При следующем обмене сообщает читателям, что последним записанным буфером является тот. Проблема в том, что если все остальные буфера считываются другими считывателями, модуль записи не может поменять местами буферы и перезаписывает предыдущий, что означает, что все считыватели пропустят этот фрейм данных. - person Shahbaz; 19.10.2011
comment
О вашей второй идее разбудить читателей я тоже это сделал. Но это один из вариантов программы. Дело в том, что если писатель пишет с частотой 100 Гц, сигнализация считывателю означает, что читатель также будет читать с частотой 100 Гц. Но возможно, что одному читателю нужны данные с меньшей скоростью или даже непериодически. - person Shahbaz; 19.10.2011
comment
Я считаю, что предложенные здесь дополнительные функции справляются с этими проблемами. Writer никогда не останавливается в ожидании читателя, поэтому читатели, которые могут не отставать, получают все данные. Читатель, который не может идти в ногу с писателем, может пропускать буферы. Каждый читатель может проверить порядковые номера, чтобы узнать, какой самый последний действительный буфер. Если он не справился с этим, он может идти вперед. Если он справился с этим, он может попросить модуль записи опубликовать семафор, когда следующий буфер будет готов, а затем ожидать этого семафора. - person mcdowella; 19.10.2011
comment
это именно то, что я делаю прямо сейчас. Хотя это не решит проблему. Я не знаю, как объяснить это в письменном тексте достаточно хорошо, но вы можете мне доверять. - person Shahbaz; 19.10.2011

Другой вариант — придерживаться блокировки, но следить за тем, чтобы считыватели никогда не зависали слишком долго, удерживая блокировку. Читатели могут сократить время, затрачиваемое на удержание блокировки, и сделать его предсказуемым, ничего не делая, пока они удерживают эту блокировку, а копируя данные из буфера записи. Единственная проблема заключается в том, что чтение с низким приоритетом может быть прервано задачей с более высоким приоритетом на полпути записи, и лекарством от этого является http://en.wikipedia.org/wiki/Priority_ceiling_protocol.

Учитывая это, если поток записи имеет высокий приоритет, в худшем случае работа, которую нужно выполнить для каждого буфера, — это чтобы поток записи заполнил буфер, а каждый поток чтения скопировал данные из этого буфера в другой буфер. Если вы можете себе это позволить в каждом цикле, то поток записи и некоторый объем данных чтения будут всегда завершены, в то время как читатели, обрабатывающие скопированные данные, могут выполнить свою работу, а могут и не выполнить. Если они этого не сделают, они отстанут и заметят это, когда в следующий раз захватят блокировку и оглянутся, чтобы увидеть, какой буфер они хотят скопировать.

FWIW, мой опыт чтения кода в реальном времени (когда требуется показать, что ошибки есть, а не в нашем коде) заключается в том, что это невероятно и намеренно просто, очень четко изложено и не обязательно более эффективно, чем это необходимо уложиться в сроки, поэтому некоторое явно бессмысленное копирование данных для того, чтобы заставить работать простую блокировку, может быть хорошей сделкой, если вы можете себе это позволить.

person mcdowella    schedule 19.10.2011
comment
Я позволяю себе это. На самом деле, я так и делаю, и именно так, как вы сказали, я сокращаю читатель. Читатели также профилируют свое время. Позвольте мне прояснить одну вещь. При том количестве считывателей, которое может быть в моей системе, я могу назначить (статически, но установить в модуле insmod) достаточно большое количество буферов, чтобы свести вероятность того, что буферы не будут заменены, почти до нуля. Мой вопрос был в основном о том, как я могу разблокировать блокировку записи в последнем записанном буфере, как только последний читатель закончит чтение из буфера, в который писатель хочет записать? (Звучит сложно, правда? ;) (также, позволив писателю wait_period) - person Shahbaz; 20.10.2011