I want to read messages published to a Kafka topic (in this case first_topic) and feed them into a neural network built with TensorFlow. To do this I am using the tfio.experimental.streaming.KafkaBatchIODataset class with the following configuration:


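import tensorflow_io as tfio

online_train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["first_topic"],
    group_id="",
    servers="kafka:9092",
    stream_timeout=30000,  # ms to keep waiting for new messages before the stream ends
    message_poll_timeout=1000,  # ms to wait on each individual poll of the topic
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)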
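If the consumer settings are easier to maintain as a dictionary, the configuration list above can be generated from one. This is only a small convenience sketch: to_kafka_configuration is a hypothetical helper, and it assumes (as the snippet above suggests) that each entry of configuration is a librdkafka-style "key=value" string.

```python
from typing import Dict, List, Union


def to_kafka_configuration(props: Dict[str, Union[str, int]]) -> List[str]:
    """Hypothetical helper: render consumer properties as "key=value" strings."""
    # Dicts keep insertion order, so the output order matches the input order.
    return [f"{key}={value}" for key, value in props.items()]


consumer_props = {
    "session.timeout.ms": 7000,
    "max.poll.interval.ms": 8000,
    "auto.offset.reset": "earliest",
}

configuration = to_kafka_configuration(consumer_props)
print(configuration)
# ['session.timeout.ms=7000', 'max.poll.interval.ms=8000', 'auto.offset.reset=earliest']
```

The resulting list can then be passed as the configuration argument in place of the hand-written one.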

When testing the configuration, I always get the following console output in Python:

tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
...

Only at the end, once the timeout period is reached, are the messages sent to Kafka processed.

What I want is to process all messages from the Kafka topic as they arrive, and stop only once no message has been sent for 30 s, similar to this article: . At the moment I get the timeout messages for about 30 s, then all messages are processed at once and the script exits.
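The intended behavior (handle each message as soon as it arrives, and end the stream only after 30 s without any new message) can be sketched in plain Python. This illustrates the desired semantics only and is not how tensorflow_io implements stream_timeout: a queue.Queue stands in for the Kafka topic, and consume_until_idle, idle_timeout_s and poll_timeout_s are hypothetical names loosely mirroring stream_timeout and message_poll_timeout.

```python
import queue
import threading
import time
from typing import Iterator


def consume_until_idle(
    messages: "queue.Queue[bytes]",
    idle_timeout_s: float = 30.0,
    poll_timeout_s: float = 1.0,
) -> Iterator[bytes]:
    """Hypothetical sketch: yield messages until none arrives for idle_timeout_s.

    The idle clock restarts on every received message, so a steady stream of
    messages keeps the loop running indefinitely.
    """
    last_message_at = time.monotonic()
    while True:
        try:
            # One "poll": wait at most poll_timeout_s for the next message.
            msg = messages.get(timeout=poll_timeout_s)
        except queue.Empty:
            # An empty poll is not an error; stop only once the idle window elapses.
            if time.monotonic() - last_message_at >= idle_timeout_s:
                return
            continue
        last_message_at = time.monotonic()
        yield msg  # hand each message downstream immediately, not at the end


if __name__ == "__main__":
    q: "queue.Queue[bytes]" = queue.Queue()

    def producer() -> None:
        # Simulated publisher: three messages, 0.1 s apart.
        for i in range(3):
            q.put(f"msg-{i}".encode())
            time.sleep(0.1)

    threading.Thread(target=producer, daemon=True).start()
    for m in consume_until_idle(q, idle_timeout_s=0.5, poll_timeout_s=0.1):
        print("processing", m)
```

Here each message is printed as soon as it is received, and the loop ends roughly 0.5 s after the last one; with idle_timeout_s=30.0 and poll_timeout_s=1.0 the same loop matches the 30 s / 1 s values used in the snippet above.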

My Kafka broker runs as a Docker container with this configuration:

services:
  kafka:
    image: 'bitnami/kafka:latest'
    container_name: kafka
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_CONTROLLER_BROKER_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    ports:
      - '9092:9092'
      - '9093:9093'

What am I doing wrong?