Улучшение синхронизации для реализации завершения нескольких потоков в определенном порядке.

Постановка задачи: автобус может начать движение в любом порядке, но прибудет к месту назначения в определенном порядке. Таким образом, автобус будет ждать, если его ранее ожидаемый автобус еще не прибыл.

После обнаружения нескольких пропущенных сигналов и их решения я наконец нашел решение ниже. Мои вопросы: во-первых, как мы можем изменить приведенный ниже код, чтобы сделать его надежным с точки зрения дизайна? Во-вторых, есть ли какой-либо другой механизм синхронизации в JAVA, который мы могли бы использовать для решения этой проблемы?

public class BusDestination implements Runnable {

private boolean[] busArrivalStatus;
private List<String> busSeq = new ArrayList<>();


private Lock lock = new ReentrantLock();
private Condition prevBusFlag = lock.newCondition();
private CountDownLatch latch;
boolean signalled = false;

public BusDestination(CountDownLatch latch) {
    busSeq.add("B1");
    busSeq.add("B2");
    busSeq.add("B3");
    busSeq.add("B4");
    busSeq.add("B5");

    busArrivalStatus = new boolean[busSeq.size()];
    for (int i = 0; i < this.busSeq.size(); i++) {
        busArrivalStatus[i] = false;
    }
    this.latch = latch;
}


public void run() {

    String busName = Thread.currentThread().getName();

    try{
        lock.lock();
        int busArrrivalSeq = busSeq.indexOf(busName);
        if(busArrrivalSeq==0){
            //first bus arrived
            System.out.println("********    Bus arriaved : "+busName);
            busArrivalStatus[0] = true;
            System.out.println(busName+" will signall");
            signalled = true;
            prevBusFlag.signalAll();
        } else {
            while(!isValidSeq(busName) && !signalled ){
                System.out.println(busName+" going to wait.");
                prevBusFlag.await();
            }
            System.out.println("********    Bus arriaved : "+busName);
            busArrivalStatus[busSeq.indexOf(busName)] = true;
            signalled = true;//getPreviousBusStatus(busName);
            prevBusFlag.signalAll();
        }
    } catch(Exception ex){
        ex.printStackTrace();
        System.out.println("Except--"+busName);
    }finally{
        lock.unlock();  
    }
    latch.countDown();
}

private boolean isValidSeq(String busName) {
    int prevIndex = busSeq.indexOf(busName)-1;
    if(!busArrivalStatus[prevIndex]){
        signalled = false;
    }
    return busArrivalStatus[prevIndex];
}

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(5);

    BusDestination destination = new BusDestination(latch);

    Thread b1 = new Thread(destination);
    Thread b2 = new Thread(destination);
    Thread b3 = new Thread(destination);
    Thread b4 = new Thread(destination);
    Thread b5 = new Thread(destination);

    b1.setName("B1");
    b2.setName("B2");
    b3.setName("B3");
    b4.setName("B4");
    b5.setName("B5");

    b4.start();
    b5.start();
    b3.start();
    b1.start();
    b2.start();

    latch.await();
}

}


person javaq    schedule 19.06.2016    source источник


Ответы (2)


Это можно упростить, используя только экземпляр CountDownLatch и не требуя экземпляра блокировки. Проблема, которую вы пытаетесь решить, заключается в том, что все потоки n могут выполняться одновременно, но в определенный момент поток t должен ждать, пока поток t-1 не достигнет определенной точки. Вот мое решение.

public class BusDestination {

    private CountDownLatch pre, next;

    public BusDestination(CountDownLatch pre, CountDownLatch next) {
         this.pre = pre;
         this.next = next;
    }

    public void run() {
          // Do work....
          pre.await(); // wait until prior thread{s} are done
          // If required, do more work...
          pre.countDown(); // notify next set of thread{s} that you are done
          // If required, do more work...
    }

    public static void main(String args[]) {
         CountDownLatch first = new CountDownLatch(0); // await will return right away.
         CountDownLatch second = new CountDownLatch(1);
         // Continue creating CountDownLatch....
         CountDownLatch preLast =  new CountDownLatch(1);
         CountDownLatch last = new CountDownLatch(1); // used to wait by the main thread.

         BusDestination firstBus = new BusDestination(first, second);
         // create more destinations
         BusDestination lastBus = new BusDestination(preLast, last);

         // start threads....

        last.await(); // Waits until all of the threads complete

}

Хорошая особенность этого решения заключается в том, что вы можете создать конвейер, в котором шаг n должен будет ждать завершения двух или более потоков на шаге n-1, прежде чем продолжить.

person Claudio Corsi    schedule 19.06.2016

В зависимости от того, чего вы пытаетесь достичь, этот вопрос может вам помочь: created-started">Порядок запуска потоков в том порядке, в котором они были созданы/запущены Однако ответы на этот вопрос сосредоточены на синхронизации после того, как потоки выполнили свою работу, что не совсем то, что вам нужно.

Кроме того, не является ли CountDownLatch избыточным? Если вы синхронизируете свой поток с signal/await, вам все еще нужно ждать их завершения одновременно?

Моя идея заключалась в том, чтобы использовать очередь, которая определяет порядок автобусов. Поток просматривает очередь (не удаляет ее), если настала его очередь, если удаляет элемент из очереди и сигнализирует другим потокам. Поскольку одновременный доступ к очереди осуществляется только в критической секции, можно использовать простой LinkedList.

Проверка if/else для busArrrivalSeq==0 довольно избыточна, на самом деле они делают очень похожие вещи. С помощью очереди его можно упростить до одного случая.

public class BusDestination implements Runnable {


    private Lock lock = new ReentrantLock();
    private Condition prevBusFlag = lock.newCondition();
    private Queue<String> busSeq;

    public BusDestination(Queue<String> busSeq) {

        this.busSeq = busSeq;
    }


    public void run() {

        String busName = Thread.currentThread().getName();

        try {
            lock.lock();

                while (!busSeq.peek().equals(busName) ) {
                    System.out.println(busName + " going to wait.");
                    prevBusFlag.await();
                }
                System.out.println("********    Bus arriaved : " + busName);
                busSeq.remove();

                prevBusFlag.signalAll();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("Except--" + busName);
        } finally {
            lock.unlock();
        }

    }

    public static void main(String[] args) throws InterruptedException {

        Queue<String> busSeq = new LinkedList<>();
        busSeq.add("B1");
        busSeq.add("B2");
        busSeq.add("B3");
        busSeq.add("B4");
        busSeq.add("B5");

        BusDestination destination = new BusDestination(busSeq);

        Thread b1 = new Thread(destination);
        Thread b2 = new Thread(destination);
        Thread b3 = new Thread(destination);
        Thread b4 = new Thread(destination);
        Thread b5 = new Thread(destination);

        b1.setName("B1");
        b2.setName("B2");
        b3.setName("B3");
        b4.setName("B4");
        b5.setName("B5");

        b4.start();
        b5.start();
        b3.start();
        b1.start();
        b2.start();

    }
}
person user140547    schedule 19.06.2016