Spark Job не может подключиться к Cassandra

Я пытаюсь подключиться к Cassandra с помощью разъема Cassandra для Spark, задание spark выполняется на EMR.

Ниже мой код

  public class SparkCassandraDriver implements Serializable {
    private transient SparkConf conf;

    private SparkCassandraDriver(SparkConf conf) {
        this.conf = conf;

    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
    //    generateData(sc);
        connectToCassandra(sc);


        sc.stop();
    }

    private void connectToCassandra(JavaSparkContext sc) {
         CassandraConnector connector = CassandraConnector.apply(sc.getConf());
         System.out.println("Conencted is " + sc.getConf().get("spark.cassandra.connection.host"));

         Session session = connector.openSession();
         session.execute("USE dmp");


         ResultSet rs = session.execute(
                    "SELECT XYZ FROM XYZ");

         Iterator<Row> it = rs.iterator();

         while(it.hasNext()){

             System.out.println("it issssssss " +it.next());
         }
         session.close();

    }

    public static void main(String[] args) {


        SparkConf conf = new SparkConf();
        conf.setAppName("Spark-Cassandra Integration");
        conf.setMaster("yarn-cluster");
        conf.set("spark.cassandra.connection.host", "PUBLIC IP");
        conf.set("spark.cassandra.connection.rpc.port", "9042");
        conf.set("spark.cassandra.connection.timeout_ms", "40000");
        conf.set("spark.cassandra.read.timeout_ms", "200000");


        conf.set("spark.cassandra.auth.username", "username");
        conf.set("spark.cassandra.auth.password", "password");


        SparkCassandraDriver app = new SparkCassandraDriver(conf);
        app.run();
    }

 }

Помпон, который я использую,

    <dependency>
       <groupId>org.apache-extras.cassandra-jdbc</groupId>
       <artifactId>cassandra-jdbc</artifactId>
       <version>1.2.5</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>  

Но я получаю следующую ошибку.

   16/02/22 16:47:20 ERROR ApplicationMaster: User class threw exception: java.io.IOException: Failed to open native connection to Cassandra at {54.166.142.199}:9042
  java.io.IOException: Failed to open native connection to Cassandra at {54.166.142.199}:9042
      at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraCon
   nector.scala:176)
   at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
   at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
   at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
   at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
at com.mobi.vserv.driver.SparkCassandraDriver.connectToCassandra(SparkCassandraDriver.java:55)
   at com.mobi.vserv.driver.SparkCassandraDriver.run(SparkCassandraDriver.java:45)
   at com.mobi.vserv.driver.SparkCassandraDriver.main(SparkCassandraDriver.java:90)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
 Caused by:    com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /54.166.142.199:9042 (com.d
    atastax.driver.core.TransportException: [/54.166.142.199:9042] Cannot connect))
     at                       com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)
     at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
   at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1230)
   at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:333)
   at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:169)

Пожалуйста, помогите мне в решении этого:

Спасибо


person Rahul Koshaley    schedule 22.02.2016    source источник
comment
Совершенно ясно, что он не может подключиться к 9024 по вашему IP-адресу. Что вы сделали, чтобы определить, действительно ли 9042 запущен и установлен?   -  person apesa    schedule 23.02.2016
comment
Да, я могу подключиться по телнету, и мой локальный искровой код может подключиться к кассандре.   -  person Rahul Koshaley    schedule 25.02.2016


Ответы (1)


Наконец я нашел решение, я добавил группу безопасности (EMR-slaves) в группу безопасности Cassandra, которой там не было.

И это решило проблему.

С уважением,

Рахул

person Rahul Koshaley    schedule 01.03.2016