Как на самом деле работает ForkJoinPool#awaitQuiescence?

У меня есть следующая реализация RecursiveAction, единственная цель этого класса - печатать от 0 до 9, но из разных потоков, если это возможно:

public class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }
    }
}

И я подумал, что вызов awaitQuiescence заставит текущий поток ждать, пока все задачи (отправленные и разветвленные) не будут выполнены:

public class Main {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.execute(new MyRecursiveAction(0));
        System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
    }
}

Но я не всегда получаю правильный результат, вместо того, чтобы печатать 10 раз, печатает от 0 до 10 раз.

Но если я добавлю helpQuiesce к своей реализации RecursiveAction:

public class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }

        RecursiveAction.helpQuiesce();//here
    }
}

Все работает нормально.

Я хочу знать, чего собственно awaitQuiescence ждете?


person zexed640    schedule 15.03.2021    source источник


Ответы (2)


Вы получаете представление о том, что происходит, когда вы меняете System.out.println(num); на System.out.println(num + " " + Thread.currentThread());

Это может напечатать что-то вроде:

0 Thread[ForkJoinPool-1-worker-3,5,main]
1 Thread[main,5,main]
tasks
2 Thread[ForkJoinPool.commonPool-worker-3,5,main]

Когда awaitQuiescence обнаруживает, что есть ожидающие задачи, он помогает, украв одну из них и выполнив ее напрямую. Его документация гласит:

Если вызывается ForkJoinTask, работающим в этом пуле, эквивалентен ForkJoinTask.helpQuiesce(). В противном случае ожидает и/или пытается помочь в выполнении задач, пока этот пул не будет переведен в состояние Quiescent() или пока не истечет указанный тайм-аут.

Выделение добавлено мной

Это происходит здесь, как мы видим, задача печатает «main» в качестве своего исполняемого потока. Затем поведение fork() указывается как:

Организует асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача, если применимо, или с использованием ForkJoinPool.commonPool(), если не inForkJoinPool().

Поскольку поток main не является рабочим потоком ForkJoinPool, fork() отправит новую задачу в commonPool(). С этого момента fork(), вызванный из рабочего потока общего пула, также отправит следующую задачу в общий пул. Но awaitQuiescence, вызванный в пользовательском пуле, не ждет завершения задач общего пула, и JVM завершает работу слишком рано.

Если вы скажете, что это неправильный дизайн API, я не буду возражать.

Решение состоит в том, чтобы использовать awaitQuiescence только для общего пула¹. Обычно RecursiveAction, который отделяет подзадачи, должен ждать их завершения. Затем вы можете дождаться завершения корневой задачи, чтобы дождаться завершения всех связанных задач.

Вторая половина этого ответа содержит пример такой реализации RecursiveAction.

¹ awaitQuiescence полезно, когда у вас нет реальных фьючерсов, например, с параллельным потоком, который отправляется в общий пул.

person Holger    schedule 16.03.2021

Все работает нормально.

Нет, вам повезло, что это сработало, когда вы вставили:

RecursiveAction.helpQuiesce();

Чтобы объяснить это, давайте немного изменим ваш пример:

static class MyRecursiveAction extends RecursiveAction {

    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }
    }

}


public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.execute(new MyRecursiveAction(0));
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
}

Если вы запустите это, вы заметите, что получаете результат, который ожидаете получить. И на это есть две основные причины. Во-первых, метод fork выполнит задачу в common pool, как уже объяснялось в другом ответе. Во-вторых, потоки в общем пуле являются потоками daemon. JVM не ждет их завершения перед выходом, она существует раньше. Так что, если это так, вы можете спросить, почему это работает. Это происходит из-за этой строки:

LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));

что заставляет поток main (который не является потоком демона) спать в течение двух секунд, давая достаточно времени для ForkJoinPool для выполнения вашей задачи.

Теперь давайте изменим код ближе к вашему примеру:

public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.execute(new MyRecursiveAction(0));
    System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
}

в частности, вы используете: forkJoinPool.awaitQuiescence(...), который задокументирован как:

В противном случае ждет и/или пытается помочь в выполнении задач...

Он не говорит, что будет обязательно ждать, он говорит, что будет ждать и/или попытку..., в данном случае это больше или, чем и. Таким образом, он попытается помочь, но все равно не будет ждать завершения всех задач. Это странно или даже глупо?

Когда вы вставляете RecursiveAction.helpQuiesce();, вы в конечном итоге вызываете один и тот же awaitQuiescence (с другими аргументами) под капотом - так что по существу ничего не меняется; основная проблема осталась:

static ForkJoinPool forkJoinPool = new ForkJoinPool();
static AtomicInteger res = new AtomicInteger(0);

public static void main(String[] args) {
    forkJoinPool.execute(new MyRecursiveAction(0));
    System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
    System.out.println(res.get());
}

static class MyRecursiveAction extends RecursiveAction {

    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10_000) {
            res.incrementAndGet();
            System.out.println(num + " thread : " + Thread.currentThread().getName());
            new MyRecursiveAction(num + 1).fork();
        }

        RecursiveAction.helpQuiesce();

    }

}

Когда я запускаю это, оно никогда не печатает 10000, показывая, что вставки этой строки ничего не меняют.


Обычный способ обработки таких вещей по умолчанию — fork, а затем join. И еще один join в звонилке, на ForkJoinTask, который вы получаете при вызове submit. Что-то вроде:

public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    ForkJoinTask<Void> task = forkJoinPool.submit(new MyRecursiveAction(0));
    task.join();
}

static class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            MyRecursiveAction ac = new MyRecursiveAction(num + 1);
            ac.fork();
            ac.join();
        }
    }
} 
person Eugene    schedule 10.06.2021