Присоединение подписчика/слушателя в модуле EPL к оператору с разделами контекста

У меня есть следующий модуль EPL, который успешно развертывается:

module context;

import events.*;
import configDemo.*;
import annotations.*;
import main.*;
import subscribers.*;
import listeners.*;

@Name('schemaCreator')
create schema InitEvent(firstStock String, secondStock String, bias double);

@Name('createSchemaEvent')
create schema TickEvent as TickEvent; 

@Name('contextCreator')
create context TwoStocksContext
initiated by InitEvent as initEvent;


@Name('compareStocks') 
@Description('Compare the difference between two different stocks and make a decision')
@Subscriber('subscribers.MySubscriber')
context TwoStocksContext 
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,     
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode =  context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >=  context.initEvent.bias and         
B.stockCode =  context.initEvent.secondStock
);

У меня проблема со слушателями/подписчиками. По моим проверкам и отладке, у классов нет проблем, аннотации работают, они прикрепляются к ведомости при деплое, но ни один из них не получает никаких обновлений от событий.

Это мой подписчик, я просто хочу вывести, что он получен:

package subscribers;
import java.util.Map;

public class MySubscriber {

public void update(Map row) {
    System.out.println("got it");
    }
}

У меня раньше был такой же модуль без разделов контекста и тогда подписчики работали без проблем. После того, как я добавил контекст, он остановился.

До сих пор я пробовал:

  1. Проверка того, есть ли к заявлению какой-либо подписчик/слушатель (да)
  2. Проверка их имен
  3. Удалите аннотации и установите их вручную в коде Java после развертывания (то же самое - они прикрепляются, я могу получить их имя, но все еще не получаю обновления)
  4. Отладка класса подписчика. Программа либо вообще не идет туда, чтобы остановиться в точке останова, либо я получаю сообщение об ошибке (ошибка атрибута отсутствующего номера строки - ("не могу разместить там точку останова", которую я пытался исправить безрезультатно)

Любая идея, что может вызвать это или как лучше всего настроить подписчика на оператор, который имеет разделы контекста?

Это продолжение предыдущей проблемы, которая была решена здесь — Создание экземпляров epl Эспера

РЕДАКТИРОВАТЬ: события отправляются в том формате, в котором я их использую, и в формате онлайн-инструмента EPL:

Сначала я получаю пару, за которой следует следить от пользователя:

    System.out.println("First stock:"); 
    String first = scanner.nextLine();
    System.out.println("Second stock:"); 
    String second = scanner.nextLine();
    System.out.println("Difference:"); 
    double diff= scanner.nextDouble();
    InitEvent init = new InitEvent(first, second, diff);

После этого у меня есть поток двигателя, который постоянно отправляет события, но перед его запуском InitEvents отправляется как таковой:

@Override
public void run() {

    runtime.sendEvent(initEvent);   

    while (contSimulation) {

        TickEvent tick1 = new TickEvent(Math.random() * 100, "YAH");
        runtime.sendEvent(tick1);

        TickEvent tick2 = new TickEvent(Math.random() * 100, "GOO");
        runtime.sendEvent(tick2);

        TickEvent tick3 = new TickEvent(Math.random() * 100, "IBM");
        runtime.sendEvent(tick3);

        TickEvent tick4 = new TickEvent(Math.random() * 100, "MIC");
        runtime.sendEvent(tick4);

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        latch.countDown();

    }

} 

Раньше я не пользовался онлайн-инструментом, но думаю, что он у меня работает. Это текст модуля:

module context; 

create schema InitEvent(firstStock String, secondStock String, bias double);
create schema TickEvent(currentPrice double, stockCode String);

create context TwoStocksContext
initiated by InitEvent as initEvent;

context TwoStocksContext 
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice, 
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode =  context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >=  context.initEvent.bias and 
B.stockCode =  context.initEvent.secondStock
);

И последовательность событий:

InitEvent={firstStock='YAH', secondStock = 'GOO', bias=5}
TickEvent={currentPrice=55.6, stockCode='YAH'}
TickEvent={currentPrice=50.4, stockCode='GOO'}
TickEvent={currentPrice=30.8, stockCode='MIC'}
TickEvent={currentPrice=24.9, stockCode='APP'}

