Как сделать WaitAll с помощью Akka.Net?

У меня есть иерархия субъектов в Akka.Net, и мне интересно, правильно ли я выбрал способ что-то сделать, или есть лучшие / более простые способы достичь того, чего я хочу.

Мой конкретный пример: я создаю субъект User в ответ на вход пользователя в систему, и при создании этого субъекта мне нужны два фрагмента данных, чтобы завершить построение объекта актер.

Если бы это был обычный .NET-код, у меня могло бы получиться что-то вроде следующего ...

public Task<User> LoadUserAsync (string username)
{
  IProfileService profileService = ...;
  IMessageService messageService = ...;

  var loadProfileTask = profileService.GetUserProfileAsync(username);
  var loadMessagesTask = messageService.GetMessagesAsync(username);

  Task.WaitAll(loadProfileTask, loadMessagesTask);

  // Now construct the user from the result of both tasks
  var user = new User
  {
    Profile = loadProfileTask.Result,
    Messages = loadMessagesTask.Result
  }

  return Task.FromResult(user);
}

Здесь я использую WaitAll, чтобы дождаться завершения подчиненных задач и позволить им работать одновременно.

Мой вопрос: если бы я хотел сделать то же самое в Akka.Net, был бы следующий способ сделать это наиболее часто? Графически я создал следующее ...

Иерархия актеров

Когда я создаю своего актора User, я затем создаю (временный) User Loader Actor, задача которого состоит в том, чтобы получить полные сведения о пользователе путем вызова актора Profile и субъекта Messages. Листовые акторы, получающие данные, следующие ...

public class UserProfileLoader : ReceiveActor
{
    public UserProfileLoader()
    {
        Receive<LoadUserRequest>(msg =>
        {
            // Load the user profile from somewhere
            var profile = new UserProfile();

            // And respond to the Sender
            Sender.Tell(profile);
            Self.Tell(PoisonPill.Instance);
        });
    }
}

public class UserMessagesLoader : ReceiveActor
{
    public UserMessagesLoader()
    {
        Receive<LoadUserRequest>(msg =>
        {
            // Load the messages from somewhere
            var messages = new List<Message>();

            // And respond to the Sender
            Sender.Tell(messages);
            Self.Tell(PoisonPill.Instance);
        });
    }
}

На самом деле не имеет значения, откуда они берут данные для этого обсуждения, но оба просто отвечают на запрос, возвращая некоторые данные.

Затем у меня есть актер, который координирует двух участников сбора данных ...

public class UserLoaderActor : ReceiveActor
{
    public UserLoaderActor()
    {
        Receive<LoadUserRequest>(msg => LoadProfileAndMessages(msg));
        Receive<UserProfile>(msg =>
        {
            _profile = msg;
            FinishIfPossible();
        });

        Receive<List<Message>>(msg =>
        {
            _messages = msg;
            FinishIfPossible();
        });
    }

    private void LoadProfileAndMessages(LoadUserRequest msg)
    {
        _originalSender = Sender;
        Context.ActorOf<UserProfileLoader>().Tell(msg);
        Context.ActorOf<UserMessagesLoader>().Tell(msg);
    }

    private void FinishIfPossible()
    {
        if ((null != _messages) && (null != _profile))
        {
            _originalSender.Tell(new LoadUserResponse(_profile, _messages));
            Self.Tell(PoisonPill.Instance);
        }
    }

    private IActorRef _originalSender;
    private UserProfile _profile;
    private List<Message> _messages;
}

Это просто создает двух подчиненных субъектов, отправляет им сообщение для взлома, а затем ожидает ответа обоих перед отправкой обратно всех данных, которые были собраны исходному запрашивающему.

Итак, кажется ли это разумным способом скоординировать два несопоставимых ответа, чтобы объединить их? Есть ли более простой способ сделать это, чем сделать это самому?

Заранее благодарим за ответы!


