Hazelcast Jet 0.6.1 + API конвейера + пользовательский процессор

Я пытаюсь подключить пользовательский процессор к определению Hazelcast Jet Pipeline.

Вот пример кода.

private Pipeline buildPipeline() {

   Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<String, Record>remoteMapJournal("record", 
   getClientConfig(), START_FROM_OLDEST))
      .addTimestamps((v) ->  getTimeStamp(v), 3000)
      .peek()
      .groupingKey((v) -> Tuple2.tuple2(getUserID(v),getTranType(v)))
      .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, 
      SLIDE_STEP_MILLIS))
      .aggregate(counting())
      .map((v)-> getMapKey(v))
      .customTransform("test2", ()-> this);

     return p;

    }

Вот пример кода для метода tryProcess ()

    protected boolean tryProcess(int ordinal, Object item) {
    TimestampedEntry entry = (TimestampedEntry)item;
    System.out.println("Item value is "+ item);
    map1.put(entry.getKey(), entry.getValue());
    return true;
}

При исполнении возникло следующее исключение

java.lang.IllegalArgumentException: These transforms have nothing attached to them: [test2]
    at com.hazelcast.jet.impl.pipeline.Planner.validateNoLeakage(Planner.java:104)
    at com.hazelcast.jet.impl.pipeline.Planner.createDag(Planner.java:65)
    at com.hazelcast.jet.impl.pipeline.PipelineImpl.toDag(PipelineImpl.java:85)
    at com.hazelcast.jet.JetInstance.newJob(JetInstance.java:94)
    at com.sap.banking.bc.AbstractAuditLogProcessor1.start(AbstractAuditLogProcessor1.java:132)
    at com.sap.banking.bc.JobProcessor.main(JobProcessor.java:18)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:812)

Чего не хватает? Не могли бы вы помочь мне решить эту проблему?


person Harshad Murtekar    schedule 19.06.2018    source источник
comment
Вы не должны использовать индивидуальное преобразование для вашего случая. Вы можете просто использовать приемник карты.   -  person Can Gencer    schedule 19.06.2018


Ответы (1)


Исключение говорит о том, что вы пропустили приемник, drainTo вызов.

person Oliv    schedule 19.06.2018