0 Вопрос: Включение ровно один раз приводит к отключению потоков из-за истечения времени ожидания при инициализации состояния транзакции

вопрос создан в Thu, May 2, 2019 12:00 AM

Я написал простой пример для проверки функциональности соединения. Поскольку я иногда получаю сообщения, дублирующиеся в получающейся теме, а иногда и пропускаю сообщения в этой теме, я подумал, точно определяя проблему, чтобы включить ровно семантику. Однако, делая это через:

props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

Я получаю тайм-аут, из-за которого потоки kafka закрываются в моем приложении:

2019-05-02 17:02:32.585  INFO 153056 --- [-StreamThread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-05-02 17:02:32.585  INFO 153056 --- [-StreamThread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-05-02 17:02:32.593  INFO 153056 --- [-StreamThread-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1-0_0-producer, transactionalId=join-test-0_0] ProducerId set to -1 with epoch -1
2019-05-02 17:03:32.599 ERROR 153056 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance: {}

org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

2019-05-02 17:03:32.599  INFO 153056 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1] partition assignment took 60044 ms.
    current active tasks: []
    current standby tasks: []
    previous active tasks: []

2019-05-02 17:03:32.601  INFO 153056 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
2019-05-02 17:03:32.601  INFO 153056 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1] Shutting down
2019-05-02 17:03:32.615  INFO 153056 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2019-05-02 17:03:32.615  INFO 153056 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72] State transition from REBALANCING to ERROR
2019-05-02 17:03:32.615  WARN 153056 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72] All stream threads have died. The instance will be in error state and should be closed.
2019-05-02 17:03:32.615  INFO 153056 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1] Shutdown complete
Exception in thread "join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [join-test-90a0aa93-dfd8-4d4f-894b-85a3c5634f72-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:870)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:810)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
    static String ORIGINAL = "original-sensor-data";
    static String ERROR = "error-score";

    public static void main(String[] args) throws IOException {

        SpringApplication.run(JoinTest.class, args);
        Properties props = getProperties();


        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, OriginalSensorData> original = builder.stream(ORIGINAL, Consumed.with(Serdes.String(), new OriginalSensorDataSerde()));
        final KStream<String, ErrorScore> error = builder.stream(ERROR, Consumed.with(Serdes.String(), new ErrorScoreSerde()));



        KStream<String, ErrorScore> result = original.join(
                error,
                (originalValue, errorValue) -> new ErrorScore(new Date(originalValue.getTimestamp()), errorValue.getE(),
                        originalValue.getData().get("TE700PV").doubleValue(), errorValue.getT(), errorValue.getR()),
                // KStream-KStream joins are always windowed joins, hence we must provide a join window.
                JoinWindows.of(Duration.ofMillis(3000).toMillis()),

                Joined.with(
                        Serdes.String(), /* key */
                        new OriginalSensorDataSerde(), /* left value */
                        new ErrorScoreSerde() /* right value */
                )
        ).through("atl-joined-data-repartition", Produced.with(Serdes.String(), new ErrorScoreSerde()));

        result.foreach((key, value) -> System.out.println("Join Stream: " + key + " " + value));
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

    private static Properties getProperties() {
        Properties props = new Properties();

        //Url of the kafka broker, this can also be found in the Aiven console
        props.put("bootstrap.servers", "localhost:9095");
        props.put("group.id", "join-test");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("application.id", "join-test");
        props.put("default.timestamp.extractor", "com.my.SensorDataTimestampExtractor");

        //The key of a message is a string
        props.put("key.deserializer",
                StringDeserializer.class.getName());
        props.put("value.deserializer",
                StringDeserializer.class.getName());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        return props;
    }

Я ожидаю, что приложение запустится без тайм-аута и продолжит работать

    
1
  1. Какой у вас вопрос? Может быть, вы просто должны увеличить время ожидания?
    2019-05-08 10: 37: 20Z
  2. хорошо, мой вопрос: почему включение этого единственного свойства нарушает не только мое приложение, но и примеры потока kafka без какого-либо моего кода в нем? Согласно документации это должно быть так же просто, как включение этого свойства.
    2019-05-08 11: 32: 58Z
  3. Кажется, есть проблема конфигурации. Не уверен, атм. Вы проверили журналы (на стороне клиента и брокера) для получения более подробной информации, почему время ожидания истекло? - Кроме того, вы упомянули «пример приложения» - для EOS по умолчанию требуется 3 брокера: cf docs.confluent.io/current/streams/developer-guide/…
    2019-05-09 14: 04: 22Z
  4. ofcourse, 3 брокера, это проверит. это может быть
    2019-05-10 09: 21: 22Z
0 ответов                              0                         
источник размещен Вот