Масштабируемый способ доступа к каждому элементу ConcurrentHashMap‹Element, Boolean› ровно один раз

У меня есть 32 машинных потока и один ConcurrentHashMap<Key,Value> map, который содержит много ключей. Key определил общедоступный метод visit(). Я хочу visit() каждый элемент карты ровно один раз, используя доступную вычислительную мощность и, возможно, какой-то пул потоков.

Что я мог бы попробовать:

  • Я мог бы использовать метод map.keys(). Результирующий Enumeration<Key> можно было бы повторить с помощью nextElement(), но поскольку вызов key.visit() очень короткий, мне не удастся занять потоки. Перечисление по своей сути является однопоточным.
  • Вместо этого я мог бы использовать синхронизированный HashSet<Key>, вызвать метод toArray() и разделить работу над массивом на все 32 потока. Я серьезно сомневаюсь в этом решении, так как метод toArray(), скорее всего, будет узким местом для одного потока.
  • Я мог бы попытаться наследоваться от ConcurrentHashMap, заполучить экземпляры его внутреннего Segment<K,V>, попытаться сгруппировать их в 32 группы и работать с каждой группой отдельно. Хотя это звучит как хардкорный подход.
  • или подобная магия с Enumeration<Key>.

В идеале:

  • В идеале ConcurrentHashMap<Key, Value> определял бы метод keysEnumerator(int approximatePosition), который мог бы отбросить мне перечислитель, в котором отсутствуют примерно первые 1/32 элемента, т.е. map.keysEnumerator(map.size()/32)

Я пропустил что-то очевидное? Кто-нибудь сталкивался с подобной проблемой раньше?

ИЗМЕНИТЬ

Я попробовал профилировать, чтобы увидеть, действительно ли эта проблема повлияет на производительность на практике. Поскольку в данный момент у меня нет доступа к кластеру, я использовал свой ноутбук и попытался экстраполировать результаты на больший набор данных. На моей машине я могу создать 2 миллиона ключей ConcurrentHashMap, и для его повторения требуется около 1 секунды, вызывая метод visit() для каждого ключа. Предполагается, что программа масштабируется до 85 миллионов ключей (и более). Процессор кластера немного быстрее, но итерация по всей карте все равно занимает около 40 секунд. Теперь несколько слов о логике работы программы. Представленная логика является последовательной, то есть ни один поток не может перейти к следующему шагу, пока все потоки на предыдущем шаге не будут завершены:

  1. Create the hash map, create the keys and populate the hash map
  2. Iterate over entire hash map visiting all the keys.
  3. Do some data shuffling which is parallel insertions and deletions.
  4. Repeat step 2 and 3 a few hundred times.

Этот логический поток означает, что 40-секундная итерация будет повторяться несколько сотен раз, скажем, 100. Это дает нам чуть больше часа, потраченного только на посещение узлов. С набором из 32 параллельных итераторов это может сократиться до нескольких минут, что является значительным улучшением производительности.

Теперь несколько слов о том, как работает ConcurrentHashMap (или как я думаю, что это работает). Каждый ConcurrentHashMap состоит из сегментов (по умолчанию 16). Каждая запись в хэш-карту синхронизируется в соответствующем сегменте. Итак, скажем, мы пытаемся записать два новых ключа k1 и k2 в хеш-карту и что они будут разрешены как принадлежащие одному и тому же сегменту, скажем, s1. Если их попытаются записать одновременно, один из них получит блокировку первым и будет добавлен раньше, чем другой. Какова вероятность того, что два элемента будут принадлежать одному и тому же сегменту? В случае, если у нас есть хорошая хеш-функция и 16 сегментов, это 1/16.

