Использование Nio Locker в весенней интеграции java dsl для опроса файлов

Я пытаюсь реализовать опрос файлов интеграции Spring, при котором несколько серверов независимо читают из общего каталога и обрабатывают файл. Каждый из них после обработки файла переименовывает его в .DONE, чтобы другой его не выбирал. Наблюдая за тем, как некоторые файлы выбираются обоими, я использовал nioLocker, как в примере ниже. Но похоже, что случайно это не сработает. Сообщите мне, что нужно сделать, чтобы сделать nioLocker надежным в многосерверной среде. По другим причинам наличие отдельного посредника (например, zookeepr / mongo и т. Д.) Не является вариантом.

Спасибо.

 @Bean
    public TransactionSynchronizationFactory transactionSynchronizationFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
                new ExpressionEvaluatingTransactionSynchronizationProcessor();
        syncProcessor.setAfterCommitExpression(parser.parseExpression("payload.renameTo(new java.io.File(payload.path+'.DONE'))"));
        syncProcessor.setAfterRollbackExpression(parser.parseExpression("payload.renameTo(new java.io.File(payload.path+'.DONE'))"));
        return new DefaultTransactionSynchronizationFactory(syncProcessor);
    }

@Bean
public IntegrationFlow receiveInputFile(@Value("/opt/tomcat/in/test") File in,
                                            @Value(".txt") String pattern,
                                            @Value("${edi.poll.delay.all.edi}") int delay,
                                            @Value("${edi.messages.per.poll.new.order}") int messagesPerPoll) {

    //Logging required config for debugging
    LOGGER.debug("EDI File pattern :"+pattern);
    LOGGER.debug("EDI Delay Seconds :"+delay);
    LOGGER.debug("EDI Messages Per Poll :"+messagesPerPoll);


    return IntegrationFlows
            .from(s -> s.file(in).patternFilter(ServicesConstant.PATTERN_PREFIX + pattern).scanEachPoll(true).nioLocker(),
                    e -> e.poller(Pollers.fixedDelay(delay).maxMessagesPerPoll(messagesPerPoll)
                            .transactionSynchronizationFactory(transactionSynchronizationFactory())
                            .transactional(new PseudoTransactionManager())))
            .handle(m -> {
                LOGGER.info("Received test file " + m);
                LOGGER.info("File path: " +m.getPayload());
                LOGGER.info(""+ m.getHeaders().toString());
            })
            .get();
}

person harsha galla    schedule 23.07.2018    source источник


Ответы (1)


См. Его JavaDocs:

/**
 * File locking strategy that uses java.nio. The locks taken by FileChannel are shared with all the threads in a single
 * JVM, so this locking strategy <b>does not</b> prevent files being picked up multiple times within the same JVM.
 * {@link FileReadingMessageSource}s sharing a Locker will not pick up the same files.
 * <p>
 * This implementation will acquire or create a {@link FileLock} for the given file. Caching locks might be expensive,
 * so this locking strategy is not recommended for scenarios where many files are accessed in parallel.
 *
 * @author Iwein Fuld
 * @author Mark Fisher
 * @since 2.0
 */
public class NioFileLocker extends AbstractFileLockerFilter {

В моей практике у нас действительно хорошая эксклюзивная блокировка файлов только в Windows.

Сказать, что я не уверен, что для вас означает «многосерверная среда», если мы имеем дело только с одной локальной файловой системой. Я почти уверен, что эта блокировка NIO не влияет на общий сетевой каталог.

Если вы не можете использовать какое-либо общее постоянное хранилище для фильтрации файлов, я бы посоветовал вам взглянуть на FileSystemPersistentAcceptOnceFileListFilter с действительно некоторой общей MetadataStore реализацией, чтобы весь ваш сервер мог договориться с ним, чтобы убедиться, что файл не был обработан. еще каким-то другим сервером.

person Artem Bilan    schedule 24.07.2018
comment
Под «многосерверной средой» я имел в виду, что у нас есть точка монтирования, доступная на двух независимых машинах unix. То же самое приложение интеграции Spring, развернутое для tomcat на этих двух машинах, опрашивает смонтированный диск на предмет новых файлов для обработки. - person harsha galla; 26.07.2018
comment
Можете ли вы поделиться примером использования FileSystemPersistentAcceptOnceFileListFilter с MongoMetadataStore? - person harsha galla; 26.07.2018