У меня есть приложение, которое использует RX через удаленное взаимодействие с одним сервером (наблюдаемым) и многими клиентами (наблюдателями). Моя проблема заключается в том, что когда клиент (наблюдатель) отключается неправильно, не выполняя подписку (удаление), функция OnNext() на сервере начинает выдавать исключение удаленного взаимодействия.
Есть ли какой-либо механизм для отмены подписки на проблемный наблюдатель на стороне сервера?
Часть клиентского кода:
internal void SetRemoting(bool refreshInstance)
{
string channelName = "RemotingClientUI";
IDictionary dict = new Hashtable();
dict["port"] = 9988;
dict["name"] = channelName;
var bcp = new BinaryClientFormatterSinkProvider();
var channel = new TcpClientChannel(dict, bcp);
ChannelServices.RegisterChannel(channel, false);
_remoteServer = (IRemoteServerService) Activator.
GetObject(typeof (IRemoteServerService),
tcp://...");
}
private void SubscribeToRemoteEvents(bool unSubscrubeFirst)
{
_jobRowUpdate = _remoteServer.JobRowUpdate.Subscribe(UpdateJobQueueRow);
_packageRowUpdate = _remoteServer.PackageRowUpdate.
Subscribe(UpdatePackageQueueRow);
_miscUpdate = _remoteServer.MiscAction.Subscribe(MiscRemoteActions);
}
Часть кода сервера:
public class RemoteServiceService
{
public RemoteServiceService()
{
JobRowUpdate = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).JobRowUpdate.Remotable();
PackageRowUpdate = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).PackageRowUpdate.Remotable();
MiscAction = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).MiscActions.Remotable();
}
}
public class RemoteLoggerForService
{
private RemoteLoggerForService(IService service)
{
_jobRowUpdate = new Subject<IJobQueueRow>();
_packageRowUpdate = new Subject<IPackageQueueRow>();
_miscActions = new Subject<MiscRemoteObjects>();
_service = service;
}
#region Overrides of LoggerBase
public override void WriteToLog<T>(T stringFormatOrObject,
params object[] args)
{
lock (this)
try
{
lock (LockLogger)
{
if (stringFormatOrObject is IJobQueueRow &&
_jobRowUpdate != null)
{
_jobRowUpdate.OnNext(
stringFormatOrObject as IJobQueueRow);
}
if (stringFormatOrObject is IPackageQueueRow &&
_packageRowUpdate != null)
{
_packageRowUpdate.OnNext(
stringFormatOrObject as IPackageQueueRow);
}
if (stringFormatOrObject is MiscRemoteObjects &&
_miscActions != null)
{
_miscActions.OnNext(
stringFormatOrObject as MiscRemoteObjects);
}
}
}
catch(Exception ex)
{
LoggerFactory.GetLogger(LoggerType.File, null).
WriteToLog(
Utils.GetFullException("RemoteLoggerForService", ex));
}
}
#endregion
}