Представьте себе следующий сценарий: приложение Spark (реализация Java) использует базу данных Cassandra для загрузки, преобразования в RDD и обработки данных. Также приложение получает новые данные из базы данных, которые также обрабатываются специальным приемником. Результат процесса потоковой передачи сохраняется в базе данных. Реализация использует Spring Data Cassandra из интеграции с базой данных.
КассандраКонфигурация:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {
@Autowired
private Environment env;
@Bean
public CassandraClusterFactoryBean cluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
return cluster;
}
@Bean
public CassandraMappingContext mappingContext() {
return new BasicCassandraMappingContext();
}
@Bean
public CassandraConverter converter() {
return new MappingCassandraConverter(mappingContext());
}
@Bean
public CassandraSessionFactoryBean session() throws Exception {
CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
session.setCluster(cluster().getObject());
session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
session.setConverter(converter());
session.setSchemaAction(SchemaAction.NONE);
return session;
}
@Bean
public CassandraOperations cassandraTemplate() throws Exception {
return new CassandraTemplate(session().getObject());
}
}
Метод DataProcessor.main:
// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);
while(pagingResults != null && !pagingResults.isEmpty()) {
Event lastEvent = pagingResults.get(pagingResults.size() - 1);
pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize page and add to the existing
rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}
// data processing
...
Ожидается большой объем данных для начальной загрузки. По этой причине данные разбиваются на страницы, загружаются и распределяются в rddBuffer.
Также доступны следующие варианты:
- Пример Spark-Cassandra (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), хотя для этого примера имеется минимальное количество документации.
- Проект Calliope (http://tuplejump.github.io/calliope/)
Я хотел бы знать, как лучше всего интегрировать Spark с Cassandra. Что было бы лучшим вариантом для моей реализации?
Apache Spark 1.0.0, Apache Cassandra 2.0.8