Я считаю, что ConcurrentHashMap должен иметь метод concurrentKeys(), который будет возвращать массив Enumerations, по одному на каждый сегмент. У меня есть несколько идей, как добавить его в ConcurrentHashMap через наследование, и я дам вам знать, если у меня получится. На данный момент решение, по-видимому, заключается в создании массива ConcurrentHashMaps и предварительном хэшировании каждого ключа для разрешения одного члена такого массива. Я также поделюсь этим кодом, как только он будет готов.

ИЗМЕНИТЬ

Та же проблема на другом языке:

Параллельные итераторы


person Adam Kurkiewicz    schedule 08.11.2013    source источник
comment
Почему вы не можете просто поместить элементы в какой-либо тип BlockingQueue и заставить рабочие потоки начинать их обработку по мере их добавления?   -  person AngerClown    schedule 08.11.2013
comment
Есть две причины против этого. Во-первых, мне нужен постоянный временной доступ к каждому элементу для вещей, которые происходят в программе после того, как я посетил все узлы. Во-вторых, когда я начинаю работать с ConcurrentHashMap, у меня уже есть все в памяти, поэтому я не могу заставить рабочий поток работать с элементами по мере их размещения.   -  person Adam Kurkiewicz    schedule 08.11.2013


Ответы (3)


Я мог бы попытаться наследоваться от ConcurrentHashMap, получить в свои руки экземпляры его внутреннего сегмента, попытаться сгруппировать их в 32 группы и работать с каждой группой отдельно. Хотя это звучит как хардкорный подход.

Хардкор действительно, но это единственное, что я видел, что будет работать. toArray() строит массив, выполняя перечисление, поэтому здесь нет выигрыша. Я не могу поверить, что синхронизированный HashSet будет лучше, если только соотношение visit() прогонов к другим операциям с картой не будет достаточно высоким.

Проблема с работой с Segments заключается в том, что вам придется быть чрезвычайно осторожным, чтобы ваш код был устойчивым, потому что я предполагаю, что другие потоки могут изменять таблицу в то же время, когда вы посещаете узлы, и вам нужно избежать неизбежных условия гонки. Деликатный наверняка.

У меня большой вопрос, нужно ли это? Профилировщик или временные прогоны показали вам, что это занимает слишком много времени для visit() каждого из ключей в одном потоке? Вы пытались создать пул потоков для каждого вызова visit() и иметь один поток, выполняющий перечисление, и потоки пула, выполняющие visit()?

person Gray    schedule 08.11.2013
comment
Чем больше исходного кода я читаю, тем менее хардкорным он кажется. Единственная проблема заключается в том, что большинство внутренних классов ConcurrentHashMap на самом деле финальные, поэтому мне нужно разветвить код. Я бы не стал сильно беспокоиться о параллелизме здесь, поскольку, когда я посещаю узлы, я не делаю никаких вставок/удалений. Только после того, как я заканчиваю посещать их, я начинаю перетасовывать вещи. Перечисление - это последняя однопоточная вещь, которую я делаю в этой программе, все остальное было хорошо многопоточным, поэтому, если это не требует слишком много усилий, я бы хотел, чтобы это тоже было сделано. - person Adam Kurkiewicz; 08.11.2013
comment
Я бы все же убедился, что у вас есть проблемы с производительностью @AdamKurkiewicz, прежде чем вы пойдете по этому пути. У меня есть несколько относительно больших распределенных многопоточных карт, на которых у меня все еще есть фоновые задачи, которые проходят через однопоточность. - person Gray; 08.11.2013
comment
@AdamKurkiewicz: Tbh, потому что другие вещи многопоточные, кажется довольно плохой причиной для создания настраиваемого класса стандартной библиотеки. Во-первых, многопоточность не означает более высокую производительность. Если вы просто хотите поэкспериментировать или развлечься, я полностью за, но если это производственный код, а я был вашим коллегой, и вы дали мне такую ​​причину, я должен сказать, что я был бы очень расстроен. Просто говорю... - person Enno Shioji; 08.11.2013
comment
@Gray, пожалуйста, прочитайте мое редактирование, я добавил немного профилирования и еще одну идею для решения. - person Adam Kurkiewicz; 09.11.2013
comment
@EnnoShioji Возможно, мне удастся настроить его с помощью обычного наследования, так что в конце концов это может быть не так уж плохо. - person Adam Kurkiewicz; 09.11.2013

