Я хочу использовать Stream
для распараллеливания обработки разнородного набора удаленно хранимых файлов JSON с неизвестным номером (количество файлов заранее неизвестно). Файлы могут сильно различаться по размеру: от 1 записи JSON на файл до 100 000 записей в некоторых других файлах. Запись JSON в данном случае означает автономный объект JSON, представленный одной строкой в файле.
Я действительно хочу использовать Streams для этого, поэтому я реализовал это Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
Проблема, с которой я сталкиваюсь, заключается в том, что, хотя Stream сначала прекрасно распараллеливается, в конечном итоге самый большой файл обрабатывается в одном потоке. Я считаю, что ближайшая причина хорошо задокументирована: сплитератор «несбалансирован».
Более конкретно, похоже, что метод trySplit
не вызывается после определенного момента в жизненном цикле Stream.forEach
, поэтому дополнительная логика для распределения небольших пакетов в конце trySplit
выполняется редко.
Обратите внимание, что все разделители, возвращенные из trySplit, используют один и тот же итератор paths
. Я думал, что это действительно умный способ сбалансировать работу между всеми разделителями, но этого недостаточно для достижения полного параллелизма.
Я хотел бы, чтобы параллельная обработка выполнялась сначала по файлам, а затем, когда несколько больших файлов все еще остаются разделенными, я хочу распараллелить фрагменты оставшихся файлов. Это было целью блока else
в конце trySplit
.
Есть ли легкий/простой/канонический способ обойти эту проблему?
Long.MAX_VALUE
вызывает чрезмерное и ненужное разбиение, в то время как любая оценка, отличная отLong.MAX_VALUE
, приводит к остановке дальнейшего разбиения, убивая параллелизм. Возврат набора точных оценок, по-видимому, не приводит к какой-либо разумной оптимизации. - person Alex R   schedule 10.11.2019AbstractSpliterator
, но переопределяетеtrySplit()
, что является плохой комбинацией для всего, кромеLong.MAX_VALUE
, поскольку вы не адаптируете оценку размера вtrySplit()
. ПослеtrySplit()
оценка размера должна быть уменьшена на количество отщепленных элементов. - person Holger   schedule 11.11.2019