Apache Flink: как использовать SourceFunction для выполнения задачи с заданным интервалом?

Мне нужно, чтобы моя работа flink извлекала записи из базы данных с заданным интервалом и архивировала их после обработки. Я реализовал SourceFunction для извлечения необходимых записей из базы данных и добавил SourceFunction в качестве источника StreamExecutionEnvironment. Как я могу указать, что StreamExecutionEnvironment необходимо получать записи из базы данных с помощью SourceFunction каждые 10 минут?

ИсточникФункция:

public class MongoDBSourceFunction implements SourceFunction<List<Book>>{

    public void cancel() {
        // TODO Auto-generated method stub
    }

    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {

        List<Book> books = getBooks();

        context.collect(books);

    }

    public List<Book> getBooks() {
        List<Book> books = new ArrayList<Book>();

        //fetch all books from database     
        return books;
    }

}

Процессор:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArchiveJob {

    public static void main(String[] args) {

        final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new MongoDBSourceFunction()).print();
    }

}

person tweeper    schedule 23.10.2018    source источник


Ответы (1)


Вам необходимо добавить эту функциональность в сам MongoDBSourceFunction. Например, вы можете создать экземпляр ScheduledExecutorService в методе open и запланировать задачу чтения с помощью этого исполнителя.

Обратите внимание, что при отправке записей важно удерживать блокировку контрольной точки.

person Till Rohrmann    schedule 24.10.2018
comment
SourceFunction для соединителя flink Apache NiFi использует Thread.sleep(). Это приемлемо или использовать ScheduledExecutorService единственный выход? - person tweeper; 25.10.2018
comment
Thread.sleep() также должен работать, если вы отключите блокировку контрольной точки во время сна. - person Till Rohrmann; 25.10.2018
comment
Я не использую контрольные точки. Я просто запрашиваю базу данных каждые 15 минут и отправляю эти записи. При чем тут блокировка контрольных точек? - person tweeper; 25.10.2018
comment
Если вы хотите использовать контрольную точку, важно генерировать записи под блокировкой контрольной точки, потому что в противном случае состояние не будет согласованным. - person Till Rohrmann; 25.10.2018