Блокирующая очередь+Поток+Последовательность выполнения потоков

У меня есть очередь блокировки, которая состоит из моих объектов потока. Для меня важен порядок, в котором формировались эти темы. Также каждый поток связан с ключом. Итак, что я хотел сделать, так это то, что если поток для ключа запущен, все остальные потоки для ключа должны быть заблокированы. Но когда поток завершает свое выполнение, поток с тем же ключом в очереди, но с самым старым, должен быть уведомлен и выполнен. Поэтому я планировал создать очередь блокировки хэш-карты, где в хэш-карте мой ключ является ключом, а значение — моим объектом потока. Моя проблема 1. КАК мне искать поток, связанный с определенным ключом в моей очереди. 2. Как уведомить этот поток о выполнении сейчас.


person dtyagi    schedule 04.04.2015    source источник
comment
Как насчет того, чтобы создать очередь блокировки ваших ключевых объектов вместо Thread objects. Каждая запись в очереди блокировки представляет собой List<Thread>. Теперь, если несколько потоков хотят обработать этот ключ и если он находится в BlockingQueue, вы просто добавляете его в список. Таким образом, обработка каждого ключа происходит в том порядке, в котором для него запланирован поток. Как только один поток для этого ключа будет выполнен, вы можете начать со следующего (нет необходимости уведомлять или блокировать, поскольку теперь только один поток обрабатывает один ключ за раз). Надеюсь, это имеет смысл   -  person Neo    schedule 04.04.2015
comment
Привет. Спасибо за ответ. Не могли бы вы предоставить пример сортировки кода.....   -  person dtyagi    schedule 04.04.2015


Ответы (1)


Вот о чем я говорил. Это должно дать вам достаточно подсказок. Это не тестировалось и не заботится о синхронизации, которая может вам понадобиться в зависимости от вашего варианта использования.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// E - Element of type key.
public class Design1<E> {

  private BlockingQueue<List<MyThread<E>>> myQueue;

  // Provides a quicker lookup for the threads for a certain key instead of checking all elements in Queue.
  private Map<E, List<MyThread<E>>> myKeyMap;

  public Design1() {
    // Initialize both myQueue and myKeyMap here.
    myQueue = new ArrayBlockingQueue<>(100);
    myKeyMap = new HashMap<>();
  }

  class MyThread<K> extends Thread {
    K key;

    public K getKey() {
      return key;
    }
    public void run() {
      // Process the key

    }
  }

  public void addToQueue(E key) {
    List<MyThread<E>> keyThreads = myKeyMap.get(key);
    if (keyThreads == null) {
      keyThreads = new ArrayList<>();
    }

    // Create new thread for this key
    MyThread<E> t = new MyThread<E>();
    keyThreads.add(t);

    myKeyMap.put(key, keyThreads);

    try {
      // Block if full.
      myQueue.put(keyThreads);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }

  public void processQueue(E key) {
    try {
      while (true) {
        // Fetch elems from Queue - block if empty
        List<MyThread<E>> keyThreads =  myQueue.take();
        E myKey = null;
        while (keyThreads.size() > 0) {
          // Process all the threads for the same key
          MyThread<E> thread = keyThreads.remove(0);
          myKey = thread.getKey();
          thread.start();
        }
        if (myKey != null) {
          // Clean up hashmap entry too.
          myKeyMap.remove(myKey);
        }
      }
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}
person Neo    schedule 04.04.2015