
In this tutorial, we show how to build a unified Apache Beam pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time-aware data and apply fixed windowing with triggers and allowed lateness to demonstrate how Apache Beam consistently handles both on-time and late events. By switching only the input source, we keep the core aggregation logic identical, which helps us clearly understand how Beam's event-time model, windows, and panes behave without relying on external streaming infrastructure. Check out the FULL CODES here.

!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

We install the required dependencies and ensure version compatibility so that Apache Beam imports cleanly. We import the core Beam APIs, including the windowing, trigger, and TestStream utilities needed later in the pipeline. We also bring in standard Python modules for time handling and JSON formatting. Check out the FULL CODES here.
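As a quick, optional sanity check after the installs, we can confirm which Beam release the session actually imported. This is only a sketch; any reasonably recent 2.x release should work for this demo.

# Optional sanity check: print the imported Beam version so we know the
# freshly installed release (not a stale one) is active in this session.
print("Apache Beam version:", beam.__version__)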

MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def make_event(user_id, event_type, amount, event_time_epoch_s):
   return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}


base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())


BATCH_EVENTS = [
   make_event("u1", "purchase", 20, t0 + 5),
   make_event("u1", "purchase", 15, t0 + 20),
   make_event("u2", "purchase",  8, t0 + 35),
   make_event("u1", "refund",   -5, t0 + 62),
   make_event("u2", "purchase", 12, t0 + 70),
   make_event("u3", "purchase",  9, t0 + 75),
   make_event("u2", "purchase",  3, t0 + 50),
]

We define the global configuration that controls window size, allowed lateness, and execution mode. We create synthetic events with explicit event-time timestamps so that the windowing behavior is deterministic and easy to reason about. We prepare a small dataset that deliberately includes out-of-order and late events so we can observe Beam's event-time semantics, as the quick sketch below illustrates. Check out the FULL CODES here.
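To see which fixed window each synthetic event lands in, we can replicate Beam's default window assignment with plain arithmetic. This is a minimal sketch, assuming the default FixedWindows offset of 0 (windows align to the Unix epoch, not to t0), and it reuses BATCH_EVENTS, t0, and WINDOW_SIZE_SECS defined above.

# A minimal sketch (plain Python, no pipeline): replicate the default
# FixedWindows assignment, start = timestamp - (timestamp % size).
for e in BATCH_EVENTS:
    start = e["event_time"] - (e["event_time"] % WINDOW_SIZE_SECS)
    end = start + WINDOW_SIZE_SECS
    print(f"t0+{e['event_time'] - t0:>2}s  {e['user_id']}  window [{start}, {end})")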

def format_joined_record(kv):
   user_id, d = kv
   return {
       "user_id": user_id,
       "depend": int(d["count"][0]) if d["count"] else 0,
       "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
   }


class WindowedUserAgg(beam.PTransform):
   def expand(self, pcoll):
       stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
       windowed = stamped | beam.WindowInto(
           FixedWindows(WINDOW_SIZE_SECS),
           allowed_lateness=ALLOWED_LATENESS_SECS,
           trigger=AfterWatermark(
               early=AfterProcessingTime(10),
               late=AfterProcessingTime(10),
           ),
           accumulation_mode=AccumulationMode.ACCUMULATING,
       )
       keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
       counts = keyed | beam.combiners.Count.PerKey()
       sums = keyed | beam.CombinePerKey(sum)
       return (
           {"depend": counts, "sum_amount": sums}
           | beam.CoGroupByKey()
           | beam.Map(format_joined_record)
       )

We build a reusable Beam PTransform that encapsulates all of the windowed aggregation logic. We apply fixed windows, triggers, and accumulation rules, then group events by user and compute counts and sums. We keep this transform independent of the data source, so the same logic applies to both batch and streaming inputs. Check out the FULL CODES here.
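Before running the pipeline, it can help to sanity-check the shape that CoGroupByKey hands to format_joined_record. The sketch below is plain Python with a hypothetical grouped value; it only exercises the formatting function defined earlier.

# A plain-Python check of format_joined_record with a hypothetical CoGroupByKey
# result: each key maps to a dict of lists, one list per joined input.
sample_kv = ("u1", {"count": [2], "sum_amount": [35.0]})
print(format_joined_record(sample_kv))
# Expected output: {'user_id': 'u1', 'count': 2, 'sum_amount': 35.0}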

class AddWindowInfo(beam.DoFn):
   def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
       ws = float(window.start)
       we = float(window.end)
       yield {
           **element,
           "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
           "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
           "pane_timing": str(pane_info.timing),
           "pane_is_first": pane_info.is_first,
           "pane_is_last": pane_info.is_last,
       }


def build_test_stream():
   return (
       TestStream()
       .advance_watermark_to(t0)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
           beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
           beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
       ])
       .advance_processing_time(5)
       .advance_watermark_to(t0 + 61)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
           beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
           beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
       ])
       .advance_processing_time(5)
       .add_elements([
           beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
       ])
       .advance_watermark_to(t0 + 121)
       .advance_watermark_to_infinity()
   )

We enrich each aggregated record with window and pane metadata so we can clearly see when and why results are emitted. We convert Beam's internal timestamps into human-readable UTC times for readability. We also define a TestStream that simulates real streaming behavior using watermarks, processing-time advances, and late data. Check out the FULL CODES here.
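Because pane_info.timing surfaces as an integer, the printed records show numeric codes. The sketch below maps those codes to readable names; it assumes the PaneInfoTiming constants exposed in apache_beam.utils.windowed_value, and the substitution into AddWindowInfo is shown only as a commented suggestion.

# A hedged helper sketch: map pane timing codes to names for readability.
from apache_beam.utils.windowed_value import PaneInfoTiming

TIMING_NAMES = {
    PaneInfoTiming.EARLY: "EARLY",      # fired before the watermark passed the window end
    PaneInfoTiming.ON_TIME: "ON_TIME",  # fired when the watermark passed the window end
    PaneInfoTiming.LATE: "LATE",        # fired for data arriving after the on-time pane
    PaneInfoTiming.UNKNOWN: "UNKNOWN",
}

# Example: inside AddWindowInfo one could emit
#   "pane_timing": TIMING_NAMES.get(pane_info.timing, str(pane_info.timing))
# instead of str(pane_info.timing).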

def run_batch():
   with beam.Pipeline(options=PipelineOptions([])) as p:
       (
           p
           | beam.Create(BATCH_EVENTS)
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


def run_stream():
   opts = PipelineOptions([])
   opts.view_as(StandardOptions).streaming = True
   with beam.Pipeline(options=opts) as p:
       (
           p
           | build_test_stream()
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


run_stream() if MODE == "stream" else run_batch()

We wire everything together into executable batch and stream-like pipelines. We toggle between modes by changing a single flag while reusing the same aggregation transform. We run the pipeline and print the windowed results directly, making the execution flow and the outputs easy to inspect.
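To compare both modes in the same session, we can flip the flag and re-run. This is a minimal sketch reusing the definitions above; the batch run typically collapses each window into a single pane, while the stream-like run can emit early, on-time, and late panes.

# A minimal sketch reusing the definitions above: the batch run reads
# BATCH_EVENTS via beam.Create and flows through the same WindowedUserAgg
# transform, typically emitting one ON_TIME pane per window.
MODE = "batch"
run_batch()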

In conclusion, we demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. We observed how watermarks, triggers, and accumulation modes influence when results are emitted and how late data updates previously computed windows. We also focused on the conceptual foundations of Beam's unified model, providing a solid base for later scaling the same design to real streaming runners and production environments.





Michal Sutter is a data science professional with a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michal excels at transforming complex datasets into actionable insights.
