При множественных блокировках, когда первый поток заблокирован для первой задачи, как заставить второй поток не сидеть без дела и вместо этого заблокировать следующую задачу?

Класс процессора -

public class Processor extends Thread {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    private void doJob1() {
        synchronized (lock1) {
            System.out.println(Thread.currentThread().getName() + " doing job1");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " completed job1");
        }
    }

    private void doJob2() {
        synchronized (lock2) {
            System.out.println(Thread.currentThread().getName() + " doing job2");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " completed job2");
        }
    }

    public void run() {
        doJob1();
        doJob2();
    }
}

Основной метод -

    final Processor processor1 = new Processor();
    final Processor processor2 = new Processor();

    processor1.start();
    processor2.start();

Здесь при первом запуске либо Thread-0, либо Thread-1 берет блокировку job1(), а другой бездействует в течение 5 секунд.

После того как первый поток снимает блокировку с job1() и блокирует job2(), второй поток берет блокировку на job1().

Я хочу, чтобы второй поток не простаивал, и, поскольку job1() заблокирован первым потоком, вместо этого он должен заблокировать job2(), а затем заблокировать job1().

Как это сделать?

Примечание. Это базовый план. На самом деле я хочу, чтобы мой код работал, даже если есть 100 задач и 5 потоков.


person Payel Senapati    schedule 05.01.2021    source источник
comment
Вы действительно должны изменить свой дизайн. Поток не должен удерживать блокировку во время выполнения задания, а только тогда, когда он фактически манипулирует общим состоянием. Если вас беспокоит, как долго поток может ждать, пытаясь получить блокировку, проблема заключается в том, что какой-то другой поток слишком долго удерживает блокировку. Замки, как правило, просто не должны использоваться таким образом.   -  person David Schwartz    schedule 06.01.2021
comment
Чтобы использовать небольшую аналогию, предположим, что два человека делят машину. Если вы поставите замок на машину, то я даже не смогу проверить, свободна ли машина, не дожидаясь, пока другой человек вернет машину. Это не хороший дизайн. Вместо этого замок должен защищать ангар, в который мы кладем ключи, когда не пользуемся машиной. Приобретите замок, проверьте, есть ли там ключи (возможно, возьмите их, если они есть), снимите замок. Получите замок, верните ключи, откройте замок. Общее состояние — это то, у кого есть ключи, а не положение машины!   -  person David Schwartz    schedule 06.01.2021


Ответы (2)


Вот немного более сложный пример с объектом для задания и условной переменной, указывающей, было ли задание выполнено, а также примеры того, как оболочки могут адаптировать ReentrantLock к оператору try-with-resources.

/**
 * A Job represents a unit of work that needs to be performed once and
 * depends upon a lock which it must hold while the work is performed.
 */
public class Job {
    private final Runnable job;
    private final ReentrantLock lock;
    private boolean hasRun;

    public Job(Runnable job, ReentrantLock lock) {
        this.job = Objects.requireNonNull(job);
        this.lock = Objects.requireNonNull(lock);
        this.hasRun = false;
    }

    /**
     * @returns true if the job has already been run
     */
    public boolean hasRun() {
        return hasRun;
    }

    // this is just to make the test in Processor more readable
    public boolean hasNotRun() {
        return !hasRun;
    }

    /**
     * Tries to perform the job, returning immediately if the job has
     * already been performed or the lock cannot be obtained.
     *
     * @returns true if the job was performed on this invocation
     */
    public boolean tryPerform() {
        if (hasRun) {
            return false;
        }
        try (TryLocker locker = new TryLocker(lock)) {
            if (locker.isLocked()) {
                job.run();
                hasRun = true;
            }
        }
        return hasRun;
    }
}

/**
 * A Locker is an AutoCloseable wrapper around a ReentrantLock.
 */
public class Locker implements AutoCloseable {
    private final ReentrantLock lock;

    public Locker(final ReentrantLock lock) {
        this.lock = lock;
        lock.lock();
    }

    @Override
    public void close() {
        lock.unlock();
    }
}

/**
 * A TryLocker is an AutoCloseable wrapper around a ReentrantLock that calls
 * its tryLock() method and provides a way to test whether than succeeded.
 */
