Используя Actor Supervised, как повторить одно и то же сообщение определенное количество раз с заданным интервалом, если произойдет сбой

Я следую коду из отказоустойчивости akka.io http://doc.akka.io/docs/akka/current/java/fault-tolerance.html. Я взял этот код в качестве эталона. Мои требования следующие: предположим, что актор терпит крах при получении сообщения и перезапускается своим руководителем. Затем он начинает обрабатывать очередное сообщение в своем почтовом ящике. Сообщение, вызвавшее сбой, «отброшено». Но я хочу обработать одно и то же действие в течение определенного количества раз (допустим, 3 раза) с определенным интервалом между ними (допустим, 1 секунда). Как это сделать с помощью контроля akka. На самом деле через актера я пытаюсь проверить, работает ли конкретный API-интерфейс службы или нет (т.е. Даю какое-то исключение). Поэтому, если есть какое-либо исключение при конкретной попытке (предположим, 404 не найдено), повторно отправьте сообщение неудавшемуся работнику до тех пор, пока maxNrOfRetries достигается, как указано в supervisorStrategy. Если рабочий не смог выполнить "maxNrOfRetries" раз, то просто зарегистрируется как "максимальное количество попыток, достигнутых для этого сообщения xx". Как я сделаю это в java.

мой класс супервайзера:

public class Supervisor extends UntypedActor {


 private static SupervisorStrategy strategy =

 new OneForOneStrategy(3, Duration.create("1 minute"),
  new Function<Throwable, Directive>() {
    @Override
    public Directive apply(Throwable t) {
      if (t instanceof Exception) {
        return restart();
      }else if (t instanceof IllegalArgumentException) {
        return stop();
      } else {
        return escalate();
      }
    }
  });

 @Override
 public SupervisorStrategy supervisorStrategy() {
 return strategy;


}
public void onReceive(Object o) {
if (o instanceof Props) {
  getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
  unhandled(o);
}


 }
}

Детский класс:

public class Child extends UntypedActor {


  public void onReceive(Object o) throws Exception {
if (o instanceof String) {
Object response = someFunction( (String) message);//this function returns either successfull messgae as string or exception
if(response instanceOf Exception) {
     throw (Exception) response;
   } 
   else
     getSender().tell(response, getSelf())
}else {
  unhandled(o);
}


}

}

Создание актера:

Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
child.tell("serVice_url", ActorRef.noSender());

Для service_url я хочу повторить процесс, если произойдет сбой. Но этого не происходит. если написать следующую строку при создании актера как child.tell("serVice_url_2", ActorRef.noSender());, то эта строка будет выполнена, но я хочу обработать одно и то же действие (для которого происходит сбой) в течение определенного количества раз (допустим, 3 раза) с определенным интервалом между ними. Пожалуйста, помогите мне достичь этого.


person Mithun Debnath    schedule 20.02.2017    source источник


Ответы (1)


Я думаю, что разработал способ. Хотя мне все равно придется провести тест на производственном уровне. Я пишу ответ ниже, так как он может быть полезен для тех, кто пытается добиться того же. Если кто-то найдет лучший подход, то он/ она приветствуется. Хочу отметить здесь, что с помощью этого подхода супервизор обрабатывает одно и то же действие (с сообщением, для которого происходит сбой) определенное количество раз (допустим, 3 раза) в пределах временного диапазона. Мне не удалось определить интервал между ними. вот код. Класс Супервайзер.

public class MyUntypedActor extends UntypedActor {
//here I have given Max no retrilas as 10.I will controll this number from logic as per my own requirements.But user given number of retrials can not exceed 10.
private static SupervisorStrategy strategy = new AllForOneStrategy(10, Duration.create(5, TimeUnit.MINUTES),
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public SupervisorStrategy.Directive apply(Throwable t) {
                if (t instanceof Exception) {
                    //System.out.println("exception" + "*****" + t.getMessage() + "***" + t.getLocalizedMessage());
                    return restart();
                } else if (t instanceof NullPointerException) {
                    return restart();
                } else if (t instanceof IllegalArgumentException) {
                    return stop();
                } else {
                    return escalate();
                }
            }
        });

@Override
public SupervisorStrategy supervisorStrategy() {
    return strategy;
}

public void onReceive(Object o) {
    if (o instanceof Props) {
        getSender().tell(getContext().actorOf((Props) o), getSelf());
    } else {
        unhandled(o);
    }
}
}

Дочерний класс, в котором мы будем писать нашу логику.

public class Child extends UntypedActor {
//Through preRestart it will push the message for which exception occured before the restart of the child
@Override
public void preRestart(final Throwable reason, final scala.Option<Object> message) throws Exception {
    System.out.println("reStarting :::" + message.get());
    SetRules.setRemainingTrials(SetRules.remainingTrials + 1);
    getSelf().tell(message.get(), getSender());
};

public void onReceive(Object o) throws Exception {

    if (o instanceof Exception) {
        throw (Exception) o;
    } else if (o instanceof Integer) {
    } else if (o.equals("get")) {
        getSender().tell("get", getSelf());
    } else if (o instanceof String) {

        try {
            // here either we can write our logic directly or for a better
            // approach can call a function where the logic will be excuted.
            getSender().tell("{\"meggase\":\"Succesfull after " + SetRules.remainingTrials + " retrials\"}",
                    getSelf());
        } catch (Exception ex) {
            if (SetRules.remainingTrials == SetRules.noOfRetries) {
                getSender().tell("{\"meggase\":\"Failed to connect after " + SetRules.noOfRetries + " retrials\"}",
                        getSelf());
            } else {
                Exception value1 = ex;
                throw (Exception) value1;
            }
        }
    } else {
        unhandled(o);
    }
}
}

Класс SetRules, который имеет информацию о пользователе, предоставляет noOfRetrials, а также хранит информацию о количестве повторных попыток в каждом состоянии повторной попытки через оставшиесяTrials.

public class SetRules {

public static int noOfRetries;
public static int remainingTrials;

public SetRules(int noOfRetries, int remainingTrials) {
    super();
    SetRules.noOfRetries = noOfRetries;
    SetRules.remainingTrials = remainingTrials;
}

public int getRemainingTrials() {
    return remainingTrials;
}

public static void setRemainingTrials(int remainingTrials) {
    SetRules.remainingTrials = remainingTrials;
}
}

Теперь давайте создадим актера.

Props superprops = Props.create(MyUntypedActor.class);
SetRules setRules=new SetRules(3,0);
ActorSystem system = ActorSystem.create("helloakka");
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), Duration.create(5, "minutes"));
Future<Object> future = Patterns.ask(child, service_Url, new Timeout(Duration.create(5, "minutes")));
Object result =  Await.result(future, Duration.create(5, "minutes"));
System.out.println(result);
person Mithun Debnath    schedule 22.02.2017
comment
Я задал похожий вопрос почти одновременно с вами; Я тоже просматривал ваши. С помощью этого решения вы действительно можете вернуть сообщение, но вы не можете определить время (отсрочку) между перезапусками. - person x80486; 22.02.2017
comment
Да, я не смог определить интервал между ними. Любое предложение, как это сделать? - person Mithun Debnath; 23.02.2017