API на основе Observable и проблема с отменой подписки

Я пытаюсь использовать Rx-Java для создания класса для отслеживания местоположения на Android. Чего я до сих пор не могу понять, так это того, как правильно обрабатывать жизненный цикл моего Observable. Я хочу иметь Observable, который начинает отслеживать местоположение, когда происходит первая подписка, и останавливает отслеживание местоположения, когда последняя подписка отбрасывается. То, что я достиг до сих пор, это:

public class LocationObservable 
    implements GooglePlayServicesClient.ConnectionCallbacks, 
               GooglePlayServicesClient.OnConnectionFailedListener, 
               LocationListener {

    private LocationClient locationClient;
    private final PublishSubject<Location> latestLocation = 
        PublishSubject.create();
    public final Observable<Location> locationObservable = 
        Observable.defer(() -> {
        if(!locationClient.isConnected() && !locationClient.isConnecting()) {
            locationClient.connect();
        }
        return latestLocation.asObservable().scan((prev, curr) -> {
            if (Math.abs(prev.getLatitude() - curr.getLatitude()) > 0.000001 ||
                Math.abs(prev.getLongitude() - curr.getLongitude()) > 0.000001)
                return curr;
            else
                return prev;
        }).distinctUntilChanged();});

    public LocationObservable(Context context) {

        locationClient = new LocationClient(context, this, this);
    }

    @Override
    public void onConnected(Bundle bundle) {
        latestLocation.onNext(locationClient.getLastLocation());
    }

    @Override
    public void onDisconnected() {
        latestLocation.onCompleted();
    }

    @Override
    public void onConnectionFailed(ConnectionResult connectionResult) {
        latestLocation.onError(new Exception(connectionResult.toString()));
    }

    @Override
    public void onLocationChanged(Location location) {
        latestLocation.onNext(location);
    }
}

Как видите, я использую Observable#defer для инициализации обратных вызовов местоположения при подписке первого клиента. Я не знаю, хороший ли это подход, но это лучшее, что я придумал на данный момент. Чего мне все еще не хватает, так это того, как остановить обновления местоположения, когда последний клиент моего класса отписывается от моего наблюдаемого. Или, может быть, это что-то неидиоматическое в Rx, так как это не очевидно?

Я считаю, что этот вариант использования должен быть довольно стандартным, и поэтому для него должно быть стандартное/идиоматическое решение. Был бы счастлив узнать это.


person Haspemulator    schedule 02.03.2014    source источник


Ответы (1)


person    schedule
comment
Эй, спасибо за идею, это именно то, что я искал: получать уведомления, когда происходит отписка. Однако, насколько я понимаю, эта часть RxJava API меняется с выпуском 0.17. Ваше решение основано на новом или старом API? Будет ли он совместим или легко обновится до нового, если еще нет? - person Haspemulator; 04.03.2014
comment
Это совместимо. Однако я обновил его до нового API. - person zsxwing; 04.03.2014
comment
Кроме того, метод unsubscribe можно вызывать в любом потоке. Если вы используете некоторые Scheduler, вам может понадобиться добавить некоторую синхронизацию в unsubscribe. RxJava обещает, что unsubscribe вызывается только один раз, но не гарантирует видимость памяти. - person zsxwing; 05.03.2014
comment
Под unsubscribe вы подразумеваете обратный вызов, переданный Subscriptions.create, или что-то еще? - person Haspemulator; 06.03.2014
comment
Да, обратный вызов передан в Subscriptions.create - person zsxwing; 06.03.2014