public class TryLocker implements AutoCloseable {
    private final ReentrantLock lock;

    public TryLocker(final ReentrantLock lock) {
        this.lock = lock.tryLock() ?  lock : null;
    }

    public boolean isLocked() {
        return lock != null;
    }

    @Override
    public void close() {
        if (isLocked()) {
            lock.unlock();
        }
    }
}

/**
 * A modified version of the Processor class from the question.
 */
public class Processor extends Thread {
    private static final ReentrantLock lock1 = new ReentrantLock();
    private static final ReentrantLock lock2 = new ReentrantLock();

    private void snooze(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void doJob1() {
        System.out.println(Thread.currentThread().getName() + " doing job1");
        snooze(5000);
        System.out.println(Thread.currentThread().getName() + " completed job1");
    }

    private void doJob2() {
        System.out.println(Thread.currentThread().getName() + " doing job2");
        snooze(5000);
        System.out.println(Thread.currentThread().getName() + " completed job2");
    }

    public void run() {
        Job job1 = new Job(() -> doJob1(), lock1);
        Job job2 = new Job(() -> doJob2(), lock2);

        List<Job> jobs = List.of(job1, job2);

        while (jobs.stream().anyMatch(Job::hasNotRun)) {
            jobs.forEach(Job::tryPerform);
        }
    }

    public static void main(String[] args) {
        final Processor processor1 = new Processor();
        final Processor processor2 = new Processor();

        processor1.start();
        processor2.start();
    }
}

Несколько заметок:

  • Метод run() в Processor теперь обобщается до списка из n заданий. Пока какие-либо задания не были выполнены, он попытается их выполнить, завершив работу после того, как все задания будут выполнены.
  • Класс TryLocker — это класс AutoCloseable, поэтому блокировку и разблокировку в Job можно выполнить, создав его экземпляр в операторе try-with-resources.
  • Класс Locker здесь не используется, но демонстрирует, как то же самое можно сделать для блокирующего вызова lock() вместо вызова tryLock().
  • TryLocker также может взять период времени и вызвать перегрузку tryLock, которая при желании ждет некоторое время, прежде чем сдаться; эта модификация оставлена ​​читателю в качестве упражнения.
  • Метод hasNotRun() из Job нужен только для того, чтобы сделать anyMatch(Job::hasNotRun) немного более читаемым в методе run() из Processor; это, вероятно, не тянет его вес, и от него можно было бы отказаться.
  • Классы шкафчиков не проверяют, что переданная блокировка не является нулевой, используя Objects.requireNonNull; они используют его сразу, вызывая для него метод, поэтому, если он равен нулю, они все равно будут генерировать NPE, но добавление явного requireNonNull может сделать их более понятными.
  • Классы шкафчиков не удосуживаются проверить, разблокировали ли они уже ReentrantLock перед вызовом unlock(), чтобы сделать их идемпотентными; в этом случае они выкинут IllegalMonitorStateException. В прошлом я написал вариант этого с переменной флага, чтобы избежать этого, но поскольку намерение состоит в том, чтобы использовать их в операторе try-with-resources, который вызовет метод close() только один раз, Я думаю, что лучше позволить им взорваться, если кто-то вручную вызовет метод close.
person David Conrad    schedule 05.01.2021
comment
Job::tryPerform показывает необработанное исключение - person Payel Senapati; 06.01.2021
comment
Я должен был поймать исключение там - person Payel Senapati; 06.01.2021
comment
@PayelSenapati Это не для меня. На какой линии? С какой версией Явы? - person David Conrad; 06.01.2021
comment
В классе Processor внутри блока while Job:tryPerform, который сопоставляется с классом Job tryPerform(), где я должен поставить попытку /catch - person Payel Senapati; 06.01.2021
comment
Версия Java — 11.0.9 Открытый JDK для Linux - person Payel Senapati; 06.01.2021
comment
Это странно, потому что Job::tryPerform не объявлен как генерирующий какие-либо исключения. У вас все еще есть doJob1() и doJob2(), вызывающие Thread.sleep() напрямую? Я изменил их, так как метод run() Runnable не может генерировать исключение. - person David Conrad; 06.01.2021
comment
Я сделал точно так, как вы сказали, - person Payel Senapati; 06.01.2021
comment
Может быть, что-то конкретное для Open JDK, а не для Oracle JDK? - person Payel Senapati; 06.01.2021
comment
Очень странный. Я не знаю, почему это было бы так. Это не требовалось ни для одной из версий JDK, которые я пробовал. - person David Conrad; 06.01.2021

Я полагаю, вы ищете способ получить блокировку, если это возможно, и сделать что-то еще, если это платно.

Вы можете сделать это с помощью ReentrantLock#tryLock.

Метод:

Получает блокировку только в том случае, если она не удерживается другим потоком во время вызова.

И он возвращает:

true если блокировка была свободна и была получена текущим потоком, или блокировка уже удерживалась текущим потоком; и false иначе

Вот модифицированная версия кода из вопроса:

