newFixedThreadPool не работает должным образом с пулом объектов

Я изо всех сил пытаюсь понять, как объединить ресурсы, и я начинаю подозревать, что мои потоки могут быть проблемой (не на 100%, но я экспериментировал с этим). Суть того, что я пытаюсь сделать, это создать пул каналов на сервер, а затем посмотреть, используют ли их потоки. Мне удалось получить количество каналов, которые будут созданы для такого количества элементов, которые я загружаю (т. каналы по необходимости).

Я подумал, что, возможно, проблема заключается в том, как потоки взаимодействуют с пулом, поэтому я попытался создать newCachedThreadPool, чтобы потоки не умирали, пока есть работа, но когда я это делаю, я получаю сообщения об ошибках, говорящих о том, что используемый канал был закрыт. В моем пуле есть метод destroyObject, но я никогда его не вызываю, поэтому я не понимаю, почему он запускается (если я его закомментирую, он работает, но создает только один канал, а загрузка очень-очень медленная, около 300 операций в секунду по сравнению с без многопоточный пул я получаю 30k/сек). Я подозреваю, что он завершается, есть ли способ проверить это и есть ли альтернатива, которую я могу использовать, если он завершается?

Вот код (игнорируйте весь материал rabbitmq, я просто могу следить за результатами):

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {

    private static ExecutorService executor_worker;

    static {
        final int numberOfThreads_ThreadPoolExecutor = 20;
        executor_worker = Executors.newCachedThreadPool();
        executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
                                           new LinkedBlockingDeque<Runnable>());
    }

    private static ObjectPool<Channel> pool;

    public static void main(String[] args) throws Exception {
        System.out.println("starting..");           
        ObjectPool<Channel> pool =
                new GenericObjectPool<Channel>(
                new ConnectionPoolableObjectFactory(), 50);
        for (int x = 0; x<500000000; x++) {
            executor_worker.submit(new MyRunnable(x, pool));
        }
        //executor_worker.shutdown();
        //pool.close();
    }
}

 class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
     Channel channel;
     Connection connection;

    public ConnectionPoolableObjectFactory() throws IOException {
        System.out.println("hello world");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        channel = connection.createChannel(); 
    }

    @Override
    public Channel makeObject() throws Exception {  
        //channel = connection.createChannel(); 
        return channel; 
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

class MyRunnable implements Runnable{  
    protected int x = 0;
    protected ObjectPool<Channel> pool;

    public MyRunnable(int x, ObjectPool<Channel> pool) {
        // TODO Auto-generated constructor stub
        this.x = x;
        this.pool = pool;
    }

    public void run(){
        try {
                Channel channel = pool.borrowObject();
                String message = Integer.toString(x);
                channel.basicPublish( "", "task_queue", 
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes());
                pool.returnObject(channel);
        } catch (NoSuchElementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

p.s. Я в основном задал несколько вопросов, прочитал документацию и попытался понять это, и по пути я, возможно, пошел совершенно в неправильном направлении, поэтому, если есть какие-либо проблемы, которые вы видите, или советы, пожалуйста, пришлите их мне.

Сюжет сгущается:

В цикле for основного метода (где я отправляю работу в потоки) я добавил:

    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
    System.out.println(threadSet.size()); //number of threads
    System.out.println(pool.getNumActive());

Он показывает мне 25 потоков (хотя я сказал 20) и 20 элементов в пуле. Но когда я смотрю на пользовательский интерфейс rabbitmq, я вижу одно соединение только с одним каналом. Если я создаю каналы и отправляю их в исполняемый файл, он создает много каналов (но никогда их не закрывает). Я не понимаю, что происходит и почему результат не такой, как ожидалось.


person Lostsoul    schedule 02.05.2012    source источник
comment
Вероятно, это не связано, но: вы намеренно назначаете 2 исполнителя executor_worker (строка executor_worker = Executors.newCachedThreadPool(); ничего не делает в вашем коде)?   -  person assylias    schedule 02.05.2012
comment
@assylias Я прочитал учебник, и в их примере именно так они реализовали newCachedThreadPool. Я не понимал, что это неправильно, я думал, что так можно сказать исполнителю использовать пул кэшированных потоков.   -  person Lostsoul    schedule 02.05.2012
comment
Если вы хотите использовать стандартный пул кэшированных потоков, просто напишите: executor_worker = Executors.newCachedThreadPool(numberOfThreads_ThreadPoolExecutor); и удалите другую строку.   -  person assylias    schedule 02.05.2012
comment
Это не сработало, поэтому вместо этого я изменил его на newFixedThreadPool, чтобы проверить, но у меня все еще возникает та же проблема. Кажется, что он убивает пул при каждой работе, которую выполняет поток.   -  person Lostsoul    schedule 02.05.2012


Ответы (1)


Я думаю, проблема в том, что ваш ConnectionPoolableObjectFactory всегда создает только один объект Channel. Кажется, что он должен создавать новый канал каждый раз, когда вызывается makeObject.

Так что, возможно, это должно быть реализовано примерно так:

public class ConnectionPoolableObjectFactory
        extends BasePoolableObjectFactory<Channel> {

    private final Connection connection;

    private ConnectionPoolableObjectFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
    }

    @Override
    public Channel makeObject() throws Exception {
        return connection.createChannel();
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

Это предполагает, что каждая фабрика создает несколько каналов из одного соединения.

person Jeremy    schedule 02.05.2012
comment
Интересно. Ваш код создает соединение для каждого потока, а затем один канал для каждого соединения. Это определенно на шаг ближе для меня, но я пытался поддерживать порождение нескольких каналов только из одного соединения. Я попытался изменить ваш метод кода newConnectionFactory, чтобы вернуть новое соединение с переменной, но затем я получаю исключение нулевого указателя. Любая идея, как создать соединение только один раз? - person Lostsoul; 02.05.2012
comment
Откуда берется исключение NullPointerException? Я изменил свой ответ, чтобы новые каналы создавались из одного соединения. Я не могу запустить его, потому что у меня нет времени, чтобы настроить полную программу. - person Jeremy; 02.05.2012
comment
Большое спасибо, Джереми. Это сработало! Нулевой указатель исходил из области, в которой я заимствовал канал (что в основном означало, что он не получал канал). Ваш фикс сработал, у меня теперь одно подключение с несколькими каналами. - person Lostsoul; 03.05.2012
comment
Без проблем! Я обновил свой код, чтобы экземпляр Connection не был статичным. Это было упущением с моей стороны. Если бы он был статическим, то каждый экземпляр ConnectionPoolableObjectFactory использовал бы одно и то же соединение, что я не считаю желательным. - person Jeremy; 03.05.2012
comment
Большое спасибо, Джереми. На самом деле я хочу, чтобы все экземпляры имели общее соединение, но получали уникальные каналы. Оба ваших кода, похоже, работают, но оба блокируются как сумасшедшие. Я провел большую часть сегодняшнего дня, пытаясь понять, почему. Когда я проверяю пользовательский интерфейс кролика, я вижу 1 соединение и 20 каналов, чего я и хочу, но когда я профилирую и проверяю темы, я вижу их все красными, кроме одного за раз. вывод статистики в пуле показывает, что есть 20 каналов, но я не могу понять, почему это происходит. Извините, если я вас слишком много беспокою, но у вас есть какие-либо идеи о том, как я могу решить эту проблему? - person Lostsoul; 04.05.2012
comment
когда я создаю аналогичный сценарий, в котором одно соединение создает несколько ступеней в многопоточном цикле for, тогда блокировка нулевая, но на самом деле я не объединяюсь, потому что каждый раз создаю новые каналы. Я не думаю, что это имеет какое-либо отношение к кролику (я отправил им письмо по электронной почте в службу поддержки). Я думаю, что это как-то связано с тем, как объекты распределяются. Это так странно, он запускается, затем замедляется до нуля и не двигается. - person Lostsoul; 04.05.2012
comment
Означает ли красный канал, что он блокируется? Я предполагаю, что каналы не возвращаются в пул, учитывая, что вы не возвращаете их в блоке finally. Кроме того, вам может понадобиться реализовать пассивацию объекта, если вам нужно что-то сделать, чтобы подготовить канал для повторного использования. Я поэкспериментировал с созданием простого примера здесь. Не стесняйтесь играть с ним, я просто пытался воспроизвести вашу проблему без использования rabbitmq, но я не думаю, что это очень поможет. - person Jeremy; 04.05.2012
comment
Да, красный цвет означает его блокировку. Спасибо за пример примера, я пытался понять ваши мысли, но вы проделали так много работы, я очень ценю это, и мне жаль отнимать ваше время. Но даже ваш пример кода блокируется. То, как я это тестировал, изменило ваш цикл for на 900000000 (чтобы он не заканчивался слишком быстро), затем запустил его и запустил Java visualvm (он включен в java). В основном на вкладке потоков написано monitor, что на вашем наборе блокируется. 2 или 3 из 4 потоков заблокированы в любой момент времени - person Lostsoul; 04.05.2012
comment
На самом деле, когда я посмотрел на ваш комплект, он сказал, что несколько вещей блокируют его, sysout блокирует (что имеет смысл), поэтому я закомментировал эту часть запуска, чтобы не было вывода и повторного запуска, но я получаю ту же блокировку (2- 3 темы заблокированы), но на этот раз блокирует объект заимствования. Я читал в списке рассылки, что он может блокироваться при создании нового объекта, так что, может быть, если увеличить срок службы, это поможет? просто мысль.. - person Lostsoul; 04.05.2012