Spring Integration DSL: Dispatcher не имеет подписчиков

У меня есть требование получить zip-файл с использованием SFTP. После этого мы должны заархивировать файл как есть, также обрабатывает файл после распаковки zip-файлов. Ниже приведены коды для основного потока и подпотока. agentDataArchiveChannelAdapter () работает нормально, но для другого канала я получаю ошибку ниже. В чем может быть ошибка, как это исправить? Мое предположение заключалось в том, что surancebayAgentDemographicFlow () помещает записи в прямой канал и будет следовать указанному процессу.

Dispatcher has no subscribers, failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}]

2018-09-24 12:16:22.004 DEBUG 17536 --- [ask-scheduler-2] o.s.i.channel.PublishSubscribeChannel    : postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.thirdpartyAgentDemographicFlow-Processing'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}], failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}], headers={id=9e342354-8436-e1de-774e-937c8b6809d5, timestamp=1537816582001}] for original GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}]

Код / Интеграция

@Bean("sftpAgentInboundFlow")
public IntegrationFlow sftpAgentInboundFlow(SessionFactory<LsEntry> sftpSessionFactory) {
    return IntegrationFlows
            .from(Sftp.inboundAdapter(sftpSessionFactory)
                    .deleteRemoteFiles(false)
                    .preserveTimestamp(true)
                    .remoteDirectory(agentRemoteDir)
                    .filter(new AcceptOnceFileListFilter<>())
                    .regexFilter(".*\\.zip$")
                    .localDirectory(new File(inputDir))
                    .autoCreateLocalDirectory(true)
                    .maxFetchSize(1)
                    ,
                    consumer -> consumer.id("sftpInboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)))
            .publishSubscribeChannel(pubSub -> pubSub
                            .id("AgentInboundDemographic-PubSub-Channel")
                            .subscribe(flow -> flow.bridge(e -> e.id("ziparchiver")).handle(agentDataArchiveChannelAdapter()))
                            .subscribe(surancebayAgentDemographicFlow())
                    )
            .get();
}



//@Bean("surancebayAgentDemographicFlow")
public IntegrationFlow surancebayAgentDemographicFlow() {
    return IntegrationFlows
            //.from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)/*.maxMessagesPerPoll(corepoolsize)*/))
            .from(MessageChannels.direct("thirdpartyAgentDemographicFlow-Processing"))
            .transform(unZipTransformer())
            .split(splitter())
            .channel(MessageChannels.executor(taskExecutor()))
            .<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
                    .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                    .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
                    )
            .aggregate(aggregator -> aggregator.groupTimeout(messageGroupWaiting).correlationStrategy(new CorrelationStrategy() {

                @Override
                public Object getCorrelationKey(Message<?> message) {
                    return "processdate";
                }
            }).sendPartialResultOnExpiry(true))
            .handle("agentDemograpicOutput","generateAgentDemographicFile")
            .channel(confirmChannel())
            .get()
            ;
}

person Debopam    schedule 24.09.2018    source источник
comment
В соответствии с исключением проблема связана с thirdpartyAgentDemographicFlow-Processing, и вы не показываете эту часть в своей конфигурации.   -  person Artem Bilan    schedule 24.09.2018
comment
Обновлено. По какой-то причине пришлось сменить имя   -  person Debopam    schedule 24.09.2018


Ответы (1)


OK! Я думаю, проблема в том, что вы используете какую-то версию Spring Integration, где функция использования внешнего IntegrationFlow в качестве подпотока еще не реализована. Или рассмотрите возможность обновления до последней версии или используйте обходной путь в качестве .subscribe("thirdpartyAgentDemographicFlow-Processing") и раскомментируйте эту @Bean аннотацию к определению surancebayAgentDemographicFlow.

person Artem Bilan    schedule 24.09.2018