  • если блокировка для первой задачи свободна, поток запустит первую задачу
  • иначе он запустит его после второго

Processor.java:

public class Processor extends Thread {
    private static final Lock lock1 = new ReentrantLock();
    private static final Lock lock2 = new ReentrantLock();

    @SneakyThrows
    private void doJob1() {
        System.out.println(Thread.currentThread().getName() + " doing job1");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + " completed job1");
    }

    @SneakyThrows
    private void doJob2() {
        System.out.println(Thread.currentThread().getName() + " doing job2");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + " completed job2");
    }

    public void run() {
        boolean executedFirst = false;
        if (lock1.tryLock()) {
            try {
                doJob1();
                executedFirst = true;
            } finally {
                lock1.unlock();
            }
        }
        try {
            lock2.lock();
            doJob2();
        } finally {
            lock2.unlock();
        }

        if (!executedFirst) {
            try {
                lock1.lock();
                doJob1();
            } finally {
                lock1.unlock();
            }
        }
    }

    public static void main(String[] args) {
        new Processor().start();
        new Processor().start();
    }
}

Пример вывода:

Thread-1 doing job2
Thread-0 doing job1
Thread-0 completed job1
Thread-1 completed job2
Thread-1 doing job1
Thread-0 doing job2
Thread-1 completed job1
Thread-0 completed job2

Обратите внимание, что вызовы lock/tryLock и unlock заключены в try/finally.


Переход в корзины и мячи:

Color.java:

public enum Color {
    RED,
    GREEN,
    BLUE;

    public static Color fromOrdinal(int i) {
        for (Color value : values()) {
            if (value.ordinal() == i) {
                return value;
            }
        }
        throw new IllegalStateException("Unknown ordinal = " + i);
    }
}

Basket.java:

@Data(staticConstructor = "of")
public class Basket {
    private final Color color;
    // balls that this basket has
    private final List<Ball> balls = new ArrayList<>();
    private final Lock lock = new ReentrantLock();
}

Ball.java

@Value(staticConstructor = "of")
public class Ball {
    Color color;
}

Boy.java

  • Каждый мальчик выбирает мяч (queue.poll())
  • Бежит к корзине (baskets.get(color) одного цвета)
  • And depending on behavior when a basket is occupied he:
    • throws ball away and tries again (option a in code)
    • ждет освобождения корзины (опция b)
  • Обратите внимание, что с опцией a некоторые boy могут закончиться между ними, когда другой выбрасывает ball, и никто еще не подобрал его (в любом случае, кто-то подберет его и положит в корзину)
@RequiredArgsConstructor
public class Boy implements Runnable {
    private final Map<Color, Basket> baskets;
    private final Queue<Ball> balls;

    @Override
    public void run() {
        Ball ball;
        while ((ball = balls.poll()) != null) {
            Color color = ball.getColor();
            Basket basket = baskets.get(color);
            // a
            if (basket.getLock().tryLock()) {
                try {
                    basket.getBalls().add(ball);
                } finally {
                    basket.getLock().unlock();
                }
            } else {
                balls.offer(ball);
            }

            // b
            /*
            try {
                basket.getLock().lock();
                basket.getBalls().add(ball);
            } finally {
                basket.getLock().unlock();
            }
             */
        }
    }
}

и, наконец, main:

Queue<Ball> balls = new LinkedBlockingQueue<>();
ThreadLocalRandom.current().ints(0, 3)
        .mapToObj(Color::fromOrdinal)
        .map(Ball::of)
        .limit(1000)
        .forEach(balls::add);
