Я использую rabbitMQ для отправки задач рабочим (потребителям), которые создаются на ходу. В настоящее время каждый раз, когда создается новая задача, создается новый исполнитель. Проблема такая: -Пользователь создает задачу.
-Создан воркер, затем задача отправляется в очередь для обработки воркерами.
-Рабочий начинает обрабатывать очередь (в основном рабочий какое-то время спит)
-Еще один пользователь создает задачу
-Создан новый работник и задача отправлена в очередь
-Новый рабочий не обрабатывает новую задачу и при этом абсолютно ничего не делает, а новая задача обрабатывается первым работником после того, как он выполнил первую задачу.
Я проверил административную часть rabbitmq, и к очереди привязаны два потребителя, но один из них, кажется, выполняет всю работу, а другой просто ждет.
Вот код рабочего: открытый класс Worker расширяет Thread {
private final static String QUEUE_NAME = "Tasks";
private final static String QUEUE_COMPL = "Completed";
public static int id = 0;
private static final String EXCHANGE_NAME = "logs";
public int compteur;
String identifier;
public Worker() {
Worker.id++;
compteur = id;
}
public void run() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("WORKER " + compteur + " : Message received :" + message);
String taskName = message.split(" ")[0];
String length = message.split(" ")[1];
try {
System.out.println("WORKER " + compteur + " : Commencing job :" + message);
doWork(length);
System.out.println("WORKER " + compteur + " : Job's finished :" + message);
taskName += " done by " + compteur;
// confirm(taskName);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println("WORKER " + compteur + " : Waiting for a new task...");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException ex) {
Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
} catch (TimeoutException ex) {
Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static void doWork(String taskLength) throws InterruptedException {
int temps = Integer.parseInt(taskLength);
Thread.sleep(temps);
}
}
и код для части, которая помещает сообщения в очередь: public class serveurSD {
private final static String QUEUE_NAME = "Tasks";
private Manager MANAGER = new Manager();
@WebMethod(operationName = "processTask")
public String processTask(@WebParam(name = "message") String txt, @WebParam(name = "duree") int duree) throws IOException, TimeoutException {
if (MANAGER == null){
MANAGER= new Manager();
MANAGER.listen();
}
System.out.println("SERVER : Message received : " + txt + " " + duree);
MANAGER.giveTask();
ConnectionFactory factory = new ConnectionFactory();
String message = txt + " " + duree;
System.out.println("SERVER : Sending message to workers : " + message);
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.close();
connection.close();
return "Your task is being processed";
}
}
(Менеджер - это класс, создающий рабочих на ходу.)
Прошу прощения, если подобный вопрос уже задавался, но я не смог его найти. Спасибо за любую возможную помощь :)