Управление сообщениями / потоками Mule

Мы работаем на mule Community Edition 3.4.1. Инвестиции, направленные на то, чтобы код работал там, в настоящее время перевешивают варианты обновления.

Коннектор Mongo в этой версии настолько сильно истощает ресурсы, что практически непригоден для массовой обработки данных. Мы обрабатываем примерно 10–100 млн записей в день (что не так уж и много), и с коннектором Mongo из коробки нам придется перезапускать Mule примерно каждые три часа. Теперь вы можете делать это разумно, а не нет, и мы можем потратить время на то, чтобы сделать это относительно безболезненным, однако с объемом автоматизации и зависимых процессов мы вместо этого решили написать собственный коннектор Mongo.

Однако для удобства использования и для соответствия навыкам нашего существующего кадрового резерва мы хотели создать что-то, что обеспечивало бы простоту использования и БЕЗ утечки ресурсов.

По сути, мы создали простой преобразователь мула на Java, который раскрывает его функциональность с помощью статических методов. Каждый статический метод получает ссылку на экземпляр, а экземпляр имеет текущее событие MuleEvent. По сути, мы просто переопределяем метод процесса, чтобы сохранить MuleEvent, и метод doProcess, чтобы сохранить полезную нагрузку в частное свойство, а затем возвращаем ссылку на экземпляр в качестве полезной нагрузки (this).

Следующим элементом в потоке является преобразователь Expression, который вызывает статический метод с полезной нагрузкой в ​​качестве экземпляра и содержит все выражения как простые выражения MEL, как вы могли бы это сделать в коннекторе Mongo.

Мы используем POJO, и Mule Studio может получать ссылки на статические методы, и использовать этот соединитель очень просто и легко. Он также может обрабатывать сотни миллионов вызовов без каких-либо проблем, поскольку мы очень тщательно управляем ресурсами (на самом деле, слишком консервативно), и все отлично. Кроме. Когда обработка действительно быстрая, иногда Mule теряет хладнокровие и выдает нулевое исключение. Мы точно знаем, что в этот момент в нормальном сценарии не может быть нулевого исключения. На самом деле мы даже явно добавили фильтры выражений, которые не позволяли поднимать нулевое выражение, но это все равно происходит. Но только, если есть асинхронный поток. Теперь, прежде чем мы перейдем к этому, внутри асинхронного потока компоненты все еще выполняются последовательно, и наше решение помещает экземпляр коннектора Mongo в полезную нагрузку сообщения и сразу берет его. Мы хотим узнать, как возникает эта проблема и почему.

Вот некоторые части кода, я могу добавить больше. РЕДАКТИРОВАТЬ: я уже добавлял строку в поток раньше. Вы увидите, что currentSku не может быть нулевым

<set-session-variable variableName="currentSku" value="#[payload.?sku]" doc:name="currentSku"/>
<logger message="#[currentSku]" level="INFO" category="fiuze.plugins.contentproviders.ExtraData.asycnhFlow" doc:name="Logger"/>
<expression-filter expression="#[currentSku != null ]" doc:name="filterForNoSKUinpayload"/> 
<custom-transformer class="com.fiuze.components.Mongo" doc:name="mongoInit" />
<expression-transformer expression="#[com.fiuze.components.Mongo.many(payload, &quot;fizueDemoMongo&quot;,&quot;products&quot;,&quot;{'productChannelData.default.ExtraData':{$exists:false}}&quot;,100)]" doc:name="mongoGetProductsWithoutExtraData"/>

добавьте вот код Java, который отражает этот вызов:

    public static synchronized Object many(Mongo instance, String configRef, String collectionName, String query, Integer limit) throws TransformerException {
      return Mongo.exec(instance, configRef, collectionName, query, limit, null, null, false);
    }

