FileWritingMessageHandler (int-file:outbound-channel-adapter) работает медленно для случая хранения нескольких сообщений в одном файле

Я использую Spring 4.1.2 с Spring Integration 4.1.0.

У меня есть вариант использования, когда я хотел бы создать один файл, который будет содержать строку для каждого сообщения, переданного в канал. Все полученные сообщения относятся к типу String. Этот файл удобно иметь, то есть нет необходимости, чтобы запись в этот файл выполнялась в рамках одной и той же транзакции главного потока. Таким образом, для варианта использования может быть реализован шаблон асинхронного прослушивания. Однако любые сообщения, записанные в этот файл, должны быть в том же порядке, в котором они были первоначально получены (поэтому либо 1 поток должен будет их обработать, либо агрегатор должен дождаться завершения нескольких потоков, а затем записать их в исходном порядке. ).

Я хотел понять, какой способ справиться с этим вариантом использования будет наиболее эффективным, поэтому я провел несколько тестов. Чтобы сделать это немного проще, в моих тестах не используется асинхронная прослушка (но это было упомянуто в прецеденте, потому что, возможно, некоторые предложения могут включать решения для пакетной обработки/буферизации).

Общий поток взят из раздела «Определить поток интеграции» по этой ссылке: https://spring.io/guides/gs/integration/

