Error running Cassandra from Spark in Java: NoClassDefFoundError on org.apache.spark.sql.catalyst

I am using Cassandra 3.0.3 and Spark 1.6.0, and I am trying to get them running by combining code from the old documentation at http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java with the new Java API guide at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md.

Here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>muhrafifm</groupId>
 <artifactId>spark-cass-twitterdw</artifactId>
 <version>1.0</version>
 <packaging>jar</packaging>
 <build>
    <plugins>
      <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.0</version>
          <configuration>
              <source>1.7</source>
              <target>1.7</target>
          </configuration>
      </plugin>
    </plugins>
</build>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>        
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
        <type>jar</type>    
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.0-M1</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.6.0-M1</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.0</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.1</version>
     </dependency>
</dependencies>
</project>

The changes I made are basically to the javaFunctions calls, and below is one of the methods after I changed javaFunctions according to the new documentation. I have also included import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
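For reference, the snippet assumes roughly these imports (reconstructed from the calls it makes; the package paths are the standard ones for Spark 1.6, connector 1.6.0-M1, and driver 3.0):

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

import java.math.BigDecimal;
import java.util.*;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;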

private void generateData(JavaSparkContext sc) {
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());

    // Prepare the schema
    try (Session session = connector.openSession()) {
        session.execute("DROP KEYSPACE IF EXISTS java_api");
        session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
        session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
        session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }

    // Prepare the products hierarchy
    List<Product> products = Arrays.asList(
            new Product(0, "All products", Collections.<Integer>emptyList()),
            new Product(1, "Product A", Arrays.asList(0)),
            new Product(4, "Product A1", Arrays.asList(0, 1)),
            new Product(5, "Product A2", Arrays.asList(0, 1)),
            new Product(2, "Product B", Arrays.asList(0)),
            new Product(6, "Product B1", Arrays.asList(0, 2)),
            new Product(7, "Product B2", Arrays.asList(0, 2)),
            new Product(3, "Product C", Arrays.asList(0)),
            new Product(8, "Product C1", Arrays.asList(0, 3)),
            new Product(9, "Product C2", Arrays.asList(0, 3))
    );

    // Distribute the products and write them to java_api.products via the JavaBean mapper
    JavaRDD<Product> productsRDD = sc.parallelize(products);
    javaFunctions(productsRDD).writerBuilder("java_api", "products", mapToRow(Product.class)).saveToCassandra();

    // For every leaf product (one with exactly two parents), generate 1000 random sales
    JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
        @Override
        public Boolean call(Product product) throws Exception {
            return product.getParents().size() == 2;
        }
    }).flatMap(new FlatMapFunction<Product, Sale>() {
        @Override
        public Iterable<Sale> call(Product product) throws Exception {
            Random random = new Random();
            List<Sale> sales = new ArrayList<>(1000);
            for (int i = 0; i < 1000; i++) {
                sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
            }
            return sales;
        }
    });
    javaFunctions(salesRDD).writerBuilder("java_api", "sales", mapToRow(Sale.class)).saveToCassandra();
}

And here is the error I get:

16/03/04 13:29:06 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
16/03/04 13:29:06 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
    at muhrafifm.spark.cass.twitterdw.Demo.generateData(Demo.java:69)
    at muhrafifm.spark.cass.twitterdw.Demo.run(Demo.java:35)
    at muhrafifm.spark.cass.twitterdw.Demo.main(Demo.java:181)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 11 more
16/03/04 13:29:40 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/03/04 13:29:41 INFO SparkContext: Invoking stop() from shutdown hook
16/03/04 13:29:41 INFO SparkUI: Stopped Spark web UI at http://10.144.233.28:4040
16/03/04 13:29:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/04 13:29:42 INFO MemoryStore: MemoryStore cleared
16/03/04 13:29:42 INFO BlockManager: BlockManager stopped
16/03/04 13:29:42 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/04 13:29:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/04 13:29:42 INFO SparkContext: Successfully stopped SparkContext
16/03/04 13:29:42 INFO ShutdownHookManager: Shutdown hook called
16/03/04 13:29:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-16fd2ae2-b61b-4411-a776-1e578caabba6
------------------------------------------------------------------------
BUILD FAILURE

Is there something I did wrong? It seems to require a package that I don't even use. Can this be fixed, or should I use an earlier version of the spark-cassandra-connector?

Any answer is appreciated, thanks.


person M.R. Murazza    schedule 04.03.2016


Answers (4)


The code is looking for

org/apache/spark/sql/catalyst/package$ScalaReflectionLock$

so you should include the spark-sql library, which provides that missing dependency.
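
For the versions in the question, that would be something along these lines in the POM (a minimal sketch; spark-sql_2.10 matches the Scala 2.10 artifacts and the Spark 1.6.0 version already declared there):

    <!-- spark-sql pulls in spark-catalyst, where ScalaReflectionLock lives -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>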

person rhernando    schedule 18.03.2016
comment
I checked the dependencies and the spark-sql library is there; I tried importing it, but it does nothing. How do I include it and make sure the library is actually on the classpath? - person M.R. Murazza; 23.03.2016
comment
Either you put the library in the workers' lib directory, or you ship it to them with the --jars option of the spark-submit command. - person rhernando; 28.03.2016
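
For example (a sketch only; the --jars path is hypothetical, and the application jar name is derived from the question's POM coordinates):

    spark-submit --class muhrafifm.spark.cass.twitterdw.Demo \
      --jars /path/to/spark-sql_2.10-1.6.0.jar \
      target/spark-cass-twitterdw-1.0.jar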

I had the same problem, and it came down to compatibility between the Spark version and the Spark Cassandra Connector: I was using Spark 2.3, while the connector was an older version.

The version compatibility matrix is available here:

https://github.com/datastax/spark-cassandra-connector
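
Roughly, the connector and Spark lines pair up like this (an excerpt from memory; treat the README's table as authoritative):

    Connector 2.4 -> Spark 2.4
    Connector 2.3 -> Spark 2.3
    Connector 2.0 -> Spark 2.0, 2.1, 2.2
    Connector 1.6 -> Spark 1.6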

person Praveen    schedule 19.10.2018
comment
Yes, either you have a problem with spark-sql missing from the scope, or the spark and spark-cassandra-connector versions are incompatible. - person Thiago Pereira; 08.08.2019

This is the POM I used for this application, and it ran without any problems (java version "1.8.0_131" and javac 1.8.0_131). The complete application can be found here: https://github.com/sunone5/BigData/tree/master/spark-cassandra-streaming


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>spark-cassandra-streaming</groupId>
    <artifactId>spark-cassandra-streaming</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11 -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector-java_2.10 -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.10</artifactId>
            <version>1.6.0-M1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core -->
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.10</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>2.2.0</version>
        </dependency>    

    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
person sunone5    schedule 13.10.2017

I was able to do this successfully.

My Scala version is 2.11.12.

Below is my pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sashi</groupId>
    <artifactId>SalesAnalysis</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>SalesAnalysis</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency> 

        <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11 -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.5</version>
        </dependency>               


        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.3.0</version>
        </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>

    </dependencies>
</project>

This is my spark-submit command:

spark-submit --class com.sashi.SalesAnalysis.CassandraSparkSalesAnalysis --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 /home/cloudera/Desktop/spark_ex/Cassandra/sales-analysis.jar
person Shashi Sharma    schedule 22.01.2019