Apache beam - это инструмент обработки данных с открытым исходным кодом, который обеспечивает унифицированную модель как для пакетных, так и для потоковых конвейеров данных.
Beam полезен для задач параллельной обработки данных, в которых проблема может быть разложена на множество более мелких пакетов данных, которые можно обрабатывать независимо и параллельно. Его также можно использовать в задачах ETL.
Луч Apache построен на Scala, но поддерживает SDK для Java, Python и GO. SDK для лучей предоставляют унифицированную модель программирования, которую можно использовать как для ограниченного (пакетного), так и для неограниченного (потокового) набора данных.
Используя один из пакетов SDK Beam с открытым исходным кодом, мы можем создать программу, которая определяет конвейер. Затем этот конвейер транслируется обработчиками Beam в API, совместимый с выбранной нами серверной частью распределенной обработки.
В настоящее время Beam поддерживает Direct runner, Apache Flink runner, Apache Spark runner, Google Cloud Data Flow runner, Apache Nemo runner, Apache Samza runner, Hazlecast Jet runner и Twister2.
Прямой бегун используется для локального тестирования и отладки.
Компоненты Apache Beam
- PCollection - представляет собой набор данных, который может быть фиксированным пакетом или потоком данных.
- PTransform - операция обработки данных, которая принимает одну или несколько коллекций PCollections и выводит ноль или несколько коллекций PCollections.
- Конвейер - представляет собой направленный ациклический граф PCollection и Transform и, следовательно, инкапсулирует всю работу по обработке данных.
- Преобразования ввода-вывода - преобразования, которые читают или записывают данные.
Как работает балка?
- Создайте объект конвейера и задайте параметры выполнения конвейера, включая конвейер-исполнитель.
- Создайте начальную коллекцию PCollection для данных конвейера.
- Примените PTransforms к каждой коллекции ПК.
- Используйте операции ввода-вывода, чтобы записать окончательные преобразованные коллекции PCollection во внешний источник.
- Запустите трубопровод с помощью назначенного бегунка трубопровода.
Пример кода на Python
Ниже приведен пример кода Python, который считает слова в тексте.
with beam.Pipeline(options=PipelineOptions()) as p: file = '../data/kinglear.txt' output_file = '../data/output.txt' # Read the text file[pattern] into a PCollection. lines = p | 'Read' >> ReadFromText(file) split_lines = lines | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) counts = split_lines | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum) output = counts | 'Format' >> beam.MapTuple(format_result) # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned output | 'Write' >> WriteToText(output_file)
- Сначала мы создаем конвейер с параметрами конвейера, и весь код от начала до конца упоминается внутри него.
with beam.Pipeline(options=PipelineOptions()) as p:
Если бегун не указан, то по умолчанию выбирается прямой бегун.
2. Затем мы читаем входной файл и создаем lines
PCollection.
lines = p | 'Read' >> ReadFromText(file)
3. Это преобразование разбивает строки в PCollection<String>
, где каждый элемент представляет собой отдельное слово в тексте.
split_lines = lines | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
4. Затем применяются два преобразования: одно объединяет каждое слово, а другое подсчитывает каждое слово для каждой клавиши.
counts = split_lines | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum)
5. Наконец, мы выполняем преобразование ввода-вывода, в котором мы берем кортеж слова и его количество и сохраняем в выходном тексте.
output | 'Write' >> WriteToText(output_file)
Выходные файлы будут доступны по пути назначения.
Преимущества луча
- Объединение как пакетного, так и потокового API в рамках единого API - с минимальными изменениями кода мы заставляем один и тот же код работать как для потоковой передачи, так и для пакетной передачи данных.
- Переносимость между средами выполнения. Первоначально, если задачи луча выполняются в Spark runner, переключиться на Google Data Flow runner чрезвычайно просто.
- API повышают уровень абстракции - сосредоточьтесь на нашей логике, а не на основных деталях.
Код для этого доступен на GitHub.
Удачного обучения!