Создайте наблюдаемую оболочку для класса, не являющегося потокобезопасным.

У меня есть класс,

public class Test
{
  public int Calc();
}

который требует, чтобы все вызовы Calc выполнялись в том же потоке, что и тот, в котором был создан Test. Мне нужно создать Test один раз (дорогая операция) и несколько раз вызвать Calc.

Я хотел бы иметь оболочку, которая позволит мне асинхронно вызывать Calc:

public class TestWrapper
{
  private Test _test;
  public IObservable<int> Calc();
}

Один из способов сделать это — создать BackgroundWorker или Thread и использовать его в качестве гарантии того, что все операции над Test выполняются в одном потоке. Для простоты мы можем предположить, что все вызовы Calc() будут выполняться последовательно, поэтому не нужно беспокоиться об очередях.

Есть ли более элегантный способ RX сделать это?


person Sergey Aldoukhov    schedule 15.08.2011    source источник


Ответы (3)


Если возможно создание Test при создании TestWrapper, то этот класс, похоже, соответствует вашим требованиям:

public class TestWrapper
{
    public TestWrapper(Func<Test> factory)
    {
        _scheduler = new EventLoopScheduler();
        _test = Observable.Start(factory, _scheduler).First();
    }

    private readonly EventLoopScheduler _scheduler;
    private readonly Test _test;

    public IObservable<int> Calc()
    {
        return Observable.Start(() => _test.Calc(), _scheduler);
    }
}

Он используется так:

var testWrapper = new TestWrapper(() => new Test());
testWrapper.Calc().Subscribe(x => { });

Я протестировал его, и он создает Test в том же потоке, в котором выполняется Calc. Подписка, с другой стороны, обрабатывается в том же потоке, в котором был создан сам testWrapper (т. е. в вызывающем потоке).

person Enigmativity    schedule 16.08.2011

Итак, из комментариев и повторного чтения вашего вопроса я понял, что вы хотите неоднократно вызывать Calc() в постоянном потоке и иметь результаты возврата, доступные как IObservable<Int>()?

В этом случае я бы использовал Observable.Create, чтобы обернуть класс Test, и EventLoopScheduler, чтобы убедиться, что вызовы Calc выполняются в одном потоке.

public class TestWrapper
{
  private Test _test;
  public IObservable<int> Calc()
  {
    return Observable.Create(obsvr =>
    {
        var fixedThreadsched = new EventLoopScheduler();
        var disp = new BooleanDisposable();
        while (!disp.IsDisposed)
        {
            fixedThreadsched.Schedule(() => obsvr.OnNext(_test.Calc()));
        }

        return disp;
    });
  }
}
person Scott Weinstein    schedule 15.08.2011
comment
Спасибо, EventLoopScheduler — это ключ к маршалингу вызовов в один и тот же поток. Тем не менее, этот ответ не объясняет, как переключиться с TestWrapper.Calc, который будет выполняться в потоке пользовательского интерфейса, на test.Calc... - person Sergey Aldoukhov; 16.08.2011
comment
@Sergey - Обновлено на основе ваших комментариев - person Scott Weinstein; 16.08.2011
comment
Это ближе... Недостающая часть заключается в том, что вы не создаете _test (что должно быть сделано в EventLoopScheduler). Хм... Трудно решить - вы были первым с EventLoopScheduler, но @enigmativity дал правильный ответ. Спасибо, ребята, вы оба молодцы! - person Sergey Aldoukhov; 16.08.2011
comment
Достаточно справедливо, но вышеизложенное должно дать четкое представление о том, как это сделать. - person Scott Weinstein; 16.08.2011

используйте класс ThreadLocal<T> при создании экземпляра Test:

var MyTEST = new ThreadLocal<Test>();

тогда вы можете использовать MyTEST.Value.Calc () для любых звонков...

Другой вариант — использовать put [ThreadStatic] для члена Test класса-оболочки... см. http://msdn.microsoft.com/en-us/library/system.threadstaticattribute.aspx

В зависимости от того, нужно ли вам более одного экземпляра Test, вы можете сделать его Singleton .

person Yahia    schedule 15.08.2011