Rabbitmq - новый потребитель ничего не получает

Я использую rabbitMQ для отправки задач рабочим (потребителям), которые создаются на ходу. В настоящее время каждый раз, когда создается новая задача, создается новый исполнитель. Проблема такая: -Пользователь создает задачу.

-Создан воркер, затем задача отправляется в очередь для обработки воркерами.

-Рабочий начинает обрабатывать очередь (в основном рабочий какое-то время спит)

-Еще один пользователь создает задачу

-Создан новый работник и задача отправлена ​​в очередь

-Новый рабочий не обрабатывает новую задачу и при этом абсолютно ничего не делает, а новая задача обрабатывается первым работником после того, как он выполнил первую задачу.

Я проверил административную часть rabbitmq, и к очереди привязаны два потребителя, но один из них, кажется, выполняет всю работу, а другой просто ждет.

Вот код рабочего: открытый класс Worker расширяет Thread {

private final static String QUEUE_NAME = "Tasks";
private final static String QUEUE_COMPL = "Completed";
public static int id = 0;
private static final String EXCHANGE_NAME = "logs";
public int compteur;
String identifier;

public Worker() {
    Worker.id++;
    compteur = id;
}
public void run() {
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WORKER " + compteur + " : Message received :" + message);

                String taskName = message.split(" ")[0];
                String length = message.split(" ")[1];

                try {
                    System.out.println("WORKER " + compteur + " : Commencing job :" + message);
                    doWork(length);
                    System.out.println("WORKER " + compteur + " : Job's finished :" + message);
                    taskName += " done by " + compteur;
                   // confirm(taskName);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } finally {
                    System.out.println("WORKER " + compteur + " : Waiting for a new task...");
                }

            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);
    } catch (IOException ex) {
            Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
    } catch (TimeoutException ex) {
            Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
    }
}

private static void doWork(String taskLength) throws InterruptedException {
    int temps = Integer.parseInt(taskLength);
    Thread.sleep(temps);
}
}

и код для части, которая помещает сообщения в очередь: public class serveurSD {

private final static String QUEUE_NAME = "Tasks";
private  Manager MANAGER = new Manager();

@WebMethod(operationName = "processTask")
public String processTask(@WebParam(name = "message") String txt, @WebParam(name = "duree") int duree) throws IOException, TimeoutException {
    if (MANAGER == null){
        MANAGER= new Manager();
        MANAGER.listen();
    }
    System.out.println("SERVER : Message received : " + txt + " " + duree);
    MANAGER.giveTask();
    ConnectionFactory factory = new ConnectionFactory();
    String message = txt + " " + duree;
    System.out.println("SERVER : Sending message to workers : " + message);
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    channel.close();
    connection.close();       
    return "Your task is being processed";
}
}

(Менеджер - это класс, создающий рабочих на ходу.)

Прошу прощения, если подобный вопрос уже задавался, но я не смог его найти. Спасибо за любую возможную помощь :)


person jilako    schedule 29.12.2015    source источник


Ответы (2)


второй параметр метода basicConsume - «автоматическое подтверждение». Установка этого параметра в true означает, что потребитель сообщит RabbitMQ, что сообщение было подтверждено, как только он получит сообщение.

Когда для потребителя установлено значение autoAck true, весьма вероятно, что он немедленно получит следующее доступное сообщение из очереди, даже если для basicQos установлено значение 1. это происходит, потому что предел 1 немедленно уменьшается потребителем, чтобы сказать у него больше нет сообщения, и он может принять следующее.

Изменение параметра auto ack на false предотвращает эту проблему в сочетании с базовой настройкой QoS, равной 1, потому что это заставляет вашего потребителя сказать: «Привет, у меня есть сообщение, и я сейчас работаю над ним. Не отправляйте мне что-нибудь еще, пока я не закончу ".

это позволяет другому потребителю сказать: «Эй, у меня есть свободное место. Давай, отправь мне сообщение».

person Derick Bailey    schedule 31.12.2015

Хорошо, похоже, я нашел способ заставить его работать, проблема в том, что я не знаю, почему это работает, поэтому, если кто-то знает, я был бы рад получить объяснение

Я изменил второй аргумент channel.basicConsume(QUEUE_NAME, true, consumer); на false, но не уверен, что понял, почему это решает мою проблему.

person jilako    schedule 29.12.2015