На вашем месте я бы просто попробовал сначала повторить набор ключей ConcurrentHashMap. Вы можете попробовать передать обработку ключей в пул потоков (в пакетах, если задача слишком легкая) или даже в задачу ForkJoin, но вы должны делать это, только если это действительно необходимо.

Сказав, что вы можете использовать ConcurrentSkipListMap, в котором вы можете получить NavigableSet ключей. Затем вы можете удалить из него разделы, используя метод subSet. Однако ConcurrentHashMap будет иметь лучшую производительность для операций put, get (обратите внимание, что будет использоваться compareTo, а не hashCode). Ситуации, когда это лучше, кажутся маловероятными.

person Enno Shioji    schedule 08.11.2013
comment
Я не знал о ConcurrentSkipListMap, но, похоже, это совсем другой вариант использования. И log(n) получает и ставит, вероятно, приведет к падению производительности. - person Adam Kurkiewicz; 08.11.2013

Решение, которое я в конечном итоге выберу, - это массив ConcurrentHashMaps вместо одного ConcurrentHashMap. Это специально, но, похоже, имеет отношение к моему варианту использования. Меня не волнует медленный второй шаг, поскольку он не влияет на производительность моего кода. Решение:

Создание объекта:

  1. Create an array of size t of ConcurrentHashMaps, where t is a number of threads.
  2. Create an array of Runnables, also of size t.

Население массива (однопоточный, не проблема):

  1. Create the keys and apply pre-hash function, which will return an int in the range 0 ... t-1. In my case simply modulo t.
  2. Put the key in the hashmap, by accessing appropriate entry in the array. E.g. if the pre-hash has resulted in index 4, then go for hashArray[4].put(key)

Итерация массива (хорошо многопоточная, прирост производительности):

  1. Assign every thread from Runnables array a job of iterating over the hashmap with a corresponding index. This should give give a t times shorter iteration as opposed to single threaded.

Чтобы увидеть код проверки концепции (поскольку у него есть некоторые зависимости от проекта, который я не могу опубликовать здесь), перейдите по адресу мой проект на github

ИЗМЕНИТЬ

На самом деле, реализация приведенного выше доказательства концепции для моей системы оказалась трудоемкой, подверженной ошибкам и крайне разочаровывающей. Кроме того, я обнаружил, что упустил бы многие функции стандартной библиотеки ConcurrentHashMap. Недавно я исследовал решение, которое выглядит гораздо менее специфичным и гораздо более многообещающим, — это использование Scala, который создает байт-код, полностью совместимый с Java. Доказательство концепции основано на потрясающей библиотеке, описанной в этой статье и, насколько мне известно, в ней в настоящее время НЕВОЗМОЖНО достичь соответствующего решения в vanilla Java без написания тысяч строк кода, учитывая текущее состояние стандартной библиотеки и соответствующих сторонних библиотек.

import scala.collection.parallel.mutable.ParHashMap

class Node(value: Int, id: Int){
    var v = value
    var i = id
    override def toString(): String = v toString
}

object testParHashMap{
    def visit(entry: Tuple2[Int, Node]){
        entry._2.v += 1
    }
    def main(args: Array[String]){
        val hm = new ParHashMap[Int, Node]()
        for (i <- 1 to 10){
            var node = new Node(0, i)
            hm.put(node.i, node)
        }

        println("========== BEFORE ==========")
        hm.foreach{println}

        hm.foreach{visit}

        println("========== AFTER ==========")
        hm.foreach{println}

    }
}
person Adam Kurkiewicz    schedule 09.11.2013