person Morgan Skinner    schedule 16.02.2016    source источник
comment
Я не знаю, нужен ли вам отдельный актер в качестве фасада для получения только двух ответов. Не мог User субъект отправить эти два запроса напрямую?   -  person Bartosz Sypytkowski    schedule 16.02.2016
comment
Я действительно мог бы заставить пользовательского актора проделать всю эту работу, но поскольку у пользовательского актора, скорее всего, будет еще больше дел, я решил разделить ответственность здесь на загрузчик. Независимо от распределения работы, имеет ли смысл вышеизложенное и / или есть ли лучший / меньше кода / более стандартный способ сделать это?   -  person Morgan Skinner    schedule 16.02.2016
comment
Происходит ли что-то особенное в субъектах профиля и сообщений или они только извлекают данные из БД? если так, я бы, вероятно, просто обернул бы это каким-нибудь асинхронным методом, который выполняет WhenAll для обеих подзадач, а затем вернул бы эту задачу, чтобы вызывающий мог PipeTo(Self), когда это будет сделано ... Я знаю, что мы проповедуем детям опасную работу, но нужно учитывать, если вы действительно выиграете от этого, или если это просто приведет к раздуванию кода ...   -  person Roger Johansson    schedule 16.02.2016


Ответы (3)


Спасибо, ребята, теперь я значительно упростил актера до следующего, основываясь на предложениях Роджера и Джеффа ...

public class TaskBasedUserLoader : ReceiveActor
{
    public TaskBasedUserLoader()
    {
        Receive<LoadUserRequest>(msg => LoadProfileAndMessages(msg));
    }

    private void LoadProfileAndMessages(LoadUserRequest msg)
    {
        var originalSender = Sender;
        var loadPreferences = this.LoadProfile(msg.UserId);
        var loadMessages = this.LoadMessages(msg.UserId);

        Task.WhenAll(loadPreferences, loadMessages)
            .ContinueWith(t => new UserLoadedResponse(loadPreferences.Result, loadMessages.Result), 
                TaskContinuationOptions.AttachedToParent & TaskContinuationOptions.ExecuteSynchronously)
            .PipeTo(originalSender);
    }

    private Task<UserProfile> LoadProfile(string userId)
    {
        return Task.FromResult(new UserProfile { UserId = userId });
    }

    private Task<List<Message>> LoadMessages(string userId)
    {
        return Task.FromResult(new List<Message>());
    }
}

Методы LoadProfile и LoadMessages в конечном итоге вызовут репозиторий для получения данных, но пока у меня есть краткий способ сделать то, что я хотел.

Еще раз спасибо!

person Morgan Skinner    schedule 16.02.2016
comment
Рад помочь. Остерегайтесь AttachedToParent, здесь родитель не Task.WhenAll, это текущая задача, которая вызывает LoadProfileAndMessages (в данном случае их нет). И вы используете & вместо | чтобы объединить флаги, так что вы получите ноль. - person Jeff Cyr; 19.02.2016
comment
Кстати, мое использование TaskContinuationOptions было скопировано из статьи Petabridge об использовании PipeTo здесь - petabridge.com/blog/akkadotnet-async-actors-using-pipeto. Так что я думаю, что это тоже нужно обновить, так как там тоже есть два случая этой же проблемы. - person Morgan Skinner; 21.02.2016

ИМХО, это действительный процесс, поскольку вы разветвляете действие, а затем присоединяетесь к нему.

Кстати, вы могли бы использовать this.Self.GracefulStop(new TimeSpan(1)); вместо отправки ядовитой таблетки.

person profesor79    schedule 16.02.2016
comment
Спасибо - я бы не стал рассматривать способы остановки Актера, прочитав это - getakka.net/docs/working-with-actors/stopping-actors Я изменил код на Context.Stop (Self). - person Morgan Skinner; 16.02.2016

Вы можете использовать комбинацию Ask, WhenAll и PipeTo:

var task1 = actor1.Ask<Result1>(request1);
var task2 = actor2.Ask<Result2>(request2);

Task.WhenAll(task1, task2)
    .ContinueWith(_ => new Result3(task1.Result, task2.Result))
    .PipeTo(Self);

...

Receive<Result3>(msg => { ... });
person Jeff Cyr    schedule 16.02.2016