Основные варианты, которые я пробовал, были:

  1. Используйте int-file:outbound-channel-adapter (который создает FileWritingMessageHandler) вместе с преобразователем, который добавляет новую строку к каждому сообщению (преобразователь использует выражение SpEL payload + '#{systemProperties['line.separator']}.
    spring.expression.compiler.mode=OFF
  2. Используйте int-file:outbound-channel-adapter (который создает FileWritingMessageHandler) вместе с преобразователем, который добавляет новую строку к каждому сообщению (преобразователь использует выражение SpEL payload.toString() + '#{systemProperties['line.separator']}.
    spring.expression.compiler.mode=MIXED
    ПРИМЕЧАНИЕ. Использование payload.toString() вместо payload для обхода Проблема со SpEL: https://jira.spring.io/browse/SPR-12514
  3. Используйте int:logging-channel-adapter вместо int-file:outbound-channel-adapter (чтобы не использовать преобразователь с выражением SpEL).
    Протестировано с Log4J2 с использованием RollingRandomAccessFile и синхронных регистраторов.
    spring.expression.compiler.mode=OFF
  4. Используйте int:logging-channel-adapter вместо int-file:outbound-channel-adapter (чтобы не использовать преобразователь с выражением SpEL).
    Проверено с помощью Log4J2 с использованием RollingRandomAccessFile и асинхронных регистраторов. См. http://logging.apache.org/log4j/2.0/manual/async.html#Making асинхронности всех регистраторов .
    spring.expression.compiler.mode=OFF
  5. Используйте int:logging-channel-adapter вместо int-file:outbound-channel-adapter (чтобы не использовать преобразователь с выражением SpEL).
    Протестировано с помощью Log4J2 с использованием RollingRandomAccessFile и асинхронных регистраторов. См. http://logging.apache.org/log4j/2.0/manual/async.html#Making все регистраторы асинхронными .
    spring.expression.compiler.mode=MIXED

Поток тестовых примеров 1 и 2: int-file:outbound-channel-adapter flow

Тестовые случаи с 3 по 5: int:logging-channel-adapter flow

Входной файл содержит XML-данные (строки), длина которых варьируется от 1200 до 1500 символов в строке (каждая строка представляет собой отдельное сообщение).
В моем тесте у меня было 203 712 сообщений.
Ниже приведены временные параметры. Я показываю время для первых элементов немного больше, чем для последних, поскольку компилятор SpEL срабатывает через определенный период времени.

|          1              |            2             |              3                 |               4                |              5                 |
|SpringInt FileAdapter    | SpringInt FileAdapter    | Log4j2 RollingRandomAccessFile | Log4j2 RollingRandomAccessFile | Log4j2 RollingRandomAccessFile |
|                         |                          | Sync Loggers                   | Async Loggers                  | Async with                     |
|SpEL-compiler=OFF        | SpEL-compiler=MIXED      | SpEL-compiler=OFF              | SpEL-compiler=OFF              | SpEL-compiler=MIXED            |
|-------------------------|--------------------------|--------------------------------|--------------------------------|------------------------------- |
|Cnt=10000 : 0:00:12.670  | Cnt=10000 : 0:00:17.235  | Cnt=10000 : 0:00:08.222        | Cnt=10000 : 0:00:01.847        | Cnt=10000 : 0:00:01.320        |
|Cnt=20000 : 0:00:24.636  | Cnt=20000 : 0:00:30.208  | Cnt=20000 : 0:00:08.828        | Cnt=20000 : 0:00:02.232        | Cnt=20000 : 0:00:01.839        |
|Cnt=30000 : 0:00:36.179  | Cnt=30000 : 0:00:44.300  | Cnt=30000 : 0:00:09.426        | Cnt=30000 : 0:00:02.512        | Cnt=30000 : 0:00:02.647        |
|...                      | ....                     | ...                            | ...                            | ...                            |
|Cnt=180000 : 0:02:58.935 | Cnt=180000 : 0:04:15.528 | Cnt=180000 : 0:00:17.095       | Cnt=180000 : 0:00:08.546       | Cnt=180000 : 0:00:07.936       |
|Cnt=200000 : 0:03:16.473 | Cnt=200000 : 0:04:35.582 | Cnt=200000 : 0:00:18.107       | Cnt=200000 : 0:00:09.548       | Cnt=200000 : 0:00:08.660       |
|Cnt=203712 : 0:03:19.715 | Cnt=203712 : 0:04:39.452 | Cnt=203712 : 0:00:18.284       | Cnt=203712 : 0:00:09.661       | Cnt=203712 : 0:00:08.732       |

Возьмите тайминги с недоверием - я не запускал эти десятки раз и не брал средние значения. Я также не утверждаю, что log4j2 быстрее, чем другие предложения, такие как logback, я просто использую его для целей сравнения. ПРИМЕЧАНИЕ. Я использую файл в качестве входных данных только для этого теста. Я указываю на это, потому что кто-то может предложить Spring Integration просто скопировать необработанный файл из файла A в файл B. Однако в нашем реальном случае сообщения на самом деле поступают через JMS, поэтому решение «файл в файл» не является реальным вариантом.
Интересные моменты:

  • Spring Integration FileWritingMessageHandler НАМНОГО медленнее, чем любое предложение log4j2.
    Log4j2-async занял 4,3% времени FileWritingMessageHandler (199,715 сек. для сценария 1 против 8,732 сек. для сценария 5).
    Log4j2 -sync занял 4,8 % времени, которое FileWritingMessageHandler сделал (199,715 сек. для сценария 1 против 9,661 сек. для сценария 4).
  • Интеграция Spring FileWritingMessageHandler с spring.expression.compiler.mode=MIXED (сценарий № 2) на самом деле медленнее, чем spring.expression.compiler.mode=OFF. Я предполагаю, что это потому, что в сценарии № 1 я мог использовать payload + '#{systemProperties['line.separator']}, тогда как в сценарии № 2 мне пришлось использовать payload.toString() + '#{systemProperties['line.separator']}
  • Сценарии с 3 по 5 соответствуют ожиданиям по сравнению с другими сценариями.

В идеале мне не нравится использовать logging-channel-adapter только для записи сообщений в файл - похоже, я искажаю этот компонент. Однако прирост производительности значителен, поэтому, к сожалению, на данный момент я не могу исключить его использование.
Итак, мои вопросы:

  • Какие еще у меня есть варианты, помимо написания собственного FileWritingMessageHandler, чтобы повысить производительность записи файлов?
  • Я предполагаю, что, возможно, если я выполню пакетную обработку или агрегирование до FileWritingMessageHandler, а затем запишу производительность пакетной группы, это может быть лучше. Я уверен, что мог бы также использовать исполнителя задач и опросчик (мой вариант использования позволил бы это). Если пакетную обработку следует рассматривать как вариант, должен ли FileWritingMessageHandler предоставлять атрибут buffersize?
  • Можно ли изменить FileWritingMessageHandler или, возможно, можно предложить дополнительные более конкретные версии, которые будут более производительными для моего варианта использования (возможно, приняв некоторые советы / подсказки от регистраторов log4j2)?
  • Будет ли файл StreamWriter более производительным?
  • Просто подумайте об этом вслух: следует ли предлагать класс «оболочки» log4j2, который действует исключительно как файловый адаптер (т.е. ему нужно будет только регистрировать сообщение без информации о строке/классе/и т. д., всегда нужно будет писать независимо от уровня, пользователь просто передаст имя файла и, возможно, синхронизирует/асинхронно)?
  • Можно ли лучше оптимизировать компилятор SpEL для обработки случая payload.toString() + '#{systemProperties['line.separator']}, поскольку, как уже отмечалось, он был на самом деле медленнее, чем отсутствие вызова toString() в самом SpEL?

Ниже приведены файлы кода/конфигурации, используемые для тестов.

pom.xml

    <dependencies>

        <!-- Testing -->

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Spring Integration -->

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <!-- Binding for JCL (aka Java Common Logging).  -->
        <!-- Needed since things like the commons libs all use commons-logging which we don't want -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.7.7</version>
            <!-- Making scope be runtime so we'll catch any of our own classes that try to use commons-logging when we compile -->
            <scope>runtime</scope>
        </dependency>
        <!-- Binding for Log4J -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <!-- As of 9/12/2014 our company Maven repos does not have 2.0.2 -->
            <version>2.0.1</version>
        </dependency>
        <!-- Log4j API and Core implementation required for binding -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.0.2</version>
        </dependency>
        <!-- Async loggers for log4j2 require LMAX disruptor, see http://logging.apache.org/log4j/2.x/manual/async.html -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.1</version>
        </dependency>
    </dependencies>

Курсы Java

package com.xxx;

import java.util.Scanner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Starts the Spring Context and will initialize the Spring Integration routes.
 */
public final class Main {

    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

    private Main() {
    }

    /**
     * Load the Spring Integration Application Context
     *
     * @param args - command line arguments
     */
    public static void main(final String... args) {

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("\n=========================================================" + "\n                                                         " + "\n          Welcome to Spring Integration!                 " + "\n                                                         " + "\n    For more information please visit:                   " + "\n    http://www.springsource.org/spring-integration       " + "\n                                                         " + "\n=========================================================");
        }

        final AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/spring-integration-context-usecases.xml");

        context.registerShutdownHook();

        SpringIntegrationUtils.displayDirectories(context);

        final Scanner scanner = new Scanner(System.in);

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("\n=========================================================" + "\n                                                         " + "\n    Please press 'q + Enter' to quit the application.    " + "\n                                                         " + "\n=========================================================");
        }

        while (!scanner.hasNext("q")) {
            //Do nothing unless user presses 'q' to quit.
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Exiting application...bye.");
        }

        System.exit(0);

    }
}



