Странное поведение arrayBlockingQueue с элементами массива

У меня какое-то странное поведение при использовании ArrayBlockingQueue, которое я использую для связи между определенными шагами в приложении Java.

Я использую 1 статическую очередь ArrayBlockingQueue, инициализированную следующим образом:

protected static BlockingQueue<long[]> commandQueue;

За ним следует конструктор, в одной из строк которого есть this:

commandQueue = new ArrayBlockingQueue<long[]>(amountOfThreads*4);

Где amountOfThreads задается как аргумент конструктора.

Затем у меня есть производитель, который создает массив long[2], дает ему некоторые значения, а затем предлагает его в очередь, затем я изменяю одно из значений массива сразу после него и еще раз предлагаю его в очередь:

long[] temp = new long[2];
temp[0] = currentThread().getId();
temp[1] = gyrAddress;//Address of an i2c sensor
CommunicationThread.commandQueue.offer(temp);//CommunicationThread is where the commandqueue is located
temp[1] = axlAddress;//Change the address to a different sensor
CommunicationThread.commandQueue.offer(temp);

Затем потребитель возьмет эти данные и откроет соединение i2c с определенным датчиком, получит некоторые данные от указанного датчика и передаст данные обратно, используя другую очередь. На данный момент, однако, я установил, что потребитель просто потребляет голову и печатает данные.

long[] command = commandQueue.take();//This will hold the program until there is at least 1 command in the queue
if (command.length!=2){
    throw new ArrayIndexOutOfBoundsException("The command given is of incorrect format");
}else{
    System.out.println("The thread with thread id " + command[0] + " has given the command to get data from address " +Long.toHexString(command[1]));
}

Теперь для тестирования у меня есть поток производителя с этими адресами (byte) 0x34, (byte)0x44 Если все идет правильно, мой вывод должен быть:

The thread with thread id 14 has given the command to get data from address 44
The thread with thread id 14 has given the command to get data from address 34

Однако я получаю:

The thread with thread id 14 has given the command to get data from address 34
The thread with thread id 14 has given the command to get data from address 34

Это означало бы, что он отправляет временный массив после его изменения.

Что я сделал, чтобы попытаться это исправить: я попробовал сон, если я добавил сон на 150 мс, тогда ответ правильный. Однако этот метод совершенно очевидно повлияет на производительность... Поскольку метод предложения возвращает истину, я попробовал следующий фрагмент кода.

boolean tempBool = false;
while(!tempBool){
    tempBool = CommunicationThread.commandQueue.offer(temp);
    System.out.println(tempBool);
}

Который выводит истину. Это не повлияло.

Я попытался напечатать temp[1] после этого цикла while, и в этот момент это правильное значение. (Он печатает 44, однако потребитель получает 34)

Скорее всего, это проблема синхронизации, однако я думал, что цель объекта на основе BlockingQueue состоит в том, чтобы решить эту проблему.

Мы будем очень признательны за любую помощь или предложение по работе этой BlockingQueue. Позвольте мне закончить на заметке, что я впервые работаю с очередями между потоками в java и что окончательная программа будет работать на raspberry pi с использованием библиотеки pi4j для связи с датчиками.


person Ylva    schedule 24.11.2015    source источник
comment
Я подозреваю, что у вас есть статическая переменная, которая инициализируется в конструкторе. Статические переменные не принадлежат экземпляру, поэтому их не следует инициализировать при создании экземпляра; значение будет перезаписано в следующий раз, когда вы создадите экземпляр.   -  person Andy Turner    schedule 24.11.2015
comment
Вы изменяете массивы, которые совместно используются потоками без какой-либо синхронизации. Не. И что еще хуже, вы, кажется, всегда храните в очереди один и тот же массив. Это не может работать. Вместо использования массивов из двух длинных значений создайте неизменяемый класс, содержащий два длинных поля, и сохраните его в очереди. Или, по крайней мере, не изменяйте массив, когда он находится в очереди.   -  person JB Nizet    schedule 24.11.2015
comment
@AndyTurner Я делаю это, потому что мне нужно установить размер в зависимости от количества потоков, я создаю только один экземпляр вне этого класса, поэтому это не должно иметь большого значения.   -  person Ylva    schedule 24.11.2015
comment
@JBNizet, так что вы имеете в виду, создать класс с двумя локальными переменными и этим экземпляром? Я не думал об этом. Однако оба не решают вопрос о наложении, почему он будет отправлять измененное значение, даже если оно было изменено только после того, как оно вернуло истину.   -  person Ylva    schedule 24.11.2015
comment
@Ylva, это не значит, что вам нужна статическая переменная. Вы можете внедрить экземпляр очереди в качестве параметра конструктора, а не емкость.   -  person Andy Turner    schedule 24.11.2015


Ответы (1)


Поскольку вы спросили, как именно работает BlockingQueue, давайте начнем с этого:

Блокирующая очередь — это очередь, которая блокируется, когда вы пытаетесь удалить из нее очередь, пока она пуста, или когда вы пытаетесь поставить в нее элементы, когда очередь уже заполнена. Поток, пытающийся исключить из пустой очереди, блокируется до тех пор, пока какой-либо другой поток не вставит элемент в очередь.

Итак, эти блокирующие очереди не позволяют различным потокам читать/писать в очередь, пока это еще невозможно, потому что она либо пуста, либо заполнена.

Как уже объясняли Andy Turner и JB Nizet, переменные статически совместно используются в памяти. Это означает, что когда ваш поток читает очередь, он находит ссылку (т.е. указатель) на эту переменную (в памяти) и использует этот указатель в своем следующем коде. Однако до того, как ему удастся прочитать эти данные, вы уже изменили переменную, обычно в непоточных приложениях это не будет проблемой, поскольку только один поток будет пытаться читать из памяти, и он всегда будет выполняться в хронологическом порядке. Способ обойти это — создать новую переменную/массив (которая будет назначаться новой памяти) с переменными данными каждый раз, когда вы добавляете запись в очередь, таким образом, вы убедитесь, что не перезаписываете переменную в памяти до он обрабатывается другим потоком. Простой способ сделать это:

long[] tempGyr = new long[2];
tempGyr[0] = currentThread().getId();
tempGyr[1] = gyrAddress;
CommunicationThread.commandQueue.offer(tempGyr);//CommunicationThread is where the commandqueue is located

long[] tempAxl = new long[2];
tempAxl[0] = currentThread().getId();
tempAxl[1] = axlAddress;
CommunicationThread.commandQueue.offer(tempAxl);

Надеюсь, это объясняет тему, если нет: не стесняйтесь задавать дополнительные вопросы :)

person SonnyX    schedule 24.11.2015