Мы работаем на mule Community Edition 3.4.1. Инвестиции, направленные на то, чтобы код работал там, в настоящее время перевешивают варианты обновления.
Коннектор Mongo в этой версии настолько сильно истощает ресурсы, что практически непригоден для массовой обработки данных. Мы обрабатываем примерно 10–100 млн записей в день (что не так уж и много), и с коннектором Mongo из коробки нам придется перезапускать Mule примерно каждые три часа. Теперь вы можете делать это разумно, а не нет, и мы можем потратить время на то, чтобы сделать это относительно безболезненным, однако с объемом автоматизации и зависимых процессов мы вместо этого решили написать собственный коннектор Mongo.
Однако для удобства использования и для соответствия навыкам нашего существующего кадрового резерва мы хотели создать что-то, что обеспечивало бы простоту использования и БЕЗ утечки ресурсов.
По сути, мы создали простой преобразователь мула на Java, который раскрывает его функциональность с помощью статических методов. Каждый статический метод получает ссылку на экземпляр, а экземпляр имеет текущее событие MuleEvent. По сути, мы просто переопределяем метод процесса, чтобы сохранить MuleEvent, и метод doProcess, чтобы сохранить полезную нагрузку в частное свойство, а затем возвращаем ссылку на экземпляр в качестве полезной нагрузки (this).
Следующим элементом в потоке является преобразователь Expression, который вызывает статический метод с полезной нагрузкой в качестве экземпляра и содержит все выражения как простые выражения MEL, как вы могли бы это сделать в коннекторе Mongo.
Мы используем POJO, и Mule Studio может получать ссылки на статические методы, и использовать этот соединитель очень просто и легко. Он также может обрабатывать сотни миллионов вызовов без каких-либо проблем, поскольку мы очень тщательно управляем ресурсами (на самом деле, слишком консервативно), и все отлично. Кроме. Когда обработка действительно быстрая, иногда Mule теряет хладнокровие и выдает нулевое исключение. Мы точно знаем, что в этот момент в нормальном сценарии не может быть нулевого исключения. На самом деле мы даже явно добавили фильтры выражений, которые не позволяли поднимать нулевое выражение, но это все равно происходит. Но только, если есть асинхронный поток. Теперь, прежде чем мы перейдем к этому, внутри асинхронного потока компоненты все еще выполняются последовательно, и наше решение помещает экземпляр коннектора Mongo в полезную нагрузку сообщения и сразу берет его. Мы хотим узнать, как возникает эта проблема и почему.
Вот некоторые части кода, я могу добавить больше. РЕДАКТИРОВАТЬ: я уже добавлял строку в поток раньше. Вы увидите, что currentSku не может быть нулевым
<set-session-variable variableName="currentSku" value="#[payload.?sku]" doc:name="currentSku"/>
<logger message="#[currentSku]" level="INFO" category="fiuze.plugins.contentproviders.ExtraData.asycnhFlow" doc:name="Logger"/>
<expression-filter expression="#[currentSku != null ]" doc:name="filterForNoSKUinpayload"/>
<custom-transformer class="com.fiuze.components.Mongo" doc:name="mongoInit" />
<expression-transformer expression="#[com.fiuze.components.Mongo.many(payload, "fizueDemoMongo","products","{'productChannelData.default.ExtraData':{$exists:false}}",100)]" doc:name="mongoGetProductsWithoutExtraData"/>
добавьте вот код Java, который отражает этот вызов:
public static synchronized Object many(Mongo instance, String configRef, String collectionName, String query, Integer limit) throws TransformerException {
return Mongo.exec(instance, configRef, collectionName, query, limit, null, null, false);
}
и вот реализация Exec:
private static synchronized Object exec(Mongo instance, String configRef, String collectionName, Object query, Integer limit, Integer skip, Object sort, Boolean count) throws TransformerException {
instance.getMuleMessage().setPayload(instance.getPayload());
instance.setConfigref(configRef);
instance.setCollectionName(collectionName);
instance.setLimit(limit);
instance.setSkip(skip);
sort = instance.evaluate(sort);
instance.setQuery(instance.evaluate(query));
if (sort instanceof BasicDBObject) {
instance.setSort((BasicDBObject)sort);
} else if (sort instanceof String) {
instance.setSort((String)sort);
} else if (sort == null){
instance.setSort((String)null);
} else {
throw new TransformerException( (Message) instance.getMuleMessage() );
}
instance.setCount(count);
Object result = instance.execFind();
instance.dispose();
return result;
}
как вы заметили, мы используем синхронизированные вызовы, поэтому статические методы не запутаются.
Поскольку это сложный вопрос, и я не уверен, что ясно выражаю его, пожалуйста, не стесняйтесь спрашивать меня подробности, я буду более чем счастлив предоставить.
РЕДАКТИРОВАТЬ:
Вот часть журнала, я включу часть без проблем, с ошибкой, а затем без проблем. В этом разделе вы увидите, что процесс продолжается, но конкретный поток Asynch не работает.
INFO 2016-05-20 21:22:00,760 [[fiuze].getExtraProductData.async1.1579] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:00,769 [[fiuze].getExtraProductData.async1.1581] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,812 [[fiuze].getExtraProductData.async1.1590] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:00,825 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,831 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,887 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,959 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:00,995 [[fiuze].getExtraProductData.async1.1580] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84422
INFO 2016-05-20 21:22:00,996 [[fiuze].getExtraProductData.async1.1579] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84436
INFO 2016-05-20 21:22:00,998 [[fiuze].getExtraProductData.async1.1578] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84454
INFO 2016-05-20 21:22:00,999 [[fiuze].getExtraProductData.async1.1587] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84455
INFO 2016-05-20 21:22:01,000 [[fiuze].getExtraProductData.async1.1590] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84446
INFO 2016-05-20 21:22:01,002 [[fiuze].getExtraProductData.async1.1586] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84445
INFO 2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1589] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84470
INFO 2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84477
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1591] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84462
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1588] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84464
ERROR 2016-05-20 21:22:01,010 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
+ 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1583] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84447
INFO 2016-05-20 21:22:01,003 [[fiuze].getExtraProductData.async1.1585] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84456
ERROR 2016-05-20 21:22:01,014 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
+ 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.1582] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84482
INFO 2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.01] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84483
INFO 2016-05-20 21:22:01,023 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.api.processor.LoggerMessageProcessor: starting ForEach
WARN 2016-05-20 21:22:01,024 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
ERROR 2016-05-20 21:22:01,034 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,035 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,145 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,153 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,268 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,269 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
WARN 2016-05-20 21:22:01,385 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
WARN 2016-05-20 21:22:01,586 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
WARN 2016-05-20 21:22:01,600 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
INFO 2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.1580] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1583] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1586] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,820 [[fiuze].getExtraProductData.async1.1581] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84485
INFO 2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1584] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84488
INFO 2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84490
ERROR 2016-05-20 21:22:01,892 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:01,911 [[fiuze].getExtraProductData.async1.1585] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,136 [[fiuze].getExtraProductData.async1.1591] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,231 [[fiuze].getExtraProductData.async1.1588] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,356 [[fiuze].getExtraProductData.async1.1587] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:02,425 [[fiuze].getExtraProductData.async1.1587] org.mule.api.processor.LoggerMessageProcessor: no match found
update
наcom.fiuze.components.Mongo
выдает NPE. Есть идеи, почему? - person David Dossot   schedule 21.05.2016currentSku
? Похоже, это единственное, что могло вызвать ошибку eval. Эту проблему сложно решить в StackOverflow, я бы попытался поставить точку останова наorg.mule.api.expression.ExpressionRuntimeException
и выполнить пошаговую отладку, чтобы выяснить, что не так с состоянием приложения. А затем удалите это неудачноеsynchronized
ключевое слово :) - person David Dossot   schedule 22.05.2016currentSku
вместоflowVars.currentSku
заключается в том, что вы не можете проверить нулевое значение, потому что значение может просто не быть привязано как переменная MVEL верхнего уровня. Как установленcurrentSku
? - person David Dossot   schedule 24.05.2016