Как заблокировать, пока BlockingQueue не станет пустой?

Я ищу способ заблокировать, пока BlockingQueue не станет пустым.

Я знаю, что в многопоточной среде, пока есть производители, помещающие элементы в BlockingQueue, могут быть ситуации, в которых очередь становится пустой, а через несколько наносекунд заполняется элементами.

Но если есть только один производитель, то он может захотеть подождать (и заблокировать), пока очередь не опустеет после того, как он перестанет помещать элементы в очередь.

Java/псевдокод:

// Producer code
BlockingQueue queue = new BlockingQueue();

while (having some tasks to do) {
    queue.put(task);
}

queue.waitUntilEmpty(); // <-- how to do this?

print("Done");

Есть ли у вас какие-либо идеи?

EDIT: я знаю, что обертка BlockingQueue и использование дополнительного условия сделают свое дело, я просто спрашиваю, есть ли готовые решения и/или лучшие альтернативы.


person gd1    schedule 03.03.2013    source источник
comment
Очевидно, что вы можете вызывать peek до тех пор, пока он не вернет null. Что такого в блокировке, что делает это решение неприемлемым?   -  person OldCurmudgeon    schedule 03.03.2013
comment
Ответ на ваше редактирование, AFAIK, нет. Обратите внимание, что, как я уже писал в своем ответе, ваш вариант использования очень своеобразен ... убедитесь, что вам действительно нужно делать то, о чем вы просите. Вы не указываете, почему вам нужно такое поведение.   -  person João Fernandes    schedule 03.03.2013
comment
@JoãoFernandes: мне это сейчас не особо нужно, просто из любопытства. Мне нравится читать мнения по вопросам программирования.   -  person gd1    schedule 03.03.2013
comment
Вот это хороший аргумент :) Подождем еще ответов, чтобы узнать, знает ли кто-то что-то, о чем мы не знаем.   -  person João Fernandes    schedule 03.03.2013


Ответы (6)


Простое решение с использованием wait() и notify():

// Producer:
synchronized(queue) {
    while (!queue.isEmpty())
        queue.wait(); //wait for the queue to become empty
    queue.put();
}

//Consumer:
synchronized(queue) {
    queue.get();
    if (queue.isEmpty())
        queue.notify(); // notify the producer
}
person niculare    schedule 03.03.2013
comment
Привет, @niculare, может ли этот код вызвать взаимоблокировку? Два фрагмента кода блокируют один и тот же объект queue, что не позволяет производителю и потребителю выполнять свою работу одновременно. Таким образом, очередь может никогда не оказаться пустой, пока производитель ожидает ее. - person liuyaodong; 22.05.2014
comment
@liuyaodong Нет, прочитайте Javadoc для Object.wait(). Он освобождает блокировку до тех пор, пока другой поток не вызовет notify. В нем даже прямо говорится, что текущий поток должен владеть монитором этого объекта. - person Timmos; 25.08.2015
comment
Как это будет работать? Скажем, потребитель запускается первым и блокируется в queue.get(), потому что в очереди еще нет элемента. Теперь производитель не может производить, потому что не может получить блокировку. - person hankduan; 08.08.2017
comment
BlockingQueue не имеет get метода. Вы имеете в виду poll или take? - person Franz Wong; 03.06.2021
comment
Я имел в виду take, так как poll в данном контексте не имеет никакого значения. - person niculare; 09.06.2021

Я понимаю, что у вас уже может быть куча потоков, активно опрашивающих или занимающих очередь, но я все еще чувствую себя не совсем правильно в отношении вашего потока/дизайна.

Очередь становится пустой, это не означает, что ранее добавленные задачи завершены, некоторые элементы могут обрабатываться целую вечность, поэтому не слишком полезно проверять наличие пустых.

Итак, что вам нужно сделать, это забыть о BlockingQueue, вы можете использовать его как любые другие коллекции. Переведите элементы в Collections из Callable и используйте ExecutorService.invokeAll().

    Collection<Item> queue = ...
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();

    for (Item item : queue) {
        tasks.add(new Callable<Result>() {

            @Override
            public Result call() throws Exception {
                // process the item ...

                return result;
            }
        });
    }

    // look at the results, add timeout for invokeAll if necessary
    List<Future<Result>> results = executorService.invokeAll(tasks);

    // done

Этот подход даст вам полный контроль над тем, как долго ваш производитель может ждать, и надлежащей обработкой исключений.

person Henry Wong    schedule 11.07.2013

