Spark с вводом/выводом Cassandra

Представьте себе следующий сценарий: приложение Spark (реализация Java) использует базу данных Cassandra для загрузки, преобразования в RDD и обработки данных. Также приложение получает новые данные из базы данных, которые также обрабатываются специальным приемником. Результат процесса потоковой передачи сохраняется в базе данных. Реализация использует Spring Data Cassandra из интеграции с базой данных.

КассандраКонфигурация:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

Метод DataProcessor.main:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

Ожидается большой объем данных для начальной загрузки. По этой причине данные разбиваются на страницы, загружаются и распределяются в rddBuffer.

Также доступны следующие варианты:

  1. Пример Spark-Cassandra (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), хотя для этого примера имеется минимальное количество документации.
  2. Проект Calliope (http://tuplejump.github.io/calliope/)

Я хотел бы знать, как лучше всего интегрировать Spark с Cassandra. Что было бы лучшим вариантом для моей реализации?

Apache Spark 1.0.0, Apache Cassandra 2.0.8


person Pantelis Papapoulias    schedule 27.06.2014    source источник
comment
Планируете ли вы использовать Spark на одном узле или в кластере?   -  person maasg    schedule 27.06.2014
comment
PS: рассмотрите возможность использования Scala для Spark. Это намного приятнее.   -  person maasg    schedule 27.06.2014
comment
Это реализация прототипа, на следующем этапе весь код будет написан на Java 8 (лямбда-выражения) или Scala (замыкания). Выбор языка будет зависеть от нескольких факторов. Также в кластере бессмысленно использовать Skark на одном узле.   -  person Pantelis Papapoulias    schedule 29.06.2014
comment
@maasg прав, его ответ тоже хорош. Поверьте нам, когда мы говорим, что ваша жизнь станет в 10 раз проще, когда вы перейдете на Scala. У нас есть поговорка: однажды перейдя на Scala, вы никогда не вернетесь назад.   -  person samthebest    schedule 29.06.2014
comment
@samthebest И Scala, и Java 8, кажется, предлагают большие преимущества по сравнению с предыдущими версиями реализаций Java или Python. Scala кажется естественным дополнением к Spark, поскольку это реализация Scala. Но опять же выбор между этими двумя будет оцениваться на более позднем этапе. Сейчас я намерен сосредоточиться на основах.   -  person Pantelis Papapoulias    schedule 29.06.2014
comment
@Pantelis справедливое замечание :)   -  person samthebest    schedule 30.06.2014
comment
С сегодняшнего дня существует третий рекомендуемый вариант использования официального драйвера для Spark: github.com/ datastax/cassandra-driver-spark   -  person Piotr Kołaczkowski    schedule 30.06.2014


Ответы (2)


Самый простой способ работать с Cassandra и Spark — использовать официальный драйвер Cassandra с открытым исходным кодом для Spark, разработанный DataStax: https://github.com/datastax/spark-cassandra-connector

Этот драйвер был построен поверх Cassandra Java Driver и обеспечивает прямой мост между Cassandra и Spark. В отличие от Calliope, он не использует интерфейс Hadoop. Кроме того, он предлагает следующие уникальные функции:

  • поддержка всех типов данных Cassandra, включая коллекции, из коробки
  • легкое сопоставление строк Cassandra с пользовательскими классами или кортежами без необходимости использования каких-либо неявных или других расширенных функций в Scala
  • сохранение любых RDD в Cassandra
  • полная поддержка виртуальных узлов Cassandra
  • возможность фильтровать/выбирать на стороне сервера, например. использование столбцов кластеризации Cassandra или вторичных индексов
person Piotr Kołaczkowski    schedule 30.06.2014
comment
Спасибо, это было именно то, что мне нужно для моей реализации. - person Pantelis Papapoulias; 01.07.2014
comment
Взгляните на класс CassandraConnector, предоставленный spark-cassandra-connector. Это упрощает выполнение запросов CQL и т. д. внутри операций с RDD (foreach, map и т. д.), поэтому они распределяются по кластеру. Это недостаточно подчеркивается в документации IMO. - person David Tinker; 07.10.2014
comment
Петр, можешь взглянуть на этот вопрос о datastax'sspark-cassandra-connector? stackoverflow.com/questions/27130321 / Спасибо. - person chrisTina; 25.11.2014

