HomeSample Page

Sample Page Title


On this tutorial, we construct a hands-on comparability between a synchronous RPC-based system and an asynchronous event-driven structure to grasp how actual distributed programs behave underneath load and failure. We simulate downstream providers with variable latency, overload situations, and transient errors, after which drive each architectures utilizing bursty site visitors patterns. By observing metrics similar to tail latency, retries, failures, and dead-letter queues, we look at how tight RPC coupling amplifies failures and the way asynchronous event-driven designs commerce rapid consistency for resilience. All through the tutorial, we deal with sensible mechanisms, retries, exponential backoff, circuit breakers, bulkheads, and queues that engineers use to regulate cascading failures in manufacturing programs. Take a look at the FULL CODES right here.

import asyncio, random, time, math, statistics
from dataclasses import dataclass, discipline
from collections import deque


def now_ms():
   return time.perf_counter() * 1000.0


def pctl(xs, p):
   if not xs:
       return None
   xs2 = sorted(xs)
   okay = (len(xs2) - 1) * p
   f = math.ground(okay)
   c = math.ceil(okay)
   if f == c:
       return xs2[int(k)]
   return xs2[f] + (xs2[c] - xs2[f]) * (okay - f)


@dataclass
class Stats:
   latencies_ms: record = discipline(default_factory=record)
   okay: int = 0
   fail: int = 0
   dropped: int = 0
   retries: int = 0
   timeouts: int = 0
   cb_open: int = 0
   dlq: int = 0


   def abstract(self, title):
       l = self.latencies_ms
       return {
           "title": title,
           "okay": self.okay,
           "fail": self.fail,
           "dropped": self.dropped,
           "retries": self.retries,
           "timeouts": self.timeouts,
           "cb_open": self.cb_open,
           "dlq": self.dlq,
           "lat_p50_ms": spherical(pctl(l, 0.50), 2) if l else None,
           "lat_p95_ms": spherical(pctl(l, 0.95), 2) if l else None,
           "lat_p99_ms": spherical(pctl(l, 0.99), 2) if l else None,
           "lat_mean_ms": spherical(statistics.imply(l), 2) if l else None,
       }

We outline the core utilities and information constructions used all through the tutorial. We set up timing helpers, percentile calculations, and a unified metrics container to trace latency, retries, failures, and tail habits. It provides us a constant strategy to measure and evaluate RPC and event-driven executions. Take a look at the FULL CODES right here.

@dataclass
class FailureModel:
   base_latency_ms: float = 8.0
   jitter_ms: float = 6.0
   fail_prob: float = 0.05
   overload_fail_prob: float = 0.40
   overload_latency_ms: float = 50.0


   def pattern(self, load_factor: float):
       base = self.base_latency_ms + random.random() * self.jitter_ms
       if load_factor > 1.0:
           base += (load_factor - 1.0) * self.overload_latency_ms
           fail_p = min(0.95, self.fail_prob + (load_factor - 1.0) * self.overload_fail_prob)
       else:
           fail_p = self.fail_prob
       return base, (random.random() < fail_p)


class CircuitBreaker:
   def __init__(self, fail_threshold=8, window=20, open_ms=500):
       self.fail_threshold = fail_threshold
       self.window = window
       self.open_ms = open_ms
       self.occasions = deque(maxlen=window)
       self.open_until_ms = 0.0


   def permit(self):
       return now_ms() >= self.open_until_ms


   def file(self, okay: bool):
       self.occasions.append(not okay)
       if len(self.occasions) >= self.window and sum(self.occasions) >= self.fail_threshold:
           self.open_until_ms = now_ms() + self.open_ms


class Bulkhead:
   def __init__(self, restrict):
       self.sem = asyncio.Semaphore(restrict)


   async def __aenter__(self):
       await self.sem.purchase()


   async def __aexit__(self, exc_type, exc, tb):
       self.sem.launch()


def exp_backoff(try, base_ms=20, cap_ms=400):
   return random.random() * min(cap_ms, base_ms * (2 ** (try - 1)))

We mannequin failure habits and resilience primitives that form system stability. We simulate overload-sensitive latency and failures, and we introduce circuit breakers, bulkheads, and exponential backoff to regulate cascading results. These parts allow us to experiment with secure versus unsafe distributed-system configurations. Take a look at the FULL CODES right here.

class DownstreamService:
   def __init__(self, fm: FailureModel, capacity_rps=250):
       self.fm = fm
       self.capacity_rps = capacity_rps
       self._inflight = 0


   async def deal with(self, payload: dict):
       self._inflight += 1
       attempt:
           load_factor = max(0.5, self._inflight / (self.capacity_rps / 10))
           lat, should_fail = self.fm.pattern(load_factor)
           await asyncio.sleep(lat / 1000.0)
           if should_fail:
               increase RuntimeError("downstream_error")
           return {"standing": "okay"}
       lastly:
           self._inflight -= 1