package com.xxx;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * This class is only needed until a bug is fixed in Spring Integration 4.1.0.
 * See {@link http://stackoverflow.com/questions/27171978/read-csv-file-concurrently-using-spring-integration}
 * Once that is fixed delete this class and use this in the Spring context file.
 * <code>
 * <splitter input-channel="splitChannel" output-channel="executorChannel" expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
 * </code>
 *
 */
public class FileSplitter extends AbstractMessageSplitter {
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    int counter = 0;
    StopWatch sw = new StopWatch();

    public Object splitMessage(Message<?> message) {
        if (log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {

            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");

            return org.apache.commons.io.FileUtils.lineIterator((File) payload);
        } catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

    @Override
    protected void produceOutput(Object result, Message<?> requestMessage) {

        Iterator<?> iterator = (Iterator<?>) result;
        sw.start();
        while (iterator.hasNext()) {
            ++counter;
            produceOutputInternal(iterator.next(), requestMessage);
            if (counter % 10000 == 0) {
                sw.split();
                System.out.println("Cnt=" + counter + " : " + sw.toSplitString());
            }
        }
        sw.stop();
        System.out.println("completed");
        System.out.println("Cnt=" + counter + " : " + sw.toSplitString());
    }

    private Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip, AtomicInteger routingSlipIndex) {
        if (routingSlipIndex.get() >= routingSlip.size()) {
            return null;
        }

        Object path = routingSlip.get(routingSlipIndex.get());
        Object routingSlipPathValue = null;

        if (path instanceof String) {
            routingSlipPathValue = getBeanFactory().getBean((String) path);
        } else if (path instanceof RoutingSlipRouteStrategy) {
            routingSlipPathValue = path;
        } else {
            throw new IllegalArgumentException("The RoutingSlip 'path' can be of " + "String or RoutingSlipRouteStrategy type, but gotten: " + path);
        }

        if (routingSlipPathValue instanceof MessageChannel) {
            routingSlipIndex.incrementAndGet();
            return routingSlipPathValue;
        } else {
            Object nextPath = ((RoutingSlipRouteStrategy) routingSlipPathValue).getNextPath(requestMessage, reply);
            if (nextPath != null && (!(nextPath instanceof String) || StringUtils.hasText((String) nextPath))) {
                return nextPath;
            } else {
                routingSlipIndex.incrementAndGet();
                return getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
            }
        }
    }

    protected void produceOutputInternal(Object reply, Message<?> requestMessage) {
        MessageHeaders requestHeaders = requestMessage.getHeaders();

        Object replyChannel = null;
        if (getOutputChannel() == null) {
            Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
            if (routingSlipHeader != null) {
                Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
                Object key = routingSlipHeader.keySet().iterator().next();
                Object value = routingSlipHeader.values().iterator().next();
                Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List");
                Assert.isInstanceOf(Integer.class, value, "The RoutingSlip value must be Integer");
                List<?> routingSlip = (List<?>) key;
                AtomicInteger routingSlipIndex = new AtomicInteger((Integer) value);
                replyChannel = getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
                if (replyChannel != null) {
                    //TODO Migrate to the SF MessageBuilder
                    AbstractIntegrationMessageBuilder<?> builder = null;
                    if (reply instanceof Message) {
                        builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
                    } else if (reply instanceof AbstractIntegrationMessageBuilder) {
                        builder = (AbstractIntegrationMessageBuilder<?>) reply;
                    } else {
                        builder = this.getMessageBuilderFactory().withPayload(reply);
                    }
                    builder.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Collections.singletonMap(routingSlip, routingSlipIndex.get()));
                    reply = builder;
                }
            }

            if (replyChannel == null) {
                replyChannel = requestHeaders.getReplyChannel();
            }
        }

        Message<?> replyMessage = createOutputMessage(reply, requestHeaders);
        sendOutput(replyMessage, replyChannel);
    }

    private Message<?> createOutputMessage(Object output, MessageHeaders requestHeaders) {
        AbstractIntegrationMessageBuilder<?> builder = null;
        if (output instanceof Message<?>) {
            if (!this.shouldCopyRequestHeaders()) {
                return (Message<?>) output;
            }
            builder = this.getMessageBuilderFactory().fromMessage((Message<?>) output);
        } else if (output instanceof AbstractIntegrationMessageBuilder) {
            builder = (AbstractIntegrationMessageBuilder<?>) output;
        } else {
            builder = this.getMessageBuilderFactory().withPayload(output);
        }
        if (this.shouldCopyRequestHeaders()) {
            builder.copyHeadersIfAbsent(requestHeaders);
        }
        return builder.build();
    }

    private void sendOutput(Object output, Object replyChannel) {
        MessageChannel outputChannel = getOutputChannel();
        if (outputChannel != null) {
            replyChannel = outputChannel;
        }
        if (replyChannel == null) {
            throw new DestinationResolutionException("no output-channel or replyChannel header available");
        }

        if (replyChannel instanceof MessageChannel) {
            if (output instanceof Message<?>) {
                this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) output);
            } else {
                this.messagingTemplate.convertAndSend((MessageChannel) replyChannel, output);
            }
        } else if (replyChannel instanceof String) {
            if (output instanceof Message<?>) {
                this.messagingTemplate.send((String) replyChannel, (Message<?>) output);
            } else {
                this.messagingTemplate.convertAndSend((String) replyChannel, output);
            }
        } else {
            throw new MessagingException("replyChannel must be a MessageChannel or String");
        }
    }
}