и вот реализация Exec:

    private static synchronized Object exec(Mongo instance, String configRef, String collectionName, Object query, Integer limit, Integer skip, Object sort, Boolean count) throws TransformerException {

    instance.getMuleMessage().setPayload(instance.getPayload());
    instance.setConfigref(configRef);
    instance.setCollectionName(collectionName);
    instance.setLimit(limit);
    instance.setSkip(skip);
    sort = instance.evaluate(sort);
    instance.setQuery(instance.evaluate(query));
    if (sort instanceof BasicDBObject) {
        instance.setSort((BasicDBObject)sort);
    } else if (sort instanceof String) {
        instance.setSort((String)sort);
    } else if (sort == null){
        instance.setSort((String)null);
    } else {
        throw new TransformerException( (Message) instance.getMuleMessage() );
    }
    instance.setCount(count);
    Object result = instance.execFind();
    instance.dispose();
    return result;
}

как вы заметили, мы используем синхронизированные вызовы, поэтому статические методы не запутаются.

Поскольку это сложный вопрос, и я не уверен, что ясно выражаю его, пожалуйста, не стесняйтесь спрашивать меня подробности, я буду более чем счастлив предоставить.

РЕДАКТИРОВАТЬ:

Вот часть журнала, я включу часть без проблем, с ошибкой, а затем без проблем. В этом разделе вы увидите, что процесс продолжается, но конкретный поток Asynch не работает.

INFO  2016-05-20 21:22:00,760 [[fiuze].getExtraProductData.async1.1579] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:00,769 [[fiuze].getExtraProductData.async1.1581] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,812 [[fiuze].getExtraProductData.async1.1590] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO  2016-05-20 21:22:00,825 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,831 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,887 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,959 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO  2016-05-20 21:22:00,995 [[fiuze].getExtraProductData.async1.1580] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84422
INFO  2016-05-20 21:22:00,996 [[fiuze].getExtraProductData.async1.1579] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84436
INFO  2016-05-20 21:22:00,998 [[fiuze].getExtraProductData.async1.1578] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84454
INFO  2016-05-20 21:22:00,999 [[fiuze].getExtraProductData.async1.1587] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84455
INFO  2016-05-20 21:22:01,000 [[fiuze].getExtraProductData.async1.1590] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84446
INFO  2016-05-20 21:22:01,002 [[fiuze].getExtraProductData.async1.1586] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84445
INFO  2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1589] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84470
INFO  2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84477
INFO  2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1591] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84462
INFO  2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1588] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84464
ERROR 2016-05-20 21:22:01,010 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message               : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
  org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
  org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
  org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
    + 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1583] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84447
INFO  2016-05-20 21:22:01,003 [[fiuze].getExtraProductData.async1.1585] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84456
ERROR 2016-05-20 21:22:01,014 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message               : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
  org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
  org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
  org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
    + 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.1582] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84482
INFO  2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.01] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84483
INFO  2016-05-20 21:22:01,023 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.api.processor.LoggerMessageProcessor: starting ForEach
WARN  2016-05-20 21:22:01,024 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
ERROR 2016-05-20 21:22:01,034 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,035 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,145 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,153 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,268 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,269 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
WARN  2016-05-20 21:22:01,385 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
WARN  2016-05-20 21:22:01,586 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
WARN  2016-05-20 21:22:01,600 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
INFO  2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.1580] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1583] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1586] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:01,820 [[fiuze].getExtraProductData.async1.1581] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84485
INFO  2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1584] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84488
INFO  2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84490
ERROR 2016-05-20 21:22:01,892 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO  2016-05-20 21:22:01,911 [[fiuze].getExtraProductData.async1.1585] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:02,136 [[fiuze].getExtraProductData.async1.1591] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:02,231 [[fiuze].getExtraProductData.async1.1588] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:02,356 [[fiuze].getExtraProductData.async1.1587] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:02,425 [[fiuze].getExtraProductData.async1.1587] org.mule.api.processor.LoggerMessageProcessor: no match found