Подход в приведенном выше коде представляет собой классический централизованный алгоритм, который будет работать, только если выполняется на одном узле. И Cassandra, и Spark являются распределенными системами, поэтому необходимо смоделировать процесс таким образом, чтобы его можно было распределить между несколькими узлами.

Есть несколько возможных подходов: если вы знаете ключи строк для извлечения, вы можете сделать что-то простое, например следующее: (используя драйвер DataStax Java)

val data = sparkContext.parallelize(keys).map{key => 
   val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
   val session  = cluster.connect(keyspace)
   val statement = session.prepare("...cql...);")
   val boundStatement = new BoundStatement(sttmt)
   session.execute(session.execute(boundStatement.bind(...data...)
}

Это позволит эффективно распределить выборку ключей по кластеру Spark. Обратите внимание, как соединение с C* выполняется внутри замыкания, так как это гарантирует, что соединение будет установлено при выполнении задачи на каждом отдельном распределенном рабочем потоке.

Учитывая, что в вашем примере используется подстановочный знак (т. е. ключи неизвестны), использование интерфейса Hadoop Cassandra является хорошим вариантом. Пример Spark-Cassandra, указанный в вопросе, иллюстрирует использование этого интерфейса Hadoop в Cassandra.

Calliope — это библиотека, которая инкапсулирует сложность использования интерфейса Hadoop, предоставляя простой API для доступа к этой функциональности. Он доступен только в Scala, поскольку использует определенные функции Scala (такие как имплициты и макросы в предстоящем выпуске). С Calliope вы в основном объявляете, как преобразовать ваш RDD[type] в ключ строки и значение строки, а Calliope позаботится о настройке интерфейсы hadoop для работы. Мы обнаружили, что Calliope (и базовые интерфейсы Hadoop) в 2-4 раза быстрее, чем использование драйвера для взаимодействия с Cassandra.

Вывод: я бы отказался от конфигурации Spring-Data, чтобы получить доступ к Cassandra, так как это ограничит вас одним узлом. Рассмотрите возможность простого параллельного доступа, если это возможно, или изучите использование Calliope в Scala.

person maasg    schedule 28.06.2014
comment
Спасибо за ваш ответ maasg. Вы разъясняете разницу между двумя вариантами интерфейса Hadoop и API Calliope. Но неясно, почему вариант Spring Data Cassandra не подходит для кластерной архитектуры. Не могли бы вы предоставить более подробную информацию об этом? В данном примере кода данные загружаются локально и распределяются по кластеру пакетами. Для этой цели используется буфер RDD (с объединением для каждой партии). Каждый рабочий узел будет параллельно загружать в RDD один и тот же набор данных? - person Pantelis Papapoulias; 30.06.2014
comment
Не следует создавать новый кластер в тесном цикле приложения. Это убьет производительность. Кроме того, вам необходимо правильно закрыть экземпляры кластера после использования. В противном случае у вас скоро закончится память и/или потоки. Экземпляры кластера и сеанса потокобезопасны. Можно переместить их из лямбды и поделиться ими, однако, поскольку они не сериализуемы, их невозможно распространять. Эта проблема была решена с помощью класса Connector в официальном модуле cassandra-driver-spark. - person Piotr Kołaczkowski; 30.06.2014
comment
@PiotrKolaczkowski Учитывая, что кластер не сериализуем, его необходимо создать в лямбде, поскольку этот код будет выполняться каждым рабочим в распределенной системе Spark. Упомянутый вами случай будет работать только в локальном режиме. Re: cassandra-spark: Вы, безусловно, близки к источнику. Я был в объявлении сегодня утром на Spark Summit. Будет изучать его как можно скорее. - person maasg; 01.07.2014
comment
Вы правы, именно поэтому я упомянул класс CassandraConnector, который драйвер использует для решения именно этой проблемы. CassandraConnector можно рассматривать как сериализуемое соединение с Cassandra. Он позволяет совместно использовать кластер между потоками на одной и той же JVM, но при отправке по сети на удаленный узел он прозрачно восстанавливает соединение с кластером. Он также предлагает несколько полезных утилит для предотвращения утечек ресурсов. - person Piotr Kołaczkowski; 01.07.2014