В реальном мире данные передаются разными способами. Когда дело доходит до анализа этих данных, мы можем использовать пакетную аналитику, аналитику в реальном времени, интерактивную аналитику и прогнозную аналитику. Эта статья расскажет, как можно выполнить прогнозный анализ с помощью WSO2 CEP.

В предиктивной аналитике он проходит разные фазы. Во-первых, мы должны собрать данные, относящиеся к сценарию, который мы собираемся обработать. Затем необходимо провести некоторый анализ этих данных, чтобы проверить их поведенческие модели. Затем мы можем смоделировать сценарий с очищенными данными. После того, как часть моделирования будет завершена, мы можем перейти к этапу прогнозирования по мере поступления данных.

В следующем руководстве будет описано, как моделировать сценарий и делать прогнозы в реальном времени с помощью продуктов WSO2.

Предварительные требования

  1. Панды
  2. H2o.ai
  3. WSO2 CEP
  4. Набор данных

Настройка среды

  1. Загрузите WSO2 CEP и распакуйте его.
  2. Установите h2o.ai в свою среду Python.
  3. Поместите h2o_genmodel.jar в ‹CEP_HOME› / repository / components / lib.
  4. Поместите featureeng-1.0-SNAPSHOT.jar в ‹CEP_HOME› / repository / components / dropins.
  5. Поместите h2opojo-1.0-SNAPSHOT.jar в ‹CEP_HOME› / repository / components / dropins.
  6. Установите pandas в свою среду Python.
  7. Добавьте пакеты featureeng и h2omodelext в свой проект python.

1) Смоделируйте сценарий

Образец данных, который мы собираемся обсудить, - это электростанция с комбинированным циклом. Этот набор данных доступен по адресу

Https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant

Перед обучением модели машинного обучения иногда требуется выполнить некоторый процесс разработки функций для данного набора данных. Эти функции помогут алгоритмам машинного обучения выявлять закономерности, скрытые за данными. Используйте пакет featureeng, чтобы создать больше функций для вашего набора данных.

Здесь мы рассчитали скользящее среднее для всех функций.

import pandas as pd
from featureeng import Frame

# Load csv to pandas frame
data = pd.read_csv('ccpp.csv')

# Create feature processor frame
frame = Frame(data)
columns = ['AT', 'V', 'AP', 'RH']

# Apply feature engineering for each column
for column in columns:
frame.apply_moving_average(input_column=column, window=5)

# Reorder columns in the dataset
column_order = [ 'AT', 'V', 'AP', 'RH', 'AT_ma_5', 'V_ma_5', 'AP_ma_5', 'RH_ma_5', 'PE']
frame.order_columns(column_names=column_order)

# Output modified data frame
frame.save_file(file_name='modified_ccpp.csv')

В конце программы измененный набор данных сохраняется как «modified_ccpp, csv». Теперь ваш набор данных готов к обучению. Здесь мы используем H2O.ai для обучения модели. Фрагмент кода для части, генерирующей модель, приведен ниже.

import h2o
from h2o.estimators import H2ORandomForestEstimator
from h2omodelext import ModelWrapper

# Initialize h2o instance
h2o.init()

# Read csv file
data = h2o.import_file('modified_ccpp.csv')

# Split data into train and test
train, test = data.split_frame(ratios=[0.8])

# Define model
model = H2ORandomForestEstimator(ntrees=50, max_depth=20, nbins=100)

# Define input and response columns
input_columns = ['AT', 'V', 'AP', 'RH', 'AT_ma_5', 'V_ma_5', 'AP_ma_5', 'RH_ma_5']
response_column = 'PE'

# Train model
model.train(x=input_columns, y=response_column, training_frame=train)
print model.model_performance(test_data=test)

# Save model
ModelWrapper.save_model(path='/home/wso2123/PycharmProjects/FeatureProcessor', model=model)

Метод save_model () скомпилирует выходной файл java модели и сохранит сгенерированные файлы классов в папке, названной по имени модели.

Примечание :

MODEL_PATH = Расположение этой папки модели

Теперь мы завершили первую фазу потока. Тогда пришло время заняться потоками данных в реальном времени. Запустите сервер WSO2 CEP.

2) Прогнозирование в реальном времени

Настройка потоков

1) Поток ввода_данных

Источник данных содержит 4 разных значения. Температура (T), вакуум (V), давление окружающей среды (AP) и относительная влажность (RH). Нам нужно 4 потока для передачи этих данных в систему.

Потоки ›Добавить поток событий

Определите каждый атрибут, который содержит ваша полезная нагрузка.

Наконец, нажмите кнопку «Добавить поток событий», чтобы продолжить создание потока.

2) поток feature_engineered

Модель, которую мы обучили, требует 8 входных потоков для генерации прогноза. Итак, нам нужно создать поток, который может содержать 8 атрибутов.

3) поток prediction_out

Наконец, нам нужно вывести результат прогноза. Обычно результат прогноза передается в виде строки. По результатам прогноза вы можете решить, какие атрибуты следует опубликовать.

Чтобы увидеть, что вы выводите, давайте поместим издателя регистратора в поток prediction_out. Он отобразит результаты на вашей консоли.

Теперь вы настроили базовый план для своей среды выполнения. Пора составить план выполнения. Давайте сначала создадим план выполнения для обработки функций. Затем мы можем создать еще один для предсказания.

План выполнения обработки функций

/* Enter a unique ExecutionPlan */
@Plan:name(‘FeatureProcessing’)

/* Enter a unique description for ExecutionPlan */
 — @Plan:description(‘ExecutionPlan’)

/* define streams/tables and write queries here … */
@Import(‘data_input:1.0.0’)
define stream data_input (T double, V double, AP double, RH double);

@Export(‘feature_engineered:1.0.0’)
define stream data_output (T double, V double, AP double, RH double, T_5 double, V_5 double, AP_5 double, RH_5 double);

from data_input#window.length(5)
select T, V, AP, RH, featureeng:movavg(5, T) as T_5, featureeng:movavg(5, V) as V_5, featureeng:movavg(5, AP) as AP_5, featureeng:movavg(5, RH) as RH_5
insert into data_output

План выполнения механизма прогнозирования

Ccpp / DRF_model_python_1479702792496_1 - путь, по которому находится модель.

/* Enter a unique ExecutionPlan */
@Plan:name(‘PredictionEngine’)

/* Enter a unique description for ExecutionPlan */
 — @Plan:description(‘ExecutionPlan’)

/* define streams/tables and write queries here … */
@Import(‘feature_engineered:1.0.0’)
define stream data_input (T double, V double, AP double, RH double, T_5 double, V_5 double, AP_5 double, RH_5 double);

@Export(‘prediction_out:1.0.0’)
define stream data_output (T double, V double, AP double, RH double, prediction string);

from data_input#h2opojo:predict(‘ccpp/DRF_model_python_1479702792496_1’)
select T, V, AP, RH, prediction
insert into data_output

После добавления этих планов выполнения финальный поток будет таким.