Beam streaming pipeline not writing windowed files

I'm trying to run this example from Google on my local machine. I'm using the PubSub emulator and Beam 2.60.0, executed with --runner=DirectRunner.

...
options.setStreaming(true);
options.setPubsubRootUrl("http://localhost:8085");
int numShards = 1;

final Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read PubSub Messages", PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
.apply("Log PubSub message", ParDo.of(new LogStringFn()))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Log windowed message", ParDo.of(new LogStringFn()))
.apply("Write Files", new WriteOneFilePerWindow("outputs/out", numShards));

pipeline.run().waitUntilFinish();
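
(LogStringFn is not shown above; it's essentially a pass-through DoFn that logs each element and re-emits it, along these lines, simplified:)

// Pass-through DoFn: logs every element it sees and outputs it unchanged.
static class LogStringFn extends DoFn<String, String> {
  private static final org.slf4j.Logger LOG =
      org.slf4j.LoggerFactory.getLogger(LogStringFn.class);

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    LOG.info("Received: {}", element);
    out.output(element);
  }
}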

With the logging steps, I can confirm that the messages I publish to PubSub are being consumed by the pipeline. However, the last step of writing the payloads to a windowed file never happens, and the pipeline doesn't ack the messages from PubSub as it should. I've tried adding logs to WriteOneFilePerWindow: none of the methods in its PerWindowFiles FilenamePolicy are ever called.

The other windowing example I've tried (Apache's WindowedWordCount) works fine, but my eventual use case requires streaming.

An additional note is that my PubSub emulator is getting hammered at 100% CPU while the pipeline is running. Is this normal?

What could be the issue here - why is the WriteOneFilePerWindow step never getting executed? And how do I debug issues like this in the future?


Asked Nov 17, 2024 at 4:18 by avanzal

  • What happens if you use a different trigger? – CWrecker, Dec 19, 2024 at 21:26

1 Answer


It's hard to say for sure, but there is a good chance this is due to how the Pub/Sub emulator works: the emulator is not fully featured, and it's possible that Beam is not able to get correct watermark information [1] from it. Could you try this with a normal Pub/Sub topic?

[1] https://cloud.google.com/dataflow/docs/concepts/streaming-pipelines#watermarks
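
If the emulator never advances the watermark, the one-minute event-time windows never close, which would explain why the write step never runs. Following the comment above about trying a different trigger, you could swap the Window step for one with a processing-time trigger so panes fire regardless of watermark progress. A minimal sketch (uses Repeatedly and AfterProcessingTime from org.apache.beam.sdk.transforms.windowing; untested against the emulator):

.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
    // Fire a pane roughly one minute of processing time after the first element
    // arrives in each window, instead of waiting for the event-time watermark.
    .triggering(Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())

If files start appearing with this change, that points to the watermark not advancing from the emulator; if nothing changes, the problem is likely elsewhere.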
