1 Вопрос: Как разрешить только одно соединение (URL / порт) для чтения и записи из приложения Flink

вопрос создан в Thu, May 2, 2019 12:00 AM

Я читаю с URL /порта, выполняю некоторую обработку и записываю обратно на URL /порт. Url /Port допускает только одно соединение (из которого вам нужно читать и писать при необходимости).

Flink может читать и писать в порт rl, но открывает 2 соединения.

Я использовал основное соединение и через URL /порт через flink

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val data_stream = env.socketTextStream(url, port, socket_stream_deliminator, socket_connection_retries)
                         .map(x => printInput(x))
                          .writeToSocket(url, port, new SimpleStringSchema())
                         //.addSink(new SocketClientSink[String](url, port.toInt, new SimpleStringSchema))

    // execute program
    env.execute("Flink Streaming Scala API Skeleton")

Идеальное или единственное решение для моего случая - это читать и писать с одного и того же соединения, а не создавать 2 отдельных соединения

Как бы я поступил так?

    
0
  1. Вам понадобится глобальная (статическая) переменная JVM для этого соединения, и вы будете использовать свой собственный источник и приемник, используя это соединение
    2019-05-02 15: 15: 02Z
  2. Спасибо за ваш ответ, не могли бы вы предоставить пример кода о том, как реализовать эту концепцию.
    2019-05-02 15: 26: 29Z
  3. Возможно, было бы проще решить эту проблему за пределами Flink с помощью приложения, которое открывает одно ч /б соединение с внешней службой и перенаправляет это соединение через два отдельных сокета. к источнику и стоку.
    2019-05-02 20: 54: 35Z
1 ответ                              1                         

Как я сказал в комментарии, вы должны хранить ваше соединение в некоторой статической переменной, потому что ваши Sources- и Sinks не будут использовать одно и то же соединение в противном случае. Вы также должны убедиться, что ваш Source и Sink работают на одной и той же JVM, используя один и тот же Classloader, иначе у вас все равно будет более одного соединения.

Я создал этот класс-обертку, который содержит необработанное Socket-Connection и экземпляр Reader /Writer для этого подключения. Поскольку ваш источник всегда будет останавливаться перед вашим Sink (так работает Flink), этот класс также восстанавливает соединение, если он был закрыт раньше.

package example;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

public class SocketConnection implements Closeable {

    private final String host;
    private final int port;
    private final Object lock;
    private volatile Socket socket;
    private volatile BufferedReader reader;
    private volatile PrintStream writer;

    public SocketConnection(String host, int port) {
        this.host = host;
        this.port = port;
        this.lock = new Object();
        this.socket = null;
        this.reader = null;
        this.writer = null;
    }

    private void connect() throws IOException {
        this.socket = new Socket(this.host, this.port);
        this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
        this.writer = new PrintStream(this.socket.getOutputStream());
    }

    private void ensureConnected() throws IOException {
        // only acquire lock if null
        if (this.socket == null) {
            synchronized (this.lock) {
                // recheck if socket is still null
                if (this.socket == null) {
                    connect();
                }
            }
        }
    }

    public BufferedReader getReader() throws IOException {
        ensureConnected();
        return this.reader;
    }

    public PrintStream getWriter() throws IOException {
        ensureConnected();
        return this.writer;
    }

    @Override
    public void close() throws IOException {
        if (this.socket != null) {
            synchronized (this.lock) {
                if (this.socket != null) {
                    this.reader.close();
                    this.reader = null;

                    this.writer.close();
                    this.writer = null;

                    this.socket.close();
                    this.socket = null;
                }
            }
        }
    }
}

Ваш основной класс (или любой другой класс) содержит один экземпляр этого класса, к которому затем обращается как ваш источник, так и ваш приемник:

package example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {

    public static final SocketConnection CONNECTION = new SocketConnection("your-host", 12345);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SocketTextStreamSource())
              .addSink(new SocketTextStreamSink());

        env.execute("Flink Streaming Scala API Skeleton");
    }
}

Ваша SourceFunction может выглядеть примерно так:

package example;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SocketTextStreamSource implements SourceFunction<String> {

    private volatile boolean running;

    public SocketTextStreamSource() {
        this.running = true;
    }

    @Override
    public void run(SourceContext<String> context) throws Exception {
        try (SocketConnection conn = Main.CONNECTION) {
            String line;

            while (this.running && (line = conn.getReader().readLine()) != null) {
                context.collect(line);
            }
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}

И ваша SinkFunction:

package example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SocketTextStreamSink extends RichSinkFunction<String> {

    private transient SocketConnection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.connection = Main.CONNECTION;
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        this.connection.getWriter().println(value);
        this.connection.getWriter().flush();
    }

    @Override
    public void close() throws Exception {
        this.connection.close();
    }
}

Обратите внимание, что я всегда использую getReader() и getWriter(), потому что основной сокет может быть закрыт в это время.

    
1
2019-05-02 16: 46: 39Z
  1. Я думаю, что это идеальный путь вперед. Спасибо, что нашли время, чтобы помочь с этой проблемой. Попробую @ codeflush.dev
    2019-05-03 07: 47: 31Z
источник размещен Вот