Android Retrofit 2 + RxJava: слушайте бесконечный поток

Могу ли я использовать Retrofit + RxJava для прослушивания бесконечного потока? Например, поток в Твиттере. У меня есть это:

public interface MeetupAPI {
    @GET("http://stream.meetup.com/2/rsvps/")
    Observable<RSVP> getRSVPs();
}

MeetupAPI api = new Retrofit.Builder()
            .baseUrl(MeetupAPI.RSVP_API)
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build()
            .create(MeetupAPI.class);

api.getRSVPs()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(rsvp -> Log.d(TAG, "got rsvp"),
                error -> Log.d(TAG, "error: " + error),
                () -> Log.d(TAG, "onComplete"));

но «onComplete» вызывается после анализа первого объекта. Есть ли способ заставить Retrofit оставаться открытым до дальнейшего уведомления?


person ticofab    schedule 13.04.2016    source источник


Ответы (2)


Вот мое решение:

Вы можете использовать аннотацию @Streaming:

public interface ITwitterAPI {

    @GET("/2/rsvps")
    @Streaming
    Observable<ResponseBody> twitterStream();
}

ITwitterAPI api = new Retrofit.Builder()
          .baseUrl("http://stream.meetup.com")
          .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
          .build().create(ITwitterAPI.class);

С помощью @Streaming мы можем получить исходные данные от ResponseBody.

Вот моя функция для переноса тела, разделенного строками, с событиями:

public static Observable<String> events(BufferedSource source) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
                subscriber.onCompleted();
            } catch (IOException e) {
                e.printStackTrace();
                subscriber.onError(e);
            }
        }
    });
}

И использование результата:

api.twitterStream()
  .flatMap(responseBody -> events(responseBody.source()))
  .subscribe(System.out::println);

уведомление об изящной остановке

Когда мы отписываемся, модификация закрывает входной поток. Но невозможно определить закрытый входной поток или нет из самого входного потока, поэтому единственный способ - попробовать прочитать из потока - мы получим исключение с сообщением Socket closed. Мы можем интерпретировать это исключение как закрытие:

        @Override
        public void call(Subscriber<? super String> subscriber) {
            boolean isCompleted = false;
            try {
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
            } catch (IOException e) {
                if (e.getMessage().equals("Socket closed")) {
                    isCompleted = true;
                    subscriber.onCompleted();
                } else {
                    throw new UncheckedIOException(e);
                }
            }
            //if response end we get here
            if (!isCompleted) {
                subscriber.onCompleted();
            }
        }

И если соединение закрыто из-за окончания ответа, у нас нет никаких исключений. Вот isCompleted проверьте это. Дайте мне знать, если я ошибаюсь :)

person zella    schedule 15.04.2016
comment
Я использовал ваш код, но он не работает. Сетевой вызов выполняется только один раз, но я не получаю ни ответа, ни ошибки. @целла - person davideagostini; 11.11.2020
comment
ссылка на мой код gist.github.com/davideagostini/33223f4697e82bff9f4747580e608391 - person davideagostini; 11.11.2020

Zella ответ подходит для Retrofit2 с rxJava. Для rxJava2 я изменил пользовательский наблюдаемый объект следующим образом:

//imports
import io.reactivex.Observable                              
import io.reactivex.disposables.Disposable                  
import io.reactivex.schedulers.Schedulers                   
import okio.BufferedSource                   
import java.io.IOException                             



fun events(source: BufferedSource): Observable<String> {         
    return Observable.create { emitter ->                        
        var isCompleted = false                                  
        try {                                                    
            while (!source.exhausted()) {                        
                emitter.onNext(source.readUtf8Line()!!)          
            }                                                    
            emitter.onComplete()                                 
        } catch (e: IOException) {                               
            e.printStackTrace()                                  
            if (e.message == "Socket closed") {                  
                isCompleted = true                               
                emitter.onComplete()                             
            } else {                                             
                throw IOException(e)                             
            }                                                    
        }                                    
        if (!isCompleted) {                                      
            emitter.onComplete()                                 
        }                                                        
    }                                                            
}                                                                

Изменения в зависимостях build.gradle на уровне модуля:

//retrofit rxJava2 adapter
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.7.1'

//rx-java
implementation 'io.reactivex.rxjava2:rxjava:2.2.11'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

Изменения адаптера модернизации:

ITwitterAPI api = new Retrofit.Builder()
          .baseUrl("http://stream.meetup.com")
          .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
          .build().create(ITwitterAPI.class);

И назвал Streaming API как

api.twitterStream()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .flatMap { responseBody-> events(responseBody.source()) }
        .subscribe({ t ->
            Log.i(TAG, "onNext t=$t")
        }, { e ->
            Log.i(TAG, "onError e=$e")
        }, {
            Log.i(TAG, "onFinish")
        })
person Shankar Bhagwati    schedule 08.01.2020
comment
Я использовал ваш код, но он не работает. Сетевой вызов выполняется только один раз, но я не получаю ни ответа, ни ошибки. - person davideagostini; 11.11.2020
comment
ссылка на мой код gist.github.com/davideagostini/33223f4697e82bff9f4747580e608391 - person davideagostini; 11.11.2020