package com.xxx;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;

/**
 * Displays the names of the input and output directories.
 */
public final class SpringIntegrationUtils {

    private static final Log logger = LogFactory.getLog(SpringIntegrationUtils.class);

    private SpringIntegrationUtils() { }

    /**
     * Helper Method to dynamically determine and display input and output
     * directories as defined in the Spring Integration context.
     *
     * @param context Spring Application Context
     */
    public static void displayDirectories(final ApplicationContext context) {

        final File inDir = (File) new DirectFieldAccessor(context.getBean(FileReadingMessageSource.class)).getPropertyValue("directory");

        final Map<String, FileWritingMessageHandler> fileWritingMessageHandlers = context.getBeansOfType(FileWritingMessageHandler.class);

        final List<String> outputDirectories = new ArrayList<String>();

        for (final FileWritingMessageHandler messageHandler : fileWritingMessageHandlers.values()) {
            final Expression outDir = (Expression) new DirectFieldAccessor(messageHandler).getPropertyValue("destinationDirectoryExpression");
            outputDirectories.add(outDir.getExpressionString());
        }

        final StringBuilder stringBuilder = new StringBuilder();

        stringBuilder.append("\n=========================================================");
        stringBuilder.append("\n");
        stringBuilder.append("\n    Input directory is : '" + inDir.getAbsolutePath() + "'");

        for (final String outputDirectory : outputDirectories) {
            stringBuilder.append("\n    Output directory is: '" + outputDirectory + "'");
        }

        stringBuilder.append("\n\n=========================================================");

        logger.info(stringBuilder.toString());

    }

}

log4j2.xml файл конфигурации

<?xml version="1.0" encoding="UTF-8"?>

