Использование Grok в потоковой передаче flink

Flink Pipeline выглядит следующим образом:

  1. читать сообщения (строку) из темы кафка.
  2. сопоставление с образцом через преобразование grok в формат json.
  3. Агрегации по временному окну по извлеченному из json полю.

Ниже приведен код для сопоставления с образцом с использованием Grok.

    SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance()
                    .map(new MapFunction<String, JSONObject>() {    
                        private static final long serialVersionUID = 6;
                        
                        @Override
                        public JSONObject map(String value) throws Exception {
                            JSONObject logJson = new JSONObject();  
                            grok.compile(pattern); //pattern is some pattern defined in the class
                            Match gm = grok.match(value);
                            gm.captures();
                            logJson.putAll(gm.toMap());
                            return logJson;
                        }})

В приведенном выше коде запись grok.compile(pattern) внутри функции карты работает нормально. В противном случае возникает следующая ошибка

Реализация MapFunction не сериализуема.

Вызвано: java.io.NotSerializableException: com.google.code.regexp.Pattern

Есть ли способ удалить grok.compile за пределы карты? Насколько я понимаю, компиляция шаблона с каждым сообщением не требуется и может создать узкое место, если нет. сообщений становится довольно большим.

PS: Я импортировал пакет oi.thekraken.grok.api.Grok

РЕДАКТИРОВАТЬ:

Я просмотрел реализацию Grok, а класс Grok реализует Serializable. https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java


person user3351750    schedule 15.11.2016    source источник


Ответы (1)


Ваш код не показывает, откуда берется локальная переменная grok, но:

Flink требует, чтобы все операторы были сериализуемыми, потому что они могут перемещаться в кластере. Это также верно для всех членов операторов. Можете выложить полный нерабочий пример? Это может упростить определение того, где может произойти сбой сериализации.

Дополнительную информацию о сериализации flink можно найти в документации по flink по адресу https://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception- и https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html

По сути, вы можете зарегистрировать сериализатор kryo для пользовательских типов или реализовать (де-) сериализацию самостоятельно, если вам нужны операторы, которые не могут быть сериализуемы напрямую.

Кстати: я думаю, вы правы, пытаясь уменьшить количество раз, когда шаблон компилируется.

person Frank Lauterwald    schedule 15.11.2016
comment
Basically, you can register a kryo serializer for custom types or implement (de-)serialization yourself if you need operator members that are not directly serializable. --- Я немного не понимаю, как сделать то же самое. - person user3351750; 16.11.2016