async def rpc_call(
   svc,
   req,
   stats,
   timeout_ms=120,
   max_retries=0,
   cb=None,
   bulkhead=None,
):
   t0 = now_ms()
   if cb and never cb.permit():
       stats.cb_open += 1
       stats.fail += 1
       return False


   try = 0
   whereas True:
       try += 1
       attempt:
           if bulkhead:
               async with bulkhead:
                   await asyncio.wait_for(svc.deal with(req), timeout=timeout_ms / 1000.0)
           else:
               await asyncio.wait_for(svc.deal with(req), timeout=timeout_ms / 1000.0)
           stats.latencies_ms.append(now_ms() - t0)
           stats.okay += 1
           if cb: cb.file(True)
           return True
       besides asyncio.TimeoutError:
           stats.timeouts += 1
       besides Exception:
           cross
       stats.fail += 1
       if cb: cb.file(False)
       if try <= max_retries:
           stats.retries += 1
           await asyncio.sleep(exp_backoff(try) / 1000.0)
           proceed
       return False

We implement the synchronous RPC path and its interplay with downstream providers. We observe how timeouts, retries, and in-flight load straight have an effect on latency and failure propagation. It additionally highlights how tight coupling in RPC can amplify transient points underneath bursty site visitors. Take a look at the FULL CODES right here.

@dataclass
class Occasion:
   id: int
   tries: int = 0


class EventBus:
   def __init__(self, max_queue=5000):
       self.q = asyncio.Queue(maxsize=max_queue)


   async def publish(self, e: Occasion):
       attempt:
           self.q.put_nowait(e)
           return True
       besides asyncio.QueueFull:
           return False


async def event_consumer(
   bus,
   svc,
   stats,
   cease,
   max_retries=0,
   dlq=None,
   bulkhead=None,
   timeout_ms=200,
):
   whereas not cease.is_set() or not bus.q.empty():
       attempt:
           e = await asyncio.wait_for(bus.q.get(), timeout=0.2)
       besides asyncio.TimeoutError:
           proceed


       t0 = now_ms()
       e.tries += 1
       attempt:
           if bulkhead:
               async with bulkhead:
                   await asyncio.wait_for(svc.deal with({"id": e.id}), timeout=timeout_ms / 1000.0)
           else:
               await asyncio.wait_for(svc.deal with({"id": e.id}), timeout=timeout_ms / 1000.0)
           stats.okay += 1
           stats.latencies_ms.append(now_ms() - t0)
       besides Exception:
           stats.fail += 1
           if e.tries <= max_retries:
               stats.retries += 1
               await asyncio.sleep(exp_backoff(e.tries) / 1000.0)
               await bus.publish(e)
           else:
               stats.dlq += 1
               if dlq is just not None:
                   dlq.append(e)
       lastly:
           bus.q.task_done()

We construct the asynchronous event-driven pipeline utilizing a queue and background customers. We course of occasions independently of request submission, apply retry logic, and route unrecoverable messages to a dead-letter queue. It demonstrates how decoupling improves resilience whereas introducing new operational concerns. Take a look at the FULL CODES right here.

async def generate_requests(complete=2000, burst=350, gap_ms=80):
   reqs = []
   rid = 0
   whereas rid < complete:
       n = min(burst, complete - rid)
       for _ in vary(n):
           reqs.append(rid)
           rid += 1
       await asyncio.sleep(gap_ms / 1000.0)
   return reqs


async def foremost():
   random.seed(7)
   fm = FailureModel()
   svc = DownstreamService(fm)
   ids = await generate_requests()


   rpc_stats = Stats()
   cb = CircuitBreaker()
   bulk = Bulkhead(40)


   await asyncio.collect(*[
       rpc_call(svc, {"id": i}, rpc_stats, max_retries=3, cb=cb, bulkhead=bulk)
       for i in ids
   ])


   bus = EventBus()
   ev_stats = Stats()
   cease = asyncio.Occasion()
   dlq = []


   customers = [
       asyncio.create_task(event_consumer(bus, svc, ev_stats, stop, max_retries=3, dlq=dlq))
       for _ in range(16)
   ]


   for i in ids:
       await bus.publish(Occasion(i))


   await bus.q.be part of()
   cease.set()
   for c in customers:
       c.cancel()


   print(rpc_stats.abstract("RPC"))
   print(ev_stats.abstract("EventDriven"))
   print("DLQ dimension:", len(dlq))


await foremost()

We drive each architectures with bursty workloads and orchestrate the complete experiment. We acquire metrics, cleanly terminate customers, and evaluate outcomes throughout RPC and event-driven executions. The ultimate step ties collectively latency, throughput, and failure habits right into a coherent system-level comparability.

In conclusion, we clearly noticed the trade-offs between RPC and event-driven architectures in distributed programs. We noticed that RPC presents decrease latency when dependencies are wholesome however turns into fragile underneath saturation, the place retries and timeouts rapidly cascade into system-wide failures. In distinction, the event-driven method decouples producers from customers, absorbs bursts by means of buffering, and localizes failures, however requires cautious dealing with of retries, backpressure, and dead-letter queues to keep away from hidden overload and unbounded queues. By this tutorial, we demonstrated that resilience in distributed programs doesn’t come from selecting a single structure, however from combining the suitable communication mannequin with disciplined failure-handling patterns and capacity-aware design.


Take a look at the FULL CODES right here. Additionally, be happy to observe us on Twitter and don’t neglect to hitch our 100k+ ML SubReddit and Subscribe to our E-newsletter. Wait! are you on telegram? now you possibly can be part of us on telegram as nicely.


Michal Sutter is an information science skilled with a Grasp of Science in Information Science from the College of Padova. With a stable basis in statistical evaluation, machine studying, and information engineering, Michal excels at reworking complicated datasets into actionable insights.

Related Articles

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Latest Articles