Наращивание памяти при использовании CompletableFuture

class DataItemCache {
    private CompletableFuture future;

    public DataItemCache() {
        future = CompletableFuture.completedFuture(null);
    }

    public void saveItemAsync(Object dataItem) {
        future = future.thenRunAsync(() -> {
            saveItemSync(dataItem); // Saves the item to Elastic Search
        });
    }

    public void waitForWriteComplete() {
        future.get();
    }

Этот класс используется следующим образом:

class DataProcessorIntegrationTest {
    @Inject private DataItemCache dataItemCache;

    @Before
    public void setup() {
        // Setup Guice for injection
    }

    @Test
    public void testWorkflow() {
        int numItems = 1000;
        for (int index = 0; index < numItems; index++) {
            DataItem obj = ... // build data item
            dataItemCache.saveItemAsync(obj);
        }

        // I have code to periodically dump the heap during this wait

        dataItemCache.waitForWriteComplete();

        // assert that Elastic Search has 1000 items
    }
}

Количество элементов данных, выделенных непосредственно перед возвратом функции waitForWriteComplete(), равно 1000, а сразу после этого равно 0. Я ожидаю, что количество элементов данных будет меньше 1000, так как некоторые из них завершили запись в Elastic Search ( Я знаю это из журналов консоли).

Когда я запускаю этот код в производство, я получаю OOM. Проверка кучи во время OOM показывает миллионы объектов DataItem и CompletableFuture. Очевидно, что многие из них должны были завершить написание и вернуться из метода saveItem. Почему такие объекты DataItem и CompletableFuture не освобождаются?

Любые идеи о том, как решить эту проблему?


person Bonton255    schedule 18.01.2018    source источник
comment
Не с таким объемом информации. Как бы вы решили проблему, когда я помещаю много вещей в память, и у меня заканчивается память, у меня есть некоторые теории, но вы, вероятно, не должны им доверять. Псевдокод (правильно? это не ваш настоящий код, не так ли?) не показывает ничего полезного, кроме того, что вы, очевидно, знаете, как вызывать некоторые методы.   -  person Kayaman    schedule 18.01.2018
comment
Очевидно, что все, что вы делаете неправильно, находится в коде, который вы не предоставили. Предоставьте минимально воспроизводимый пример, демонстрирующий проблему.   -  person Jim Garrison    schedule 18.01.2018
comment
Как насчет того, чтобы показать ваш реальный код, но только соответствующие части. Если вы утверждаете, что dataItem и CompletableFutures не освобождаются, то покажите части кода, где вы ожидаете, что они станут подходящими для GC.   -  person Kayaman    schedule 18.01.2018
comment
Я добавил больше деталей к вопросу. Теперь стало яснее?   -  person Bonton255    schedule 18.01.2018
comment
future, который у вас есть в DataItemCache, бесполезен, он вам не нужен для запуска асинхронного кода, а ваш waitForWriteComplete() ничего не будет ждать, так как future уже завершен с самого начала. get() не ждет зависимых задач. На самом деле, я даже не думаю, что thenRunAsync() отслеживает какие-либо зависимые этапы после того, как будущее, на котором оно вызывается, было завершено (это может быть источником утечек памяти). В любом случае, до сих пор нет правильного кода, чтобы понять и воспроизвести проблему. Это все еще не минимально воспроизводимый пример.   -  person Didier L    schedule 18.01.2018
comment
@DidierL: обратите внимание на строку - future = future.thenRunAsync(). Итак, waitForWriteComplete() работает, как и ожидалось (подтверждено фактическим выполнением на производственных данных). И для этого не больше кода, чем то, что у меня здесь, буквально :) Если вы не можете понять проблему, то вы не можете. :)   -  person Bonton255    schedule 18.01.2018
comment
@ Bonton255 извините, пропустил это ... Я думаю, тогда это объясняет причину вашей проблемы, поэтому я опубликовал комментарий, который собирался написать в качестве ответа :-) Мне просто нужно было сделать некоторые предположения о том, как DataItemCache объявляется и используется в производстве так как здесь нет ни аннотаций, ни примера кода — просто юнит-тест, который, как я понимаю, не воспроизводит проблему.   -  person Didier L    schedule 18.01.2018


Ответы (1)


Из-за future, который вы храните в DataItemCache, эта служба сохраняет состояние.

Предполагая, что это синглтон, цепочка CompletableFuture, которую вы создаете, является общей для всех вызывающих объектов. Это означает, что если вы будете вызывать saveItemAsync() параллельно, вы построите огромную цепочку из CompletableFuture. Это может быть причиной вашего OOME.

Кроме того, waitForWriteComplete() также будет ожидать завершения фьючерсов других вызывающих объектов, при этом, возможно, пропустив свои собственные результаты, поскольку при доступе к future нет синхронизации.

Наконец, поскольку все вызовы объединены в цепочку, побочным эффектом является то, что одновременно обрабатывается только один (за исключением проблем с синхронизацией памяти при доступе к future). Таким образом, нет особого смысла использовать здесь CompletableFuture, было бы намного проще иметь synchronized синхронный метод. Это также будет потреблять меньше памяти и, вероятно, улучшит производительность, избегая всех накладных расходов CompletableFuture.

person Didier L    schedule 18.01.2018
comment
Спасибо за ваш вклад. Но я уже знаю, что вы здесь упомянули. Вопрос заключался в том, почему завершенные CompletableFuture не освобождают память. И как я могу заставить их освободить память после того, как они закончат выполнение? В качестве примечания, использование CompletableFuture здесь (вместо синхронного метода), очевидно, не заставляет потребителя ждать завершения вызовов сохранения, пока потребитель может вернуться и выполнить дополнительную работу. - person Bonton255; 19.01.2018
comment
Я думаю, что CompletableFuture довольно хорошо оптимизирован в отношении управления памятью, но сборщику мусора может быть трудно отказаться от них из-за длинных цепочек, которые вы создаете. Вы можете проверить кратчайшие пути к корням GC в Eclipse Memory Analyzer, чтобы лучше понять. Вы уверены, что все они закончили выполнение, когда вы получите OOME? Кроме того, вы можете значительно сократить количество CompletableFuture, обрабатывая полные пакеты DataItem внутри thenRunAsync() без ущерба для производительности, поскольку там уже нет параллелизма. - person Didier L; 19.01.2018