Spark выдает исключение NullPointerException в JdbcRDD в кластере с двумя рабочими процессами.

Я запускаю искровой кластер с двумя рабочими, каждый по 60 ГБ.

Я написал ниже код для JdbcRDD.

String sql   = "SELECT * FROM( SELECT ROW_NUMBER() OVER(ORDER BY (SELECT 1)) AS Row,"+ 
               " * FROM [Table_1]) A WHERE Row >= ? AND Row <= ? ";

    SparkContext sctx   = new SparkContext(getSparkConf());
    try {
        JdbcRDD<List> jdbcRdd = new JdbcRDD(sctx,new GetJDBCConnection(),sql,0, rowCount, 200, new GetJDBCResult(),scala.reflect.ClassTag$.MODULE$.AnyRef());

        Object[] bb = (Object[])jdbcRdd.collect();

        System.out.println("Length of Object array : "+bb.length);
        System.out.println("JdbcRDD:- "+bb);
    } catch(Exception e) {
        e.printStackTrace();
    }

и код для GetJdbcResult

class GetJDBCResult extends AbstractFunction1<ResultSet, List> implements Serializable{

private static final long serialVersionUID = -78825308090L;
 public List apply(ResultSet rs) {
    Object result = null;

    List lst = new ArrayList();
    try {
        System.out.println("In apply method");
        System.out.println("resultSet : -"+rs);
        int cols  = rs.getMetaData().getColumnCount();
        System.out.println("no of columns : "+cols);
        for(int i=1;i<=cols;i++) {
            result = rs.getObject(i);

            System.out.println("Object : -"+result);
            lst.add(result);
        }
        System.out.println("result->" + lst);
    } catch (SQLException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return lst;         
}

}

Приведенный выше код работает нормально. Я запускаю Spark в автономном режиме (локальный *), но если использовать кластерную среду, он выдает следующую ошибку:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 39, DD1AI7511): java.lang.NullPointerException: 
    org.apache.spark.rdd.JdbcRDD$$anon$1.<init>(JdbcRDD.scala:74)
    org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:70)
    org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:50)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    java.lang.Thread.run(Thread.java:722)

Трассировка стека драйвера:

В журналах Worker нет следов/журналов. Я делаю что-то не так здесь? У кого-нибудь есть идеи?


person jammer    schedule 18.12.2014    source источник
comment
Как выглядит GetJDBCConnection? Это соединение, к которому рабочие в вашем кластере имеют доступ?   -  person Mike Park    schedule 21.12.2014


Ответы (1)


Если вы посмотрите на строку 74 в JdbcRDD, станет ясно, что это связано с нулевым подключением к базе данных.

https://github.com/apache/spark/blob/655699f8b7156e8216431393436368e80626cdb2/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala

Поскольку он работает локально, но не в кластере, проверьте, имеют ли экземпляры вашего кластера доступ к базе данных. Если это в EC2, убедитесь, что правила вашего брандмауэра верны. Также убедитесь, что искровой кластер работает в том же облаке VPC, что и ваша база данных. Лучший способ проверить это — подключиться по SSH к одному из ваших подчиненных узлов Spark и посмотреть, сможете ли вы удаленно подключиться к базе данных оттуда.

person Sujee    schedule 29.12.2014