Действительно ли AtomicInteger производит атомарное целое число?

Я погуглил AtomicInteger и увидел, что кто-то сказал, что мы можем использовать AtomicInteger(AtomicLong) для секвенсора памяти (http://www.cs.hut.fi/u/tlilja/multicore/slides/java_multicore.pdf). Вот мой тест:

public class TestAtomicInteger {

public static void main(String[] args) {

    final AtomicInteger sequencer = new AtomicInteger(1);
    final Set<Integer> integers = new HashSet<>();
    //final Set<Integer> integers = new ConcurrentSkipListSet<>();


    final Runnable task = new Runnable() {

        @Override
        public void run() {
            int next = sequencer.getAndIncrement();
            integers.add(next);
            System.out.println(integers.size());
        }
    };
    for (int i = 0; i < 1000; i++) {
        Thread t = new Thread(task);
        t.start();
    }
}
}

После многократного запуска этого кода я обнаружил, что иногда общее количество атомарных выходных данных не равно 1000. Это означает, что метод getAndIncrement возвращает DUPLICATE.

Кто-нибудь объяснит, почему? Спасибо.

*ПРИМЕЧАНИЕ. В функции запуска. Если я использую System.out.println(next); Иногда я также вижу некоторые отсутствующие секвенсоры. *

ОБРАЗЕЦ РЕЗУЛЬТАТА 1:




person Loc    schedule 02.01.2014    source источник
comment
Можете ли вы сделать такой вывод, который не содержит 1000?   -  person Sotirios Delimanolis    schedule 03.01.2014
comment
Создание 1000 потоков в тесном цикле может серьезно нагрузить JVM. Гораздо более вероятно, что поток либо не завершился, либо вы что-то пропустили в выводе, чем то, что AtomicInteger не работает должным образом.   -  person Ted Hopp    schedule 03.01.2014
comment
Мы видим 1000 в вашем выводе.   -  person Alexis C.    schedule 03.01.2014
comment
Кажется гораздо более вероятным, что именно System.out.println на вашей платформе не любит 1000 потоков, пытающихся использовать его одновременно, чем огромная ошибка в модели памяти Java, которую никто не нашел до тех пор, пока теперь... пробовали ли вы накапливать результаты в потокобезопасной коллекции, а затем синхронно распечатывать их?   -  person Affe    schedule 03.01.2014
comment
@ZouZou: Пожалуйста, посмотрите мое обновление. Это случается когда-нибудь.   -  person Loc    schedule 03.01.2014
comment
@Sotirios Delimanolis: Вы видите мое последнее обновление?   -  person Loc    schedule 03.01.2014
comment
Ваш код в том виде, в котором он опубликован, ошибочен. Вам нужно использовать синхронизированный набор, чтобы безопасно изменять его из нескольких потоков. Вам также необходимо дождаться завершения всех потоков, прежде чем тестировать содержимое набора. Чтобы разработать правильный тест, рассмотрите возможность использования службы-исполнителя, где вы можете дождаться завершения всех потоков.   -  person Ted Hopp    schedule 03.01.2014
comment
Пожалуйста, НЕ редактируйте код в вопросе таким образом - вы изменили характер вопроса, закомментировав одну строку.   -  person Robin Green    schedule 03.01.2014
comment
@Робин Грин. Я обновил его до исходного вопроса. Извините всех. Спасибо.   -  person Loc    schedule 03.01.2014


Ответы (6)


Одна из основных проблем с вашим кодом заключается в том, что вы добавляете числа в HashSet, который не является потокобезопасным. Это причина того, что вы наблюдаете. Если вы использовали потокобезопасную коллекцию, вы получите ожидаемый результат:

final Set<Integer> integers = new ConcurrentSkipListSet<>();

Но это вводит дополнительный уровень синхронизации. Другой альтернативой является использование простого массива int[] numbers = new int[1000]; и использование: numbers[next-1]++; для подсчета вхождений.

Новый код для справки:

public static void main(String[] args) throws Exception {

    final AtomicInteger sequencer = new AtomicInteger(1);
    final int[] integers = new int[1000];

    final Runnable task = new Runnable() {

        @Override
        public void run() {
            int next = sequencer.getAndIncrement();
            integers[next-1]++;
        }
    };
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        Thread t = new Thread(task);
        t.start();
        threads.add(t);
    }
    for (Thread t : threads) {
        t.join();
    }
    for (int i = 0; i < 1000; i++) {
        if (integers[i] != 1) System.out.println(i + " -> " + integers[i]);
    }
}
person assylias    schedule 02.01.2014
comment
Я использовал ConcurrentSkipListSet и повторил тест. Я была такая же проблема. ( Когда-то). - person Loc; 03.01.2014
comment
@Loc - Вы гарантируете, что все потоки завершатся перед проверкой результата (как это делает assylias во втором цикле)? - person Ted Hopp; 03.01.2014
comment
@Loc HashSet (или, скорее, HashMap, который его поддерживает) начинается с размера по умолчанию 16 и удваивается при необходимости. Итак, 16, 32, 64, 128, 256, 512, 1024 и т. д. 1000 должно быть потеряно при изменении размера. - person Sotirios Delimanolis; 03.01.2014

Если я запускаю вашу программу на своем компьютере, я наблюдаю неправильный вывод примерно в 1 из 10 раз. (Это с эмулятором консоли в Netbeans 7.)