Map<Color, Basket> baskets = Map.of(
        Color.RED, Basket.of(Color.RED),
        Color.GREEN, Basket.of(Color.GREEN),
        Color.BLUE, Basket.of(Color.BLUE)
);
List<Thread> threads = IntStream.range(0, 100)
        .mapToObj(ignore -> new Boy(baskets, balls))
        .map(Thread::new)
        .collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
    thread.join();
}

baskets.forEach((color, basket) -> System.out.println("There are "
        + basket.getBalls().size() + " ball(-s) in " + color + " basket"));

Пример вывода:

There are 331 ball(-s) in GREEN basket
There are 330 ball(-s) in BLUE basket
There are 339 ball(-s) in RED basket
person caco3    schedule 05.01.2021
comment
Ударь меня. :) Я бы сделал еще один шаг, обернув задание и его условную переменную в объект и имея вспомогательный метод, который пытается заблокировать, выполнить и установить флаг, а затем вы можете перебирать их, пока они все не будут выполнены. . Я бы также создал оболочку Autocloseable, которая вызывает разблокировку, чтобы я мог использовать попытку с ресурсами, но это, очевидно, выходит за рамки этого вопроса и ответа. - person David Conrad; 05.01.2021
comment
Я также сначала подумал о более сложном подходе, но мне показалось, что ОП просто ищет tryLock :) - person caco3; 06.01.2021
comment
@DavidConrad, пожалуйста, добавьте ответ - person Payel Senapati; 06.01.2021
comment
@DenisZavedeev что такое @SneakyThrows? - person Payel Senapati; 06.01.2021
comment
@PayelSenapati, это из lombok — какая-то магия, позволяющая не объявлять проверенные исключения для выдачи - person caco3; 06.01.2021
comment
@DenisZavedeev здесь все работает нормально, но предположим, что работа намного сложнее, скажем, 10 заданий и 3 потока, такой подход сделает огромный и грязный код, есть ли лучший подход? - person Payel Senapati; 06.01.2021
comment
Это зависит от самой проблемы: хотите ли вы, чтобы все три thread выполняли каждую из десяти задач с наименьшей возможной блокировкой потоков без какого-либо гарантированного порядка выполнения задач? Или вы хотите что-то еще? - person caco3; 06.01.2021
comment
Мне нужна случайность, предположим, есть 1000 шариков чтения, зеленого, синего и три корзины красного, зеленого, синего. Шары нужно положить в корзины одного цвета. У меня 10 болл-боев, и в каждой корзине одновременно может находиться только 1 бол-бой. Также мальчик с мячом не пойдет в корзину 2, если корзина 1, как правило, занята, он пойдет либо в корзину 2, либо в корзину 3, т.е. любая корзина, которая свободна. - person Payel Senapati; 06.01.2021
comment
Что делать другим мальчикам? Мальчики берут корзину и никогда не отпускают ее, или они берут мяч, берут соответствующую корзину и кладут в нее мяч? А если корзина не свободна, то мяч выбрасывают? Я думаю, что ключом здесь в любом случае будет инкапсуляция заданий в некоторые Runnable и тщательное сопоставление Basket с соответствующим lock - person caco3; 06.01.2021
comment
@PayelSenapati Я добавил ответ с подробно проработанными идеями, о которых я упоминал выше. - person David Conrad; 06.01.2021
comment
@DenisZavedeev Каждый мальчик берет любой мяч наугад и бежит к корзине, 3 мальчика, которым удается занять корзину, занимают ее в любом порядке, не обязательно 1, 2 и 3, в то время как другие мальчики ждут, пока какая-либо корзина не освободится. Самое главное, что нет жесткой и быстрой корзины 1, 2, 3 - полная случайность, какую корзину сможет занять мальчик. - person Payel Senapati; 06.01.2021
comment
@PayelSenapati Я добавил пример кода, который пытается подражать мальчикам и корзинам. Надеюсь это поможет - person caco3; 06.01.2021
comment
@DenisZavedeev Я сожалею, что не могу проголосовать за ответ во второй раз за корзины и мячи. Ваше здоровье. - person David Conrad; 06.01.2021