Как использовать Scala XML с Apache Flink?

Я пытаюсь использовать библиотеку Scala XML в Flink для анализа XML, и я не могу заставить ее работать. Обратите внимание, что мне нужно использовать как сериализованную, так и несериализованную (строковую) версию в моем коде в одной и той же функции обработки.

Я пробовал уже разные решения, они всегда работают в IntelliJ, но не когда я запускаю их в кластере Flink. Они всегда возвращают разные java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser; Я пробовал несколько вещей, но все равно получаю ошибку, похожую на эту.

Это пример того, как выглядит моя работа Flink:

object StreamingJob {
  import org.apache.flink.streaming.api.scala._

  val l = List(
    """<ciao>ciao</ciao>""",
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    env.setParallelism(10)

    val stream = env.fromCollection(l)

    stream
      .uid("process")
      .map(new Processor)
      .print

    env.execute("Flink-TEST")
  }
}

Это пример моей функции обработки:

import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {
  override def map(translatedMessage: String): String = {
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}
object Processor {
  val factory: SAXParserFactory = SAXParserFactory.newInstance
  val SAXParser: SAXParser = factory.newSAXParser
  val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}

и, наконец, это мой pom.xml, использующий плагин maven-shade, чтобы передать банку, чтобы она мигала:

        <!-- other sections of the pom are excluded -->
        <properties>
            <flink.version>1.7.0</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
            <scala.version>2.12.8</scala.version>
        </properties>
        <!-- other sections of the pom are excluded -->
    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Scala Library, provided by Flink as well. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-yaml</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api-scala_2.12</artifactId>
            <version>11.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-xml_2.12</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>
        <!-- other sections of the pom are excluded -->
<build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
        <!-- other sections of the pom are excluded -->

Я считаю, что проблема как-то связана с реализацией, которая будет использоваться для SAXParser, который Flink использует во время выполнения. Я также попытался использовать аннотацию @transient, чтобы предотвратить сохранение полей из Flink, но безуспешно.

Однако я не совсем понимаю, что именно происходит, кто-нибудь знает, как предотвратить ошибку и что пошло не так?


person Jac    schedule 31.01.2019    source источник
comment
Ваша проблема может быть связана с этим   -  person FrontTheMachine    schedule 04.02.2019
comment
Похоже, но я не вижу подходящего решения для моего варианта использования.   -  person Jac    schedule 04.02.2019
comment
Есть ли причина, по которой вы исключаете org.apache.flink:force-shading? Затенение обычно помогает от подобных ошибок времени выполнения.   -  person Richard Deurwaarder    schedule 05.02.2019
comment
Потому что я уже выполняю под flink, поэтому он мне не нужен на jar.   -  person Jac    schedule 08.02.2019


Ответы (1)


Через некоторое время я понял, что с ним не так.

В документах Scala XML говорится:

В Scala 2.11 и более поздних версиях добавьте следующее в файл build.sbt libraryDependencies:

"org.scala-lang.modules" %% "scala-xml" % "1.1.1"

Что в Maven переводится как:

<dependency>
    <groupId>org.scala-lang.modules</groupId>
    <artifactId>scala-xml_2.12</artifactId>
    <version>1.1.1</version>
</dependency>

Что ж, похоже, эта зависимость не нужна, поскольку, хотя Flink 1.7.2, похоже, использует Scala 2.12.8, он все еще сохраняет Scala XML внутри своего дистрибутива (следовательно, в пути к классам), я считаю, что это может вызвать проблемы, в какой класс действительно загружен и если все правильно, то это может не быть решением ошибки связи.

Решение ошибки связывания на самом деле заключается в использовании собственного RichMapFunction[InputT, OutputT] Flink:

class Processor extends RichMapFunction[String, String] {
  var factory: SAXParserFactory = _
  var SAXParser: SAXParser = _
  var xmlLoader: XMLLoader[Elem] = _

  override def open(parameters: Configuration): Unit = {
    factory = SAXParserFactory.newInstance
    SAXParser = factory.newSAXParser
    xmlLoader = XML.withSAXParser(SAXParser)
  }

  override def map(translatedMessage: String): String = {
    val xml = xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}

Как говорится в JavaDoc:

Метод инициализации функции.

Он вызывается перед собственно рабочими методами (например, map или join) и поэтому подходит для одноразовой настройки. Для функций, которые являются частью итерации, этот метод будет вызываться в начале каждого супершага итерации.

К сожалению, в этом случае предпочтительнее использовать var, поскольку инициализацию значений / переменных должен выполнять Flink, предотвращающий ошибку связывания во время выполнения.

Некоторые примечания:

  • Я понял, что это может произойти только на DataStream[T], а не на DataSet[T].
  • В задании должно быть установлено более 1 параллелизма, чтобы несколько диспетчеров задач загружали один и тот же класс. Если это делается в среде IDE, это может быть сложно, объяснил здесь.
  • После того, как вы заметили причину этой проблемы, кажется, что сопутствующие объекты могут не подходить для использования Flink.
  • Эта последняя часть может быть хорошей примечанием для размещения на странице «Расширения Scala API» Flink, где также объясняется, как Flink обычно не поддерживает анонимные функции сопоставления шаблонов для деконструкции кортежей, если не используется расширение Flink Scala API: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html
person Jac    schedule 03.04.2019