Однако, если я изменю его, чтобы проверить размер набора после завершения всех потоков, он всегда будет содержать 1000 уникальных целых чисел. Это справедливо даже в том случае, если в выходных данных, генерируемых отдельными потоками во время их выполнения, есть ошибки. (То есть в строках, напечатанных потоками, печатаются дубликаты, но истинный размер набора после завершения выполнения на самом деле 1000)

Поэтому я прихожу к выводу, что проблема заключается в том, что 1000 потоков, записывающих все в PrintStream, который ваша платформа предоставляет для System.out, искажают вывод. На моей платформе я вижу поврежденный вывод во время выполнения, но в конце всегда есть 1000 уникальных целых чисел.

public static void main(String[] args) throws InterruptedException {

    final AtomicInteger sequencer = new AtomicInteger(1);
    final Set<Integer> integers = new ConcurrentSkipListSet<Integer>();

    final Runnable task = new Runnable() {

        @Override
        public void run() {
            int next = sequencer.getAndIncrement();
            integers.add(next);
            System.out.println(next);
        }
    };
    List<Thread> threads = new ArrayList<Thread>(1000);
    for (int i = 0; i < 1000; i++) {
        Thread t = new Thread(task);
        t.start();
        threads.add(t);
    }

    for (Thread t : threads) {
        t.join();
    }
    System.out.println(integers.size());
}
person Affe    schedule 02.01.2014
comment
@SotiriosDelimanolis Я перепроверил размер набора целых чисел после завершения всех потоков и убедился, что в нем есть 1000 уникальных целых чисел, чтобы подтвердить мое утверждение, что проблема заключается в том, что реализация PrintStream, установленная на System.out, не обрабатывает 1000 потоков. писать его одновременно и получать повреждения. - person Affe; 03.01.2014
comment
@SotiriosDelimanolis переформулировал это, чтобы попытаться сказать это более ясно :) - person Affe; 03.01.2014
comment
Ага, это тоже. Печать должна выполняться внутри синхронизированного метода. Хороший улов. - person David Tonhofer; 03.01.2014

AtomicInteger вызывает класс Unsafe, который, в свою очередь, выполняет собственные вызовы. Так что да, AtomicInteger теоретически может возвращать неатомарные обновления. Это будет зависеть от JVM и базовой архитектуры, на которой она работает.

Однако, учитывая характер потоков, я всегда предполагаю, что ошибка в моем коде, а не в JVM.

Я запустил этот код и не смог обнаружить никаких столкновений. JDK 1_7_45 победа 7

  public static void main(String[] args) {

      final AtomicInteger sequencer = new AtomicInteger(1);
      final Set<Integer> integers = new HashSet<Integer>();

      final Runnable task = new Runnable() {

          @Override
          public void run() {
              int next = sequencer.getAndIncrement();
              synchronized (integers){
                  if(integers.contains(next)){
                      System.out.println("duplicate detected " + next);
                  }
                  integers.add(next);
              }
          }
      };


      for(int j = 0; j < 1000; j++){
          System.out.print("testing " + j +" ");
          sequencer.set(0);
          integers.clear();
          List<Thread> threads = new ArrayList<Thread>(10000);
          for (int i = 0; i < 1000; i++) {
              Thread t = new Thread(task);
              threads.add(t);
              t.start();
          }
          for (Thread t : threads) {
              try {
                  t.join();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
          System.out.println("integers size " + integers.size());
      }
  }
person BevynQ    schedule 02.01.2014

Как говорили другие:

  1. Дождитесь завершения всех потоков в конце цикла, используя Thread.join(). Ссылки на все потоки должны быть сохранены в коллекции, конечно
  2. close() STDOUT перед выходом

Вокруг программы:

  1. Направьте вывод через sort и/или wc -l
  2. Используйте diff для проверки списка чисел, созданного простым циклом
  3. Выход с предупреждением, если diff находит разницу

Повторите 10 000 раз в запрограммированном цикле.

Доложить.

person David Tonhofer    schedule 02.01.2014
comment
Я повторно обновил свой код и протестировал 1000 потоков. Как я уже сказал, он произвел неправильный вывод. (Редко бывает) - person Loc; 03.01.2014
comment
@Loc - вы не ждете завершения всех потоков. Это очень важно для правильного теста. - person Ted Hopp; 03.01.2014

Обновленный вывод содержит 7 целых чисел, напечатанных дважды.

(взял ваш вывод | sort | uniq --count | sort )

печатает это в конце (первый столбец - это количество просмотров, 2-й столбец - это значение)

  1 991
  1 992
  1 993
  2 519
  2 52
  2 661
  2 848
  2 875
  2 985
  2 995

Это связано с тем, что вы печатаете размер HashSet, который не является потокобезопасным, поэтому иногда размер не показывает обновленное значение.

Переход на поточно-ориентированную реализацию Set должен работать.

person dkatzel    schedule 02.01.2014

Для меня это из-за ConcurrentSkipListSet.size(). Этот тип слабо согласованной коллекции, хотя и потокобезопасный, не является точным в отношении итераций или подсчета в целом.

Из Javadoc ConcurrentSkipListSet.size()

Имейте в виду, что, в отличие от большинства коллекций, метод size не является операцией с постоянным временем. Из-за асинхронного характера этих наборов определение текущего количества элементов требует обхода элементов. Кроме того, не гарантируется атомарное выполнение массовых операций addAll, removeAll, keepAll и containsAll. Например, итератор, работающий одновременно с операцией addAll, может просматривать только некоторые добавленные элементы.

person Pitiphan    schedule 02.01.2014