Я запускаю искровой кластер с двумя рабочими, каждый по 60 ГБ.
Я написал ниже код для JdbcRDD.
String sql = "SELECT * FROM( SELECT ROW_NUMBER() OVER(ORDER BY (SELECT 1)) AS Row,"+
" * FROM [Table_1]) A WHERE Row >= ? AND Row <= ? ";
SparkContext sctx = new SparkContext(getSparkConf());
try {
JdbcRDD<List> jdbcRdd = new JdbcRDD(sctx,new GetJDBCConnection(),sql,0, rowCount, 200, new GetJDBCResult(),scala.reflect.ClassTag$.MODULE$.AnyRef());
Object[] bb = (Object[])jdbcRdd.collect();
System.out.println("Length of Object array : "+bb.length);
System.out.println("JdbcRDD:- "+bb);
} catch(Exception e) {
e.printStackTrace();
}
и код для GetJdbcResult
class GetJDBCResult extends AbstractFunction1<ResultSet, List> implements Serializable{
private static final long serialVersionUID = -78825308090L;
public List apply(ResultSet rs) {
Object result = null;
List lst = new ArrayList();
try {
System.out.println("In apply method");
System.out.println("resultSet : -"+rs);
int cols = rs.getMetaData().getColumnCount();
System.out.println("no of columns : "+cols);
for(int i=1;i<=cols;i++) {
result = rs.getObject(i);
System.out.println("Object : -"+result);
lst.add(result);
}
System.out.println("result->" + lst);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return lst;
}
}
Приведенный выше код работает нормально. Я запускаю Spark в автономном режиме (локальный *), но если использовать кластерную среду, он выдает следующую ошибку:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 39, DD1AI7511): java.lang.NullPointerException:
org.apache.spark.rdd.JdbcRDD$$anon$1.<init>(JdbcRDD.scala:74)
org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:70)
org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:50)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
Трассировка стека драйвера:
В журналах Worker нет следов/журналов. Я делаю что-то не так здесь? У кого-нибудь есть идеи?
GetJDBCConnection
? Это соединение, к которому рабочие в вашем кластере имеют доступ? - person Mike Park   schedule 21.12.2014