person Menashe Borbely    schedule 19.05.2016    source источник
comment
Предоставление трассировки стека исключения может помочь.   -  person David Dossot    schedule 19.05.2016
comment
Добавил лог. Дэвид - спасибо, что посмотрели, я уже многому у вас научился.   -  person Menashe Borbely    schedule 21.05.2016
comment
Похоже, что метод update на com.fiuze.components.Mongo выдает NPE. Есть идеи, почему?   -  person David Dossot    schedule 21.05.2016
comment
Дэвид, не уверен в этом. В нем говорится, что 2. невозможно вызвать метод: update (java.lang.RuntimeException), который сообщает мне, что исключение выбрасывается на этапе, когда Mule оценивает преобразователь выражения для его вызова. Как-то в этот момент, оценивая значения, он выдает NPE. Я думаю, может быть, потокобезопасность между мной, вызывающим трансформатор для инициализации, а затем его использованием. Это происходит только в Asynch, а не в случае использования синхронизации.   -  person Menashe Borbely    schedule 22.05.2016
comment
Как установлен currentSku? Похоже, это единственное, что могло вызвать ошибку eval. Эту проблему сложно решить в StackOverflow, я бы попытался поставить точку останова на org.mule.api.expression.ExpressionRuntimeException и выполнить пошаговую отладку, чтобы выяснить, что не так с состоянием приложения. А затем удалите это неудачное synchronized ключевое слово :)   -  person David Dossot    schedule 22.05.2016
comment
Как я уже сказал, мы захватили каждую возможную переменную для null. Я добавил правку, так что вы видите поток. Ключевое слово synchronized действительно важно (хотя это может быть неудачно :)), потому что просто из соображений удобства использования вызовы предоставляются статическими точками входа с экземпляром, являющимся параметром, поэтому Mule Studio может получить подпись метода. К сожалению, эти статические точки входа необходимо синхронизировать.   -  person Menashe Borbely    schedule 22.05.2016
comment
Проблема использования currentSku вместо flowVars.currentSku заключается в том, что вы не можете проверить нулевое значение, потому что значение может просто не быть привязано как переменная MVEL верхнего уровня. Как установлен currentSku?   -  person David Dossot    schedule 24.05.2016
comment
У нас есть ‹expression-language autoResolveVariables = true› в качестве глобального набора настроек, но я понимаю суть. Я также добавил две отсутствующие строки (я полагаю, что должен был добавить их в прошлый раз), которые показывают, как устанавливается currentSku. В примере с журналом вы можете ясно видеть, что проблема не в этом. Журнал показывает, что поток 1577 имеет установленное значение и по-прежнему генерирует исключение.   -  person Menashe Borbely    schedule 24.05.2016
comment
Только что понял, что вы используете версию 3.4.1 (-_-). Мне интересно, не укусил ли вас вариант этой ошибки, о которой я сообщил давно: mulesoft.org/jira/browse/MULE-6630, которая была исправлена ​​в 3.5.0 mulesoft.org/jira/browse/MULE-7414 В ранних версиях многопоточности MEL была некоторая нестабильность. Извините за то, что не обратил внимание на версию, которую вы используете раньше.   -  person David Dossot    schedule 24.05.2016
comment
Дэвид, пожалуйста, сделайте свой последний комментарий ответом, чтобы я мог отметить его галочкой. Вот и ответ. Я следил за вашим последним комментарием к JIRA, и это сработало.   -  person Menashe Borbely    schedule 30.05.2016
comment
Хорошо, здорово, рад, что ты смог преодолеть это препятствие.   -  person David Dossot    schedule 31.05.2016


Ответы (1)


Скорее всего, вас поразил вариант этой ошибки, о которой я сообщал давно: MULE-6630 которая была исправлена ​​в версии 3.5.0 (см. MULE-7414. В MEL многопоточность в ранних версиях.

person David Dossot    schedule 30.05.2016