Apache beam - это инструмент обработки данных с открытым исходным кодом, который обеспечивает унифицированную модель как для пакетных, так и для потоковых конвейеров данных.

Beam полезен для задач параллельной обработки данных, в которых проблема может быть разложена на множество более мелких пакетов данных, которые можно обрабатывать независимо и параллельно. Его также можно использовать в задачах ETL.

Луч Apache построен на Scala, но поддерживает SDK для Java, Python и GO. SDK для лучей предоставляют унифицированную модель программирования, которую можно использовать как для ограниченного (пакетного), так и для неограниченного (потокового) набора данных.

Используя один из пакетов SDK Beam с открытым исходным кодом, мы можем создать программу, которая определяет конвейер. Затем этот конвейер транслируется обработчиками Beam в API, совместимый с выбранной нами серверной частью распределенной обработки.

В настоящее время Beam поддерживает Direct runner, Apache Flink runner, Apache Spark runner, Google Cloud Data Flow runner, Apache Nemo runner, Apache Samza runner, Hazlecast Jet runner и Twister2.

Прямой бегун используется для локального тестирования и отладки.

Компоненты Apache Beam

  • PCollection - представляет собой набор данных, который может быть фиксированным пакетом или потоком данных.
  • PTransform - операция обработки данных, которая принимает одну или несколько коллекций PCollections и выводит ноль или несколько коллекций PCollections.
  • Конвейер - представляет собой направленный ациклический граф PCollection и Transform и, следовательно, инкапсулирует всю работу по обработке данных.
  • Преобразования ввода-вывода - преобразования, которые читают или записывают данные.

Как работает балка?

  • Создайте объект конвейера и задайте параметры выполнения конвейера, включая конвейер-исполнитель.
  • Создайте начальную коллекцию PCollection для данных конвейера.
  • Примените PTransforms к каждой коллекции ПК.
  • Используйте операции ввода-вывода, чтобы записать окончательные преобразованные коллекции PCollection во внешний источник.
  • Запустите трубопровод с помощью назначенного бегунка трубопровода.

Пример кода на Python

Ниже приведен пример кода Python, который считает слова в тексте.

with beam.Pipeline(options=PipelineOptions()) as p:
file = '../data/kinglear.txt'
output_file = '../data/output.txt'
# Read the text file[pattern] into a PCollection.
lines = p | 'Read' >> ReadFromText(file)
split_lines = lines | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
counts = split_lines | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum)
output = counts | 'Format' >> beam.MapTuple(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'Write' >> WriteToText(output_file)
  1. Сначала мы создаем конвейер с параметрами конвейера, и весь код от начала до конца упоминается внутри него.
with beam.Pipeline(options=PipelineOptions()) as p:

Если бегун не указан, то по умолчанию выбирается прямой бегун.

2. Затем мы читаем входной файл и создаем lines PCollection.

lines = p | 'Read' >> ReadFromText(file)

3. Это преобразование разбивает строки в PCollection<String>, где каждый элемент представляет собой отдельное слово в тексте.

split_lines = lines | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))

4. Затем применяются два преобразования: одно объединяет каждое слово, а другое подсчитывает каждое слово для каждой клавиши.

counts = split_lines | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum)

5. Наконец, мы выполняем преобразование ввода-вывода, в котором мы берем кортеж слова и его количество и сохраняем в выходном тексте.

output | 'Write' >> WriteToText(output_file)

Выходные файлы будут доступны по пути назначения.

Преимущества луча

  • Объединение как пакетного, так и потокового API в рамках единого API - с минимальными изменениями кода мы заставляем один и тот же код работать как для потоковой передачи, так и для пакетной передачи данных.
  • Переносимость между средами выполнения. Первоначально, если задачи луча выполняются в Spark runner, переключиться на Google Data Flow runner чрезвычайно просто.
  • API повышают уровень абстракции - сосредоточьтесь на нашей логике, а не на основных деталях.

Код для этого доступен на GitHub.

Удачного обучения!