Издатель реактивных потоков из тела ответа веб-клиента Vertx

Я пытаюсь написать оболочку для Vertx веб-клиента для загрузки тело ответа от сервера, использующего Publisher из реактивных потоков:

import org.reactivestreams.Publisher;
import io.vertx.reactivex.ext.web.client.WebClient;

interface Storage {
  Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
  private final WebClient client;

  public WebStorage(final WebClient client) {
    this.client = client;
  }

  @Override
  public Publisher<ByteBuffer> laod(final String key) {
    return client.get(String.format("https://myhost/path?query=%s", key))
      .rxSend()
      .toFlowable()
      .map(resp -> ByteBuffer.wrap(resp.body().getBytes()));
  }
}

Это решение неверно, поскольку оно читает все байты тела блокирующим образом с вызовом getBytes().

Можно ли прочитать ответ Vertx WebClient по кускам и преобразовать его в Publisher (или Rx Flowable)?


person Kirill    schedule 26.03.2020    source источник


Ответы (2)


Веб-клиент Vert.x не предназначен для потоковой передачи тела ответа. Он буферизует контент по дизайну.

Если вы хотите транслировать контент, вы можете использовать базовый HTTP-клиент, который является более гибким.

person tsegismont    schedule 26.03.2020

Думаю, вы можете использовать ByteCodec.pipe:

import io.reactivex.Flowable;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.streams.WriteStream;
import io.vertx.reactivex.ext.web.client.WebClient;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import org.reactivestreams.Publisher;

import java.nio.ByteBuffer;

interface Storage {
    Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
    private final Vertx vertx = Vertx.vertx();
    private final WebClient client;

    public WebStorage(final WebClient client) {
        this.client = client;
    }

    @Override
    public Publisher<ByteBuffer> load(final String key) {
        final ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(vertx.getDelegate());
        client.get(String.format("https://myhost/path?query=%s", key))
            .as(BodyCodec.pipe(WriteStream.newInstance(stream)))
            .rxSend().subscribe();
        return Flowable.fromPublisher(stream).map(buffer -> ByteBuffer.wrap(buffer.getBytes()));
    }
}
person Sammers    schedule 26.03.2020