Я изо всех сил пытаюсь понять, как объединить ресурсы, и я начинаю подозревать, что мои потоки могут быть проблемой (не на 100%, но я экспериментировал с этим). Суть того, что я пытаюсь сделать, это создать пул каналов на сервер, а затем посмотреть, используют ли их потоки. Мне удалось получить количество каналов, которые будут созданы для такого количества элементов, которые я загружаю (т. каналы по необходимости).
Я подумал, что, возможно, проблема заключается в том, как потоки взаимодействуют с пулом, поэтому я попытался создать newCachedThreadPool
, чтобы потоки не умирали, пока есть работа, но когда я это делаю, я получаю сообщения об ошибках, говорящих о том, что используемый канал был закрыт. В моем пуле есть метод destroyObject
, но я никогда его не вызываю, поэтому я не понимаю, почему он запускается (если я его закомментирую, он работает, но создает только один канал, а загрузка очень-очень медленная, около 300 операций в секунду по сравнению с без многопоточный пул я получаю 30k/сек). Я подозреваю, что он завершается, есть ли способ проверить это и есть ли альтернатива, которую я могу использовать, если он завершается?
Вот код (игнорируйте весь материал rabbitmq, я просто могу следить за результатами):
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class PoolExample {
private static ExecutorService executor_worker;
static {
final int numberOfThreads_ThreadPoolExecutor = 20;
executor_worker = Executors.newCachedThreadPool();
executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>());
}
private static ObjectPool<Channel> pool;
public static void main(String[] args) throws Exception {
System.out.println("starting..");
ObjectPool<Channel> pool =
new GenericObjectPool<Channel>(
new ConnectionPoolableObjectFactory(), 50);
for (int x = 0; x<500000000; x++) {
executor_worker.submit(new MyRunnable(x, pool));
}
//executor_worker.shutdown();
//pool.close();
}
}
class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
Channel channel;
Connection connection;
public ConnectionPoolableObjectFactory() throws IOException {
System.out.println("hello world");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
channel = connection.createChannel();
}
@Override
public Channel makeObject() throws Exception {
//channel = connection.createChannel();
return channel;
}
@Override
public boolean validateObject(Channel channel) {
return channel.isOpen();
}
@Override
public void destroyObject(Channel channel) throws Exception {
channel.close();
}
@Override
public void passivateObject(Channel channel) throws Exception {
//System.out.println("sent back to queue");
}
}
class MyRunnable implements Runnable{
protected int x = 0;
protected ObjectPool<Channel> pool;
public MyRunnable(int x, ObjectPool<Channel> pool) {
// TODO Auto-generated constructor stub
this.x = x;
this.pool = pool;
}
public void run(){
try {
Channel channel = pool.borrowObject();
String message = Integer.toString(x);
channel.basicPublish( "", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
pool.returnObject(channel);
} catch (NoSuchElementException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalStateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
p.s. Я в основном задал несколько вопросов, прочитал документацию и попытался понять это, и по пути я, возможно, пошел совершенно в неправильном направлении, поэтому, если есть какие-либо проблемы, которые вы видите, или советы, пожалуйста, пришлите их мне.
Сюжет сгущается:
В цикле for основного метода (где я отправляю работу в потоки) я добавил:
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
System.out.println(threadSet.size()); //number of threads
System.out.println(pool.getNumActive());
Он показывает мне 25 потоков (хотя я сказал 20) и 20 элементов в пуле. Но когда я смотрю на пользовательский интерфейс rabbitmq, я вижу одно соединение только с одним каналом. Если я создаю каналы и отправляю их в исполняемый файл, он создает много каналов (но никогда их не закрывает). Я не понимаю, что происходит и почему результат не такой, как ожидалось.
executor_worker
(строкаexecutor_worker = Executors.newCachedThreadPool();
ничего не делает в вашем коде)? - person assylias   schedule 02.05.2012executor_worker = Executors.newCachedThreadPool(numberOfThreads_ThreadPoolExecutor);
и удалите другую строку. - person assylias   schedule 02.05.2012