Flink Pipeline выглядит следующим образом:
- читать сообщения (строку) из темы кафка.
- сопоставление с образцом через преобразование grok в формат json.
- Агрегации по временному окну по извлеченному из json полю.
Ниже приведен код для сопоставления с образцом с использованием Grok.
SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance()
.map(new MapFunction<String, JSONObject>() {
private static final long serialVersionUID = 6;
@Override
public JSONObject map(String value) throws Exception {
JSONObject logJson = new JSONObject();
grok.compile(pattern); //pattern is some pattern defined in the class
Match gm = grok.match(value);
gm.captures();
logJson.putAll(gm.toMap());
return logJson;
}})
В приведенном выше коде запись grok.compile(pattern)
внутри функции карты работает нормально. В противном случае возникает следующая ошибка
Реализация MapFunction не сериализуема.
Вызвано: java.io.NotSerializableException: com.google.code.regexp.Pattern
Есть ли способ удалить grok.compile за пределы карты? Насколько я понимаю, компиляция шаблона с каждым сообщением не требуется и может создать узкое место, если нет. сообщений становится довольно большим.
PS: Я импортировал пакет oi.thekraken.grok.api.Grok
РЕДАКТИРОВАТЬ:
Я просмотрел реализацию Grok, а класс Grok реализует Serializable. https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java