2 Pregunta: Publicación de datos WebSocketClient reactivos

pregunta creada en Thu, Mar 28, 2019 12:00 AM

Estoy tratando de leer datos de websocket utilizando ReactorNettyWebSocketClient pero no puedo conectarme a su api. El problema es que todos los datos que recibo están disponibles en la parte interna de websockethandler (# 1) al estilo lambda, pero quiero que estén disponibles para los suscriptores después de suscribirse a client.execute (..) (# 2)

WebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(
                URI.create(URL),
                session -> session.send(
                        Mono.just(session.textMessage(pairRqStr)))
                        .thenMany(session.receive()
                                .map(WebSocketMessage::getPayloadAsText)
                                .map(this::toResp)
                                .onErrorContinue((throwable, o) -> throwable.getMessage())
                        )
                        .log() // #1
                        .then()
        )
                .log()
                .subscribe(System.out::println); // #2

un poco perdido y nuevo en esto, así que, por favor, guíame.

    
0
2 Respuestas                              2                         

Para enviar o recibir mensajes, primero debe conectarse al canal. Es por eso que client.execute devuelve Mono<Void>, lo que significa que no devuelve datos, solo indica la finalización o falla de handskahe. Si volviera por ejemplo. Flux<WebSocketMessage>, ¿cómo sabrías si el batido de manos se completó con éxito?

Si desea acceder al canal fuera de lambda, no puede hacerlo implementando el método del controlador:

        Consumer<WebSocketMessage> printingConsumer = webSocketMessage -> System.out.println(webSocketMessage.getPayloadAsText());

        client.execute(URI.create(URL), session -> handle(session, printingConsumer));
    }


    public Mono<Void> handle(WebSocketSession session, Consumer<WebSocketMessage> consumer) {
        return session.receive()
                .doOnNext(consumer::accept)
                .then();
    }
    
0
2019-03-30 11: 01: 21Z
  1. bien si handshake no se completara bien, lanzaría el mensaje (siendo websocketclient) y lo dirigiría a algunos .onError ** (..)
    2019-03-31 21: 19: 20Z

esto es lo que hice finalmente, pero todavía no me gusta mucho:

    private Flux<WebSocketMessage> requestData(String req) {
        WebSocketClient client = new ReactorNettyWebSocketClient();
        return ConnectableFlux.create(sub -> {
            client.execute(
                    URI.create(URL),
                    session -> session.send(
                            Mono.just(session.textMessage(req)))
                            .thenMany(session.receive().doOnNext(sub::next))
                            .then()
            )
                    .log()
                    .subscribe();
        });
    }
    
0
2019-03-31 21: 19: 57Z
fuente colocada aquí