Это не совсем то, что вы хотите сделать, но использование SynchronousQueue будет иметь эффект, очень похожий на ваш Java/псевдокод, а именно блокировку производителя до тех пор, пока все данные не будут получены каким-либо потребителем.

Единственная разница заключается в том, что производитель блокирует каждый раз, пока потребитель не придет для получения данных, а не только один раз в конце. Не уверен, что в вашем случае это что-то изменит. Я ожидаю, что это будет иметь заметное значение только в том случае, если задача, выполняемая производителем, будет несколько дорогой.

person Garns    schedule 03.03.2013

Ваш вариант использования должен быть совершенно особенным, потому что чаще всего вы хотите заблокировать производителя только тогда, когда очередь заполнена, но не ждать, пока она опустеет.

Во всяком случае, это выполнимо. Я считаю, что вращение до тех пор, пока isEmpty не вернет true, не НАСТОЛЬКО неэффективно, потому что производитель будет вращаться локально, то есть будет обращаться к своему собственному кешу, а не стучать по шине. Однако это будет потреблять процессорное время, поскольку поток остается планируемым. Но местный спиннинг определенно проще. В противном случае я вижу два варианта:

  1. Использование wait + notify, как предложил @niculare
  2. Каким-то образом заставить первого потребителя, который заметит, что очередь пуста, уведомить производителя без блокировок; это будет медленнее, но более изящно деградирует
person João Fernandes    schedule 03.03.2013

Ответ @niculare кажется нормальным, но посмотрите комментарий @hankduan.

Я увидел этот вопрос, когда искал решение похожей проблемы.
В конце я более-менее переписал LinkedBlockingQueue.
Он имеет подмножество своих методов и не реализует Collection или Iterable.
Он управляет несколькими производителями и потребителями.
Для меня это работает.

/**********************************************************************************************************************
 * Import specifications
 *********************************************************************************************************************/
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

/**********************************************************************************************************************
 * This class implements a completely reentrant FIFO.
 *********************************************************************************************************************/
public class BlockingFIFO<E>
{
  /********************************************************************************************************************
   * The constructor creates an empty FIFO with a capacity of {@link Integer#MAX_VALUE}.
   *******************************************************************************************************************/
  public BlockingFIFO()
  {
    // -----------------
    // Initialize object
    // -----------------
    this(Integer.MAX_VALUE);

  } // constructor

  /********************************************************************************************************************
   * The constructor creates an empty FIFO with the specified capacity.
   *
   * @param capacity_ipar The maximum number of elements the FIFO may contain.
   *******************************************************************************************************************/
  public BlockingFIFO(int capacity_ipar)
  {
    // ---------------------
    // Initialize attributes
    // ---------------------
    lock_attr = new ReentrantLock();
    not_empty_attr = lock_attr.newCondition();
    not_full_attr = lock_attr.newCondition();
    head_attr = null;
    tail_attr = null;
    capacity_attr = capacity_ipar;
    size_attr = 0;

  } // constructor

  /********************************************************************************************************************
   * This method removes all of the elements from the FIFO.
   *
   * @return The number of elements in the FIFO before it was cleared.
   *******************************************************************************************************************/
  public int clear()
  {
    // -----------------
    // Initialize result
    // -----------------
    int result;
    result = 0;

    // ----------
    // Clear FIFO
    // ----------
    lock_attr.lock();
    try
    {
      result = size_attr;
      head_attr = null;
      tail_attr = null;
      size_attr = 0;
      not_full_attr.signalAll();
    }
    finally
    {
      lock_attr.unlock();
    }

    // ----
    // Done
    // ----
    return (result);

  } // clear