TickEvent={currentPrice=51.6, stockCode='YAH'}
TickEvent={currentPrice=45.8, stockCode='GOO'}
TickEvent={currentPrice=32.8, stockCode='MIC'}
TickEvent={currentPrice=28.9, stockCode='APP'}

Результат, который я получаю, используя их:

At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=55.6, b_currentPrice=50.4, a_stockCode='YAH', 
b_stockCode='GOO'}
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=51.6, b_currentPrice=45.8, a_stockCode='YAH', 
b_stockCode='GOO'}

Если я сделаю второй набор событий с разницей менее 5 между YAH/GOO, я получу вывод только из первой пары, что имеет смысл. Это, я думаю, то, что он должен делать.

При необходимости эти два метода читают и обрабатывают аннотации модуля EPL (я их не писал сам, они взяты из класса контекста coinTrader, который можно найти здесь - https://github.com)./timolson/cointrader/blob/master/src/main/java/org/cryptocoinpartners/module/Context.java ):

private static Object getSubscriber(String className) throws Exception {

    Class<?> cl = Class.forName(className);
    return cl.newInstance();
}

private static void processAnnotations(EPStatement statement) throws Exception {

    Annotation[] annotations = statement.getAnnotations();
    for (Annotation annotation : annotations) {
        if (annotation instanceof Subscriber) {

            Subscriber subscriber = (Subscriber) annotation;
            Object obj = getSubscriber(subscriber.className());
            System.out.println(subscriber.className());
            statement.setSubscriber(obj);

        } else if (annotation instanceof Listeners) {

            Listeners listeners = (Listeners) annotation;
            for (String className : listeners.classNames()) {
                Class<?> cl = Class.forName(className);
                Object obj = cl.newInstance();
                if (obj instanceof StatementAwareUpdateListener) {
                    statement.addListener((StatementAwareUpdateListener) obj);
                } else {
                    statement.addListener((UpdateListener) obj);
                }
            }


        }
    }
}

person jocund    schedule 24.01.2018    source источник
comment
Покажите события, которые вы отправляете. Этот дизайн ожидает InitEvent перед другими событиями. Возможно, предоставьте событие, которое вы отправляете, в формате онлайн-инструмента, чтобы его было легко вырезать и вставить в инструмент.   -  person user650839    schedule 24.01.2018
comment
Прошу прощения, я не подумал об этом. Я отредактировал вопрос, чтобы добавить его.   -  person jocund    schedule 25.01.2018
comment
Я немного не понял, какой еще вопрос? В том, что он работает в онлайн-инструменте EPL, но вы не можете заставить его работать в программе?   -  person user650839    schedule 25.01.2018
comment
Почему подписчики/слушатели не получают обновлений, хотя кажется, что они привязаны, а в остальном все в порядке?   -  person jocund    schedule 25.01.2018
comment
Это много кода. Я бы рекомендовал продолжать упрощать код и EPL, пока он не заработает.   -  person user650839    schedule 25.01.2018


Ответы (1)


Ну, после месяца борьбы я наконец решил это. Если у кого-то возникнет подобная проблема в будущем, вот где была проблема. EPL отлично работал в онлайн-инструменте, но не в моем коде. В конце концов я понял, что начальные события не запускаются, поэтому разделы контекста не создаются, и в результате подписчики и слушатели не получают никаких обновлений. Моя ошибка заключалась в том, что у меня был запущен POJO InitEvent, но событие, которое использовал контекст, было создано в модуле EPL с помощью схемы создания. Я не знаю, о чем я думал, теперь это имеет смысл, что это не сработало. В результате события, которые я запускаю в Java, не являются событиями, которые использует контекст. Мое решение было только в рамках EPL. Поскольку я не мог понять, могу ли я запускать события в Java, созданные в модуле, я создал схему, которая заполняется моим POJO, а поток затем используется контекстом как таковой:

@Name('schemaCreator')
create schema StartEvent(firstStock string, secondStock string, difference 
double);

@Name('insertInitEvent')
insert into StartEvent 
select * from InitEvent; 

Все остальное осталось прежним, как и код Java.

person jocund    schedule 13.03.2018