admin管理员组文章数量:1432573
I want to read messages published to a kafka topic (in this case first_topic
) and feed those messages into a neural network with tensorflow. To achieve this I am using the tfio.experimental.streaming.KafkaBatchIODataset
module with the following configuration:
online_train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["first_topic"],
group_id="",
servers="kafka:9092",
stream_timeout=30000,
message_poll_timeout=1000,
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
When testing the configuration I always recieve the following command output in python:
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
...
Only at the end, when the timeout period is reached, the messages sent to kafka are processed.
What I want to do is process all messages from the kafka topic until no message is sent for 30s similar to this article: . At the moment I recieve the time out for about 30s and then all messages are processed and the script exits.
My kafka broker runs as a docker container with this configuration:
services:
kafka:
image: 'bitnami/kafka:latest'
container_name: kafka
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_CONTROLLER_BROKER_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
ports:
- '9092:9092'
- '9093:9093'
What am I doing wrong?
本文标签: pythonLocal Timed out when using tfioexperimentalstreamingKafkaBatchIODatasetStack Overflow
版权声明:本文标题:python - Local: Timed out when using tfio.experimental.streaming.KafkaBatchIODataset - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1745577599a2664426.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论