загрузка динамического файла spring-integration-aws

У меня есть требование загрузить файл с S3 на основе содержимого сообщения. Другими словами, файл для загрузки ранее неизвестен, я должен искать и находить его во время выполнения. S3StreamingMessageSource не подходит, потому что:

  1. Это зависит от опроса, где мне нужно дождаться сообщения.
  2. Я не могу найти способ динамически создать S3StreamingMessageSource в середине потока. gateway(IntegrationFlow) выглядит интересно, но мне нужен несуществующий gateway(Function<Message<?>, IntegrationFlow>).

Другой кандидат: S3MessageHandler, но он не поддерживает перечисление файлов, которые мне нужны для поиска нужного файла.

Я могу реализовать свой собственный обработчик сообщений напрямую с помощью AWS API, просто интересно, не упустил ли я что-то, потому что это не кажется необычным требованием. В конце концов, не каждое приложение просто сидит там и продолжает опрашивать S3 на наличие новых файлов.


person Abhijit Sarkar    schedule 29.12.2017    source источник
comment
Что ж, для InputStream вы все еще можете использовать S3RemoteFileTemplate и его функцию get() или S3Session.readRaw(), если вам обязательно нужно вернуть поток. Да, мы можем добавить поддержку InputStream в S3MessageHandler, но это не ошибка.   -  person Artem Bilan    schedule 30.12.2017
comment
@ArtemBilan Я посмотрел на get, но он вызывает callback.doWithInputStream, а затем закрывает поток. В отличие от S3StreamingMessageSource, сообщение не может быть передано вниз по течению, все, что должно быть сделано, должно быть сделано в обратном вызове. Я иду по пути внедрения интеллектуального фильтра, который я могу использовать с S3StreamingMessageSource, чтобы мне не пришлось изобретать велосипед.   -  person Abhijit Sarkar    schedule 30.12.2017
comment
Ok. Понимаю. Хотя это не управляется событиями, оно по-прежнему доступно для опроса, однако вы действительно можете вызвать receive() вручную. readRaw() из сеанса должно быть хорошо для вас.   -  person Artem Bilan    schedule 30.12.2017


Ответы (2)


Существует S3RemoteFileTemplate с функцией list(), которую вы можете использовать в handle(). Затем split() результат и вызов S3MessageHandler для загрузки каждого удаленного файла.

Хотя последний имеет функцию загрузки всего удаленного каталога.

person Artem Bilan    schedule 29.12.2017
comment
Я посмотрел, и, к сожалению, это не сработает. S3MessageHandler не может передавать файл в потоковом режиме, только скачивает его. Это похоже на ошибку, потому что она должна создавать Message<InputStream>, если полезная нагрузка не является File. - person Abhijit Sarkar; 29.12.2017
comment
Я заставил это работать, смотрите мой ответ. См. также issue-82. - person Abhijit Sarkar; 31.12.2017

Для тех, кто сталкивается с этим вопросом, это то, что я сделал. Хитрость заключается в следующем:

  1. Установите фильтры позже, а не во время строительства. Обратите внимание, что нет метода addFilters или getFilters, поэтому фильтры можно установить только один раз, и их нельзя будет добавить позже. @artem-bilan, это неудобно.
  2. Вызовите S3StreamingMessageSource.receive вручную.

    .handle(String.class, (fileName, h) -> {
    if (messageSource instanceof S3StreamingMessageSource) {
        S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;
    
        ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
        chainFileListFilter.addFilters(
                new S3SimplePatternFileListFilter("**/*/*.json.gz"),
                new S3PersistentAcceptOnceFileListFilter(metadataStore, ""),
                new S3FileListFilter(fileName)
        );
        s3StreamingMessageSource.setFilter(chainFileListFilter);
    
        return s3StreamingMessageSource.receive();
    }
    log.warn("Expected: {} but got: {}.",
            S3StreamingMessageSource.class.getName(), messageSource.getClass().getName());
    return messageSource.receive();
    }, spec -> spec
        .requiresReply(false) // in case all messages got filtered out
    )
    
person Abhijit Sarkar    schedule 31.12.2017
comment
MessageSorce не предназначен для перенастройки во время выполнения. Вы можете столкнуться с некоторыми условиями гонки при одновременном доступе к нему. Однако ваш addFilter мы можем преодолеть с помощью пользовательской реализации ChainFileListFilter, где вы можете внедрить только это в S3StreamingMessageSource и реализовать там свою собственную логику добавления/удаления. - person Artem Bilan; 19.04.2018
comment
Я думаю, что вы все еще можете выполнить свое InputStream требование естественным путем, не вызывая MessageSource.receive() вручную. Вам нужен S3RemoteFileTemplate и его getSession(). Этот Session имеет InputStream readRaw(String source) API. - person Artem Bilan; 19.04.2018
comment
@ArtemBilan Если я использую Session, мне придется самому выполнять фильтрацию. Я не видел никаких фильтров в S3RemoteFileTemplate. Кроме того, это не поможет мне с обработкой ошибок, обсуждаемой здесь - person Abhijit Sarkar; 20.04.2018
comment
Ну, вы используете S3RemoteFileTemplate.list() из .handle(), чтобы получить список ресурсов S3, затем вы используете .filter() для разделенных элементов, а затем упомянутый Session.readRaw(). Вы можете обернуть этот поток в .gateway() и иметь единую точку для обработки ошибок. - person Artem Bilan; 20.04.2018