  /********************************************************************************************************************
   * This method returns the number of elements in the FIFO.
   *
   * @return The number of elements in the FIFO.
   *******************************************************************************************************************/
  public int size()
  {
    // -----------
    // Return size
    // -----------
    lock_attr.lock();
    try
    {
      return (size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // size

  /********************************************************************************************************************
   * This method returns the number of additional elements that the FIFO can ideally accept without blocking.
   *
   * @return The remaining capacity the FIFO.
   *******************************************************************************************************************/
  public int remainingCapacity()
  {
    // -------------------------
    // Return remaining capacity
    // -------------------------
    lock_attr.lock();
    try
    {
      return (capacity_attr - size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // remainingCapacity

  /********************************************************************************************************************
   * This method waits for the FIFO to become empty.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public void waitEmpty()
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
        not_full_attr.await();
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for the FIFO to become empty.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum time to wait for the FIFO to become empty.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long    timeout_ipar,
                           TimeUnit unit_ipar)
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    return (waitEmpty(unit_ipar.toMillis(timeout_ipar)));

  } // waitEmpty

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO if it is possible to do so immediately without
   * exceeding the queue's capacity.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar The element to add to the FIFO.
   * @return              True if and only if the element was added to the FIFO.
   *******************************************************************************************************************/
  public boolean offer(E element_ipar)
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      if (capacity_attr > size_attr)
      {
        push(element_ipar);
        return (true);
      }
      else
        return (false);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum number of milliseconds to wait for space to become available.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E    element_ipar,
                       long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      push(element_ipar);
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum time to wait for space to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E        element_ipar,
                       long     timeout_ipar,
                       TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ----------------------------
    // Try to add specified element
    // ----------------------------
    return (offer(element_ipar, unit_ipar.toMillis(timeout_ipar)));

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary for space to become available.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public void put(E element_ipar)
    throws InterruptedException
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
        not_full_attr.await();
      push(element_ipar);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // put

  /********************************************************************************************************************
   * This method retrieves, but does not remove, the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E peek()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (head_attr.contents);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // peek

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E poll()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for an element to become available.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
      {
        if (!not_empty_attr.awaitUntil(deadline))
          return (null);
      }
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum time to wait for an element to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long     timeout_ipar,
                TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ------------------------
    // Try to get first element
    // ------------------------
    return (poll(unit_ipar.toMillis(timeout_ipar)));

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting if necessary for an element to become available.
   *
   * @return                      The head of the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public E take()
    throws InterruptedException
  {
    // ---------------------------
    // Try to return first element
    // ---------------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
        not_empty_attr.await();
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // take

  /********************************************************************************************************************
   * This class implements as node within the FIFO.
   *******************************************************************************************************************/
  private class Node
  {
    E    contents;
    Node next;

  } // class Node

  /********************************************************************************************************************
   * This method adds the specified element to the end of the FIFO.
   * <br>It sends a signal to all threads waiting for the FIFO to contain something.
   * <br>The caller should have locked the object and have made sure the list is not full.
   *******************************************************************************************************************/
  private void push(E element_ipar)
  {
    // -----------
    // Create node
    // -----------
    Node node;
    node = new Node();
    node.contents = element_ipar;
    node.next = null;

    // --------------
    // Add to the end
    // --------------
    if (head_attr == null)
      head_attr = node;
    else
      tail_attr.next = node;
    tail_attr = node;

    // ----------------------
    // We got another element
    // ----------------------
    size_attr++;
    not_empty_attr.signalAll();

  } // push

  /********************************************************************************************************************
   * This method removes the first element from the FIFO and returns it.
   * <br>It sends a signal to all threads waiting for the FIFO to have space available.
   * <br>The caller should have locked the object and have made sure the list is not empty.
   *******************************************************************************************************************/
  private E pop()
  {
    // ------------
    // Isolate node
    // ------------
    Node node;
    node = head_attr;
    head_attr = node.next;
    if (head_attr == null)
      tail_attr = null;

    // --------------------------
    // We removed another element
    // --------------------------
    size_attr--;
    not_full_attr.signalAll();

    // ----
    // Done
    // ----
    return (node.contents);

  } // pop

  /********************************************************************************************************************
   * This attribute represents the lock on the FIFO.
   *******************************************************************************************************************/
  private Lock lock_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being empty.
   *******************************************************************************************************************/
  private Condition not_empty_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being full.
   *******************************************************************************************************************/
  private Condition not_full_attr;

  /********************************************************************************************************************
   * This attribute represents the first element of the FIFO.
   *******************************************************************************************************************/
  private Node head_attr;

  /********************************************************************************************************************
   * This attribute represents the last element of the FIFO.
   *******************************************************************************************************************/
  private Node tail_attr;

  /********************************************************************************************************************
   * This attribute represents the capacity of the FIFO.
   *******************************************************************************************************************/
  private int capacity_attr;

  /********************************************************************************************************************
   * This attribute represents the size of the FIFO.
   *******************************************************************************************************************/
  private int size_attr;

} // class BlockingFIFO
person Robert Kock    schedule 17.04.2018

Согласно документации, BlockingQueue может быть заблокированным на "Удалить" методами: take() или poll(time, unit) вместо poll()

person Vitaly    schedule 20.06.2016