передать работу наблюдателям

У меня есть слушатель, который получает работу в виде IPayload. Слушатель должен передать эту работу наблюдателям, которые действительно выполняют эту работу. Это моя первая грубая попытка добиться этого:

public interface IObserver
{
    void DoWork(IPayload payload);
}

public interface IObservable
{
    void RegisterObserver(IObserver observer);
    void RemoveObserver(IObserver observer);
    void NotifyObservers(IPayload payload);
}

public class Observer : IObserver
{
    public void DoWork(IPayload payload)
    {
        // do some heavy lifting
    }
}

public class Listener : IObservable
{
    private readonly List<IObserver> _observers = new List<IObserver>();

    public void PushIncomingPayLoad(IPayload payload)
    {
        NotifyObservers(payload);
    }

    public void RegisterObserver(IObserver observer)
    {
        _observers.Add(observer);
    }

    public void RemoveObserver(IObserver observer)
    {
        _observers.Remove(observer);
    }

    public void NotifyObservers(IPayload payload)
    {
        Parallel.ForEach(_observers, observer =>
        {
        observer.DoWork(payload);
        });
    }
}

Является ли это допустимым подходом, который следует шаблону наблюдатель/наблюдаемый (т.е. pub sub?)? Насколько я понимаю, NotifyObservers также создает угрозу для каждой полезной нагрузки. Это правильно? Любые предложения по улучшению очень приветствуются.

Обратите внимание, что все наблюдатели должны закончить свою работу, прежде чем им будет передана новая работа в виде полезной нагрузки — порядок «наблюдения» не имеет значения. По сути, слушатель должен действовать как мастер, максимально используя ядра хоста, используя TPL. ИМХО, для этого требуется явная регистрация наблюдателей с помощью слушателя/наблюдателя.

PS:

Я думаю, что Parallel.ForEach не создает поток для каждого наблюдателя: Почему не Parallel.ForEach запускает несколько потоков? Если это правда, как я могу обеспечить создание потока для каждого наблюдателя?

Альтернатива, которую я имею в виду, такова:

public async void NotifyObservers(IPayload payload)
{
    foreach (var observer in _observers)
    {
    var observer1 = observer;
    await Task.Run(() => observer1.DoWork(payload));
    }
    await Task.WhenAll();
}

person cs0815    schedule 13.02.2016    source источник


Ответы (1)


Конечно, вы можете сделать это так, но в .net это не нужно, если вы не хотите изобретать велосипед :-) В c # это можно сделать с помощью событий.

Краткий пример:

  //Your Listener who has a public eventhandler where others can add them as listeners
  public class Listener{
      //your eventhandler where others "add" them as listeners
      public event EventHandler<PayLoadEventsArgs> IncomingPayload;

      //a method where you process new data and want to notify the others
      public void PushIncomingPayLoad(IPayload payload)
      {
          //check if there are any listeners
          if(IncomingPayload != null)
              //if so, then notify them with the data in the PayloadEventArgs
              IncomingPayload(this, new PayloadEventArgs(payload));
      }
  }  

  //Your EventArgs class to hold the data
  public class PayloadEventArgs : EventArgs{

      Payload payload { get; private set; }  

      public PayloadEventArgs(Payload payload){
          this.payload = payload;
      }
  }

  public class Worker{
      //add this instance as a observer
      YourListenerInstance.IncomingPayload += DoWork;

      //remove this instance 
      YourListenerInstance.IncomingPayload -= DoWork;

      //This method gets called when the Listener notifies the  IncomingPayload listeners
      void DoWork(Object sender, PayloadEventArgs e){
          Console.WriteLine(e.payload);
      }
   }

РЕДАКТИРОВАТЬ: Поскольку вопрос требует параллельного выполнения, как насчет создания нового потока на стороне подписчика? Я думаю, что это самый простой способ добиться этого.

//Inside the DoWork method of the subscriber start a new thread
Task.Factory.StartNew( () => 
{
      //Do your work here
});

//If you want to make sure that a new thread is used for the task, then add the TaskCreationOptions.LongRunning parameter
Task.Factory.StartNew( () => 
{
      //Do your work here
}, TaskCreationOptions.LongRunning);

Надеюсь, это ответ на ваш вопрос? Если нет, пожалуйста, оставьте комментарий.

person sebhaub    schedule 13.02.2016
comment
Спасибо, я тоже знаю об этом подходе. Как вы гарантируете, что у каждого воркера здесь есть свой поток? - person cs0815; 13.02.2016
comment
извините, я пропустил эту часть вашего вопроса, я отредактировал свой ответ. На мой взгляд, создать новый поток в методе подписчика - самый простой подход. Что ты об этом думаешь? - person sebhaub; 13.02.2016
comment
на самом деле я думаю, что создание нового потока в наблюдателе не сработает в моем сценарии. Мне нужно обеспечить следующее: Допустим, у меня есть 3 наблюдателя, подписанных на слушателя: O1, O2 и O3. Мне нужно убедиться, что все 3 наблюдателя завершили свою работу, прежде чем им снова будет передана новая работа. Я думаю, что это не гарантируется созданием новых потоков в воркере. По сути, слушатель должен быть мастером, используя все ядра для выполнения работы в наблюдателях. - person cs0815; 13.02.2016
comment
Вы хотите заблокировать слушателя, пока работа не будет выполнена наблюдателями? - person sebhaub; 16.02.2016
comment
Да, см. также stackoverflow.com/questions/35394672/ - person cs0815; 16.02.2016