Erlang: простой pubsub для процессов — подходит ли мой подход?

Отказ от ответственности: я новичок в Erlang и OTP.

Мне нужен простой pubsub в Erlang/OTP, где процессы могли бы подписываться на какой-то «концентратор» и получать копии сообщений, которые были отправлены на этот концентратор.

Я знаю про gen_event, но он обрабатывает события в одном единственном процессе менеджера событий, а я хочу, чтобы каждый подписчик был отдельным, автономным процессом. Кроме того, мне не удалось получить контроль над обработчиками gen_event. К сожалению, результаты Google были полны ссылок XMPP (Ejabberd) и RabbitMQ, поэтому я не нашел ничего, относящегося к моей идее.

Моя идея состоит в том, что такая модель pubsub легко сопоставляется с деревом наблюдения. Поэтому я подумал расширить супервизор (gen_server под капотом), чтобы иметь возможность отправлять сообщения всем его дочерним элементам.

Я взломал это в своем быстром и грязном пользовательском поведении «диспетчера»:

-module(dispatcher).
-extends(supervisor).
-export([notify/2, start_link/2, start_link/3, handle_cast/2]).

start_link(Mod, Args) ->
    gen_server:start_link(dispatcher, {self, Mod, Args}, []).

start_link(SupName, Mod, Args) ->
    gen_server:start_link(SupName, dispatcher, {SupName, Mod, Args}, []).

notify(Dispatcher, Message) ->
    gen_server:cast(Dispatcher, {message, Message}).

handle_cast({message, Message}, State) ->
    {reply, Children, State} = supervisor:handle_call(which_children, dummy, State),
    Pids = lists:filter(fun(Pid) -> is_pid(Pid) end,
                 lists:map(fun({_Id, Child, _Type, _Modules}) -> Child end,
                           Children)),
    [gen_server:cast(Pid, Message) || Pid <- Pids],
    {noreply, State}.

Однако, хотя на первый взгляд кажется, что все работает нормально (дети получают сообщения и плавно перезапускаются в случае сбоя), мне интересно, когда это было хорошей идеей.

Может ли кто-нибудь покритиковать (или одобрить) мой подход и/или порекомендовать какие-то альтернативы?


person drdaeman    schedule 30.08.2011    source источник
comment
Сохраняются ли сообщения, и новые процессы, подписавшиеся, получают всю их историю. Или сообщения передаются только с момента подписки процесса?   -  person Peer Stritzinger    schedule 30.08.2011
comment
Последний; как в Redis или 0MQ Pub/Sub. Я еще раз взгляну на gen_event, спасибо.   -  person drdaeman    schedule 31.08.2011


Ответы (4)


Из вашего кода мне кажется, что обработчики gen_event идеально подходят.

Обратные вызовы обработчика вызываются из одного центрального процесса, отправляющего сообщения, но эти обратные вызовы не должны выполнять большую работу.

Поэтому, если вам нужен автономный процесс с собственным состоянием для подписчиков, просто отправьте сообщение в обратном вызове события.

Обычно эти автономные процессы будут gen_servers, и вы просто вызовете gen_server:cast из ваших обратных вызовов событий.

Надзор — это отдельная проблема, с которой может справиться обычная инфраструктура наблюдения, входящая в состав OTP. То, как вы хотите осуществлять контроль, зависит от семантики процессов подписки. Если все они являются идентичными серверами, вы можете использовать, например, simple_one_for_one.

В init обратном вызове процессов-подписчиков можно поместить gen_event:add_handler вызовы, которые добавляют их в менеджер событий.

Вы даже можете использовать диспетчер событий в качестве супервизора, если используете функцию gen_event:add_sup_handler для добавления своих процессов, если семантика этого вам подходит.

Интернет-ресурсы для лучшего понимания gen_event: Изучите главу об Erlang

В противном случае во всех книгах по Erlang есть введение в gen_event. Вероятно, самый подробный из них вы можете найти в Erlang и OTP в действии.

О, и кстати: я бы не стал хакать ваших собственных супервайзеров ради этого.

person Peer Stritzinger    schedule 30.08.2011
comment
Пример процесса, который контролирует обработчик gen_event, можно найти здесь: trapexit.org/Gen_event_behavior_demystified - person legoscia; 31.08.2011
comment
Проблема с менеджерами событий заключается в том, что они могут иметь не более одного обработчика событий каждого типа, поэтому, если вы отправляете многим события одного и того же типа, это будет один обработчик, который отправляет им всем. - person rvirding; 01.09.2011

Недавно я использовал gproc для реализации pubsub. Пример из ридми делает свое дело.

subscribe(EventType) ->
    %% Gproc notation: {p, l, Name} means {(p)roperty, (l)ocal, Name}
    gproc:reg({p, l, {?MODULE, EventType}}).

notify(EventType, Msg) ->
    Key = {?MODULE, EventType},
    gproc:send({p, l, Key}, {self(), Key, Msg}).
person puzza007    schedule 10.09.2011

Очень простой пример, когда вы делаете все это самостоятельно, находится в моем очень простом chat_demo, который представляет собой простую веб-версию. сервер чата. Посмотрите на chat_backend.erl (или chat_backend.lfe, если вам нравятся круглые скобки), который позволяет пользователям подписаться, после чего им будут отправляться все сообщения, поступающие на сервер. Он не вписывается в деревья наблюдения, хотя модификация проста (хотя он использует proc_lib для получения более качественных сообщений об ошибках).

person rvirding    schedule 31.08.2011

Когда-то давно я читал о øMQ (ZeroMQ), который имеет кучу привязок к разным языкам программирования.

http://www.zeromq.org/

http://www.zeromq.org/bindings:erlang

Если это не чистое решение erlang, это может быть выбор.

person f4m8    schedule 31.08.2011