Как использовать Akka-HTTP client websocket send message

Я пытаюсь использовать веб-сокет на стороне клиента, следуя документу webSocketClientFlow.

пример кода:

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

после обновления соединения, как использовать connection отправить сообщение на стороне сервера websocket?

Я заметил из документа:

Поток, возвращаемый этим методом, может быть материализован только один раз. Для каждого запроса необходимо получить новый поток путем повторного вызова метода.

все еще не понимаю, зачем нам нужно строить поток много раз, так как обновленное соединение уже готово.


person xring    schedule 31.10.2016    source источник
comment
Извините, не совсем понятно, о чем вы спрашиваете. Вы отправляете сообщения через соединение через веб-сокеты, помещая сообщения в соответствующий поток; в вашем конкретном случае на сервер будет отправлено только одно сообщение (TextMessage("hello world!")), а исходящий поток будет закрыт после этого. Входящий поток (созданный из Sink.foreach) будет продолжать получать сообщения, пока сервер не закроет поток или пока не истечет время ожидания приема. Вы должны создать соответствующий Source, возможно, основанный на акторах, если вы хотите контролировать то, что вы отправляете. более сложный способ.   -  person Vladimir Matveev    schedule 31.10.2016
comment
@VladimirMatveev благодарит за напоминание. Я думаю, что неправильно понял, как работает akka-http, собираясь читать документацию подробно. Большое спасибо!   -  person xring    schedule 01.11.2016


Ответы (1)


Вы можете создать источник на основе актора и отправлять новые сообщения через установленное соединение с веб-сокетом.

    val req = WebSocketRequest(uri = "ws://127.0.0.1/ws")
    val webSocketFlow = Http().webSocketClientFlow(req)

    val messageSource: Source[Message, ActorRef] = 
         Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

    val messageSink: Sink[Message, NotUsed] =
        Flow[Message]
            .map(message => println(s"Received text message: [$message]"))
            .to(Sink.ignore)

    val ((ws, upgradeResponse), closed) =
        messageSource
            .viaMat(webSocketFlow)(Keep.both)
            .toMat(messageSink)(Keep.both)
            .run()

    val connected = upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
            Future.successful(Done)
        } else {
            throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
    }

    ws ! TextMessage.Strict("Hello World")
    ws ! TextMessage.Strict("Hi")
    ws ! TextMessage.Strict("Yay!")

`

person dimart.sp    schedule 17.06.2017
comment
это мне очень помогло, не могли бы вы изучить мой вопрос, связанный с вашим кодом, спасибо, это будет большим подспорьем, вот ссылка stackoverflow.com/questions/63505847/ - person swaheed; 20.08.2020