Почему Apache Spark выполняет фильтры на клиенте

Будучи новичком в Apache Spark, столкнулся с проблемой при получении данных Cassandra в Spark.

List<String> dates = Arrays.asList("2015-01-21","2015-01-22");
CassandraJavaRDD<A> aRDD = CassandraJavaUtil.javaFunctions(sc).
                    cassandraTable("testing", "cf_text",CassandraJavaUtil.mapRowTo(A.class, colMap)).
                    where("Id=? and date IN ?","Open",dates);

Этот запрос не фильтрует данные на сервере cassandra. В то время как этот оператор java выполняет свою стрельбу по памяти и, наконец, выдает исключение spark java.lang.OutOfMemoryError. Запрос должен отфильтровывать данные на сервере cassandra, а не на стороне клиента, как указано в https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md.

Пока я выполняю запрос с фильтрами на cassandra cqlsh, он работает нормально, но выполнение запроса без фильтра (предложение where) дает ожидаемый тайм-аут. Итак, ясно, что spark не применяет фильтры на стороне клиента.

SparkConf conf = new SparkConf();
            conf.setAppName("Test");
            conf.setMaster("local[8]");
            conf.set("spark.cassandra.connection.host", "192.168.1.15")

Почему фильтры применяются на стороне клиента и как можно улучшить применение фильтров на стороне сервера.

Как мы можем настроить искровой кластер поверх кластера cassandra на платформе Windows?




Ответы (3)


Не используя Cassandra со Spark, прочитав предоставленный вами раздел (спасибо за это), я вижу, что:

Примечание. Хотя предложение ALLOW FILTERING неявно добавляется в сгенерированный запрос CQL, не все предикаты в настоящее время разрешены механизмом Cassandra. Это ограничение будет устранено в будущих выпусках Cassandra. В настоящее время ALLOW FILTERING хорошо работает со столбцами, проиндексированными вторичными индексами или столбцами кластеризации.

Я почти уверен (но не проверял), что предикат «IN» не поддерживается: см. cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80" rel="nofollow">https://github.com/datastax/spark-cassandra- коннектор/blob/24fbe6a10e083ddc3f770d1f52c07dfefeb7f59a/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80

Таким образом, вы можете попытаться ограничить свое предложение where идентификатором (при условии, что для этого есть вторичный индекс) и использовать искровую фильтрацию для диапазона дат.

person cbbetz    schedule 30.06.2015
comment
Спасибо, вы спасли мой день, дав мне эту ссылку. - person Faraz; 01.04.2018

Я бы предложил читать таблицу как DataFrame вместо RDD. Они доступны в Spark 1.3 и выше. Затем вы можете указать запрос CQL в виде строки следующим образом:

CassandraSQLContext sqlContext = new CassandraSQLContext(sc);

String query = "SELECT * FROM testing.cf_text where id='Open' and date IN ('2015-01-21','2015-01-22')";
DataFrame resultsFrame = sqlContext.sql(query);

System.out.println(resultsFrame.count());

Так что попробуйте это и посмотрите, работает ли это лучше для вас.

Когда у вас есть данные в DataFrame, вы можете выполнять над ним операции Spark SQL. А если вам нужны данные в RDD, вы можете преобразовать DataFrame в RDD.

person Jim Meyer    schedule 01.07.2015
comment
я бы попробовал датафрейм - person 107; 01.07.2015
comment
Знаете ли вы, как настроить искровой кластер поверх кластера cassandra на платформе Windows?? - person 107; 01.07.2015

установка spark.cassandra.input.split.size_in_mb в SparkConfing решила проблему.

conf = new SparkConf();
        conf.setAppName("Test");
        conf.setMaster("local[4]");
        conf.set("spark.cassandra.connection.host", "192.168.1.15").
        set("spark.executor.memory", "2g").
        set("spark.cassandra.input.split.size_in_mb", "67108864");

Spark-cassnadra-connector считывает неправильное значение spark.cassandra.input.split.size_in_mb, поэтому переопределение этого значения в SparkConf работает. Теперь предложение IN также работает хорошо.

person 107    schedule 01.07.2015