У меня есть слушатель, который получает работу в виде IPayload. Слушатель должен передать эту работу наблюдателям, которые действительно выполняют эту работу. Это моя первая грубая попытка добиться этого:
public interface IObserver
{
void DoWork(IPayload payload);
}
public interface IObservable
{
void RegisterObserver(IObserver observer);
void RemoveObserver(IObserver observer);
void NotifyObservers(IPayload payload);
}
public class Observer : IObserver
{
public void DoWork(IPayload payload)
{
// do some heavy lifting
}
}
public class Listener : IObservable
{
private readonly List<IObserver> _observers = new List<IObserver>();
public void PushIncomingPayLoad(IPayload payload)
{
NotifyObservers(payload);
}
public void RegisterObserver(IObserver observer)
{
_observers.Add(observer);
}
public void RemoveObserver(IObserver observer)
{
_observers.Remove(observer);
}
public void NotifyObservers(IPayload payload)
{
Parallel.ForEach(_observers, observer =>
{
observer.DoWork(payload);
});
}
}
Является ли это допустимым подходом, который следует шаблону наблюдатель/наблюдаемый (т.е. pub sub?)? Насколько я понимаю, NotifyObservers также создает угрозу для каждой полезной нагрузки. Это правильно? Любые предложения по улучшению очень приветствуются.
Обратите внимание, что все наблюдатели должны закончить свою работу, прежде чем им будет передана новая работа в виде полезной нагрузки — порядок «наблюдения» не имеет значения. По сути, слушатель должен действовать как мастер, максимально используя ядра хоста, используя TPL. ИМХО, для этого требуется явная регистрация наблюдателей с помощью слушателя/наблюдателя.
PS:
Я думаю, что Parallel.ForEach не создает поток для каждого наблюдателя: Почему не Parallel.ForEach запускает несколько потоков? Если это правда, как я могу обеспечить создание потока для каждого наблюдателя?
Альтернатива, которую я имею в виду, такова:
public async void NotifyObservers(IPayload payload)
{
foreach (var observer in _observers)
{
var observer1 = observer;
await Task.Run(() => observer1.DoWork(payload));
}
await Task.WhenAll();
}