Управление потоком чтения Vertx NetServer

Я пытаюсь имитировать TCP-сервер для тестов с Vertx на основе существующей инфраструктуры, с которой мне приходится работать.

Сервер, который я имитирую, работает полностью асинхронно и знает длину входящего буфера на основе предварительного заголовка в буфере, который указывает длину запроса.

Мне нужно прочитать первые 6 символов входящего запроса на каждом клиентском сокете, который подключается к моему фиктивному TCP-серверу. из этого предварительного заголовка я читаю фактическую длину запроса (например, для xx3018, я знаю, что полная длина запроса составляет 3018).

Затем мне нужно прочитать оставшуюся часть буфера в соответствии с длиной, сопоставить ее с картой ответов и вернуть правильный ответ на запрос.

Пример рабочего макета сервера с простой java (быстрая реализация, поэтому другие разработки не будут заблокированы :))

public void run(String... args) throws Exception {
    log.info("Starting TCP Server");

    ServerSocket serverSocket = new ServerSocket(1750);

    while (true) {
        try {
            Socket socket = serverSocket.accept();

            CompletableFuture.runAsync(() -> {
                Exception e = null;
                while (e == null) {
                    try {
                        InputStream inputStream = socket.getInputStream();
                        OutputStream outputStream = socket.getOutputStream();

                        byte[] preHeader = new byte[6];
                        inputStream.read(preHeader);

                        String preHeaderValue = new String(preHeader);
                        log.info("Pre header: {}", preHeaderValue);

                        int length = Integer.valueOf(preHeaderValue.substring(2));
                        log.info("Request full length: {}", length);
                        byte[] request = new byte[length - 6];

                        inputStream.read(request);

                        String requestValue = new String(request);

                        log.info("Request: {}", requestValue);

                        String response = this.requestResponseProvider.getResponse(preHeaderValue + requestValue);
                        log.info("Response: {}", response);
                        outputStream.write(response.getBytes());
                    } catch (Exception ex) {
                        log.error("Encountered a problem: {}", e.getMessage());
                        e = ex;
                    }
                }
            });
        } catch (Exception e) {
            log.error("Encountered a problem: {}", e.getMessage());
        }
    }
}

Кажется, я не могу найти способ управлять входным потоком так же, как я управляю им с помощью простого java.


person Tom    schedule 02.07.2018    source источник


Ответы (1)


После того, как я очень долго не обращал внимания на эту проблему, я решил немного поиграть с ней.

Я вспомнил, что использовал следующий модуль для другого проекта: https://github.com/vert-x3/vertx-tcp-eventbus-bridge.

Я также вспомнил, что во внутреннем протоколе моста tcp он добавляет длину полезной нагрузки в буфер, который отправляется через мост tcp, я просмотрел исходный код, чтобы узнать, как он обрабатывает фрагменты (также известные как кадры)

Я нашел следующее: https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java, который делает именно то, что я хотел достичь :)

Я немного изменил его, преобразовал в Kotlin и сделал так, чтобы я мог контролировать размер заголовка и способ извлечения длины полезной нагрузки.

Ниже приводится грубый, быстрый и грязный пример управления потоком чтения с помощью Vert.x NetServer:

suspend fun main() {
  val vertx = Vertx.vertx()
  initServer(vertx)
  initClient(vertx)
}

suspend fun initServer(vertx: Vertx) {
  val server = vertx.createNetServer(netServerOptionsOf(port = 8888, host = "localhost"))

  server
    .connectHandler { socket ->
      val parser = FrameParser(
        headerSize = 4,
        headerHandler = {
          it.getInt(0)
        },
        handler = {
          println(it.toString())
          println("---")
        }
      )
      socket.handler(parser)

      socket.exceptionHandler {
        it.printStackTrace()
        socket.close()
      }
    }
    .listenAwait()
}

suspend fun initClient(vertx: Vertx) {
  val client = vertx.createNetClient()
  val socket = client.connectAwait(port = 8888, host = "localhost")

  val message = "START|${"foobarfoobar".repeat(100)}|END"
  val length = message.length
  repeat(5) {
    repeat(100) {
      vertx.setPeriodic(10) {
        socket.write(
          Buffer.buffer()
            .appendInt(length)
            .appendString(message)
        )
      }
    }
    delay(1000)
  }
}

/**
 * Based on: https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java
 */
class FrameParser(
  private val headerSize: Int,
  private val headerHandler: (Buffer) -> Int,
  private val handler: (Buffer) -> Unit
) : Handler<Buffer?> {

  private var _buffer: Buffer? = null
  private var _offset = 0

  override fun handle(buffer: Buffer?) {
    append(buffer)
    var offset: Int
    while (true) {
      // set a rewind point. if a failure occurs,
      // wait for the next handle()/append() and try again
      offset = _offset

      // how many bytes are in the buffer
      val remainingBytes = bytesRemaining()

      // at least expected header size
      if (remainingBytes < headerSize) {
        break
      }

      // what is the length of the message
      val length: Int = headerHandler(_buffer!!.getBuffer(_offset, _offset + headerSize))
      _offset += headerSize
      if (remainingBytes - headerSize >= length) {
        // we have a complete message
        handler(_buffer!!.getBuffer(_offset, _offset + length))
        _offset += length
      } else {
        // not enough data: rewind, and wait
        // for the next packet to appear
        _offset = offset
        break
      }
    }
  }

  private fun append(newBuffer: Buffer?) {
    if (newBuffer == null) {
      return
    }

    // first run
    if (_buffer == null) {
      _buffer = newBuffer
      return
    }

    // out of data
    if (_offset >= _buffer!!.length()) {
      _buffer = newBuffer
      _offset = 0
      return
    }

    // very large packet
    if (_offset > 0) {
      _buffer = _buffer!!.getBuffer(_offset, _buffer!!.length())
    }
    _buffer!!.appendBuffer(newBuffer)
    _offset = 0
  }

  private fun bytesRemaining(): Int {
    return if (_buffer!!.length() - _offset < 0) {
      0
    } else {
      _buffer!!.length() - _offset
    }
  }
}
person Tom    schedule 16.07.2020