<Configuration>
    <Appenders>
        <Console name="STDOUT" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{ISO8601} [%t] [%-5p] (%c) - %m%n" />
        </Console>

        <RollingRandomAccessFile name="fileAppenderMessages" fileName="C:/Users/xxxxx/Desktop/fileadapter-test/usecase3.txt">
            <PatternLayout pattern="%m %n" />
        </RollingRandomAccessFile>
    </Appenders>

    <Loggers>
        <!-- The Wire-Tap and logging-channel-adapter in the Spring cfg file will use this category name -->
        <Logger name="fileLogger" additivity="false">
            <AppenderRef ref="fileAppenderMessages" />
        </Logger>

        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>

spring-integration-context-usecases.xml файл

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xmlns:batch="http://www.springframework.org/schema/batch" 
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:inbound-channel-adapter id="fileAdapter" ref="fileReadingMessageSource" method="receive" auto-startup="true" channel="files" >
        <int:poller fixed-delay="#{T(java.lang.Integer).MAX_VALUE}"/>
    </int:inbound-channel-adapter>

    <bean id="fileReadingMessageSource" class="org.springframework.integration.file.FileReadingMessageSource">
        <property name="directory" value="C:/Users/xxxxx/Desktop/tmg-exchange-gateway-nam/t2"/>
    </bean>

    <int:channel id="files"/>

    <int:splitter input-channel="files" output-channel="stringMessages">
        <bean class="com.xxx.FileSplitter" />
    </int:splitter>

    <int:channel id="stringMessages"/>

    <int:transformer expression="payload + '#{systemProperties['line.separator']}'" output-channel="file" auto-startup="true" input-channel="stringMessages"/>
    <int-file:outbound-channel-adapter id="file"
            mode="APPEND"
            charset="UTF-8"
            directory="C:/Users/xxxxx/Desktop/fileadapter-test"
            auto-create-directory="true"
            filename-generator-expression="'usecase2.txt'"/>
</beans>

Тесты можно запускать с такими настройками:

1. java -Dspring.expression.compiler.mode=OFF com.xxx.Main
Leave context file unchanged.
2. java -Dspring.expression.compiler.mode=MIXED com.xxx.Main
Change context file to have expression="payload.toString() + '#{systemProperties['line.separator']}'"
3. java -Dspring.expression.compiler.mode=OFF com.xxx.Main
Comment out transformer and outbound-channel-adapter.
Change logging-channel-adapter   auto-startup="true"
4. java -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector  -Dspring.expression.compiler.mode=OFF com.xxx.Main
Comment out transformer and outbound-channel-adapter.
Change logging-channel-adapter   auto-startup="true"
5. java -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector  -Dspring.expression.compiler.mode=MIXED com.xxx.Main
Comment out transformer and outbound-channel-adapter.
Change logging-channel-adapter   auto-startup="true"

person Tony Falabella    schedule 05.12.2014    source источник


Ответы (1)


Спасибо за обширный анализ.

Честно говоря, режим APPEND — относительно недавнее дополнение к исходящему адаптеру и не был оптимизирован.

Я подозреваю, что это просто потому, что поток закрывается при каждой записи (с использованием FileCopy.copy()), которая сбрасывается на диск.

Мы определенно должны рассмотреть возможность оставить BufferedOutputStream открытым. Это немного сложно, потому что адаптер поддерживает запись в отдельный файл для каждого сообщения. Я предполагаю, что ваш вариант использования: вы всегда пишете в один и тот же файл или какое-то имя файла на основе метки времени. Мы могли бы обеспечить некоторую оптимизацию, чтобы файл оставался открытым до тех пор, пока не поступит запрос на другой файл, или даже оставить открытыми несколько файловых буферов.

Однако в какой-то момент нам может понадобиться очистить буфер(ы), если по прошествии некоторого времени новые сообщения не поступят. Это добавляет некоторую сложность (но не большую).

Конечно, недостатком является то, что существует риск потери данных в случае сбоя питания, когда у вас есть данные, буферизованные в памяти. Это классический компромисс — производительность против производительности. надежность; прямо сейчас этот адаптер ошибается в сторону последнего.

Как всегда, не стесняйтесь открывать вопрос JIRA, и мы посмотрим.

person Gary Russell    schedule 06.12.2014
comment
Да, ваше предположение верно, мы всегда пишем в один и тот же файл для части нашего потока (много сообщений в один и тот же файл). Мы понимаем риск потери данных при использовании буферизованного файла, и поэтому мы не полагаемся на эти файлы для каких-либо иных целей, кроме создания отчетов или синхронизации данных UAT. См. JIRA: jira.spring.io/browse/INT-3574 - person Tony Falabella; 08.12.2014