
In this tutorial, we implement an end-to-end Practical Byzantine Fault Tolerance (PBFT) simulator using asyncio. We model a realistic distributed network with asynchronous message passing, configurable delays, and Byzantine nodes that deliberately deviate from the protocol. By explicitly implementing the pre-prepare, prepare, and commit phases, we explore how PBFT achieves consensus under adversarial conditions while respecting the theoretical n ≥ 3f + 1 bound. We also instrument the system to measure consensus latency and success rate as the number of malicious nodes increases, allowing us to observe the limits of Byzantine fault tolerance empirically.
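As a quick sanity check of the n ≥ 3f + 1 bound mentioned above, here is a minimal standalone sketch (the helper name `max_faulty` is ours, not part of the simulator below) computing the maximum tolerable number of Byzantine nodes for a few cluster sizes:

```python
# Sketch: the classic PBFT resilience bound n >= 3f + 1 means a cluster of
# n replicas tolerates at most f = floor((n - 1) / 3) Byzantine nodes.

def max_faulty(n: int) -> int:
    """Maximum number of Byzantine replicas tolerated by n replicas."""
    return (n - 1) // 3

for n in (4, 7, 10, 13):
    print(f"n={n:2d} -> f={max_faulty(n)}")
```

Note that for n = 10 (the cluster size used later in this tutorial) the bound gives f = 3.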

import asyncio
import random
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple, Optional, List
import matplotlib.pyplot as plt


PREPREPARE = "PREPREPARE"
PREPARE    = "PREPARE"
COMMIT     = "COMMIT"


@dataclass(frozen=True)
class Msg:
    typ: str
    view: int
    seq: int
    digest: str
    sender: int


@dataclass
class NetConfig:
    min_delay_ms: int = 5
    max_delay_ms: int = 40
    drop_prob: float = 0.0
    reorder_prob: float = 0.0

We establish the simulator's foundation by importing the required libraries and defining the core PBFT message types. We formalize network messages and parameters as dataclasses to ensure structured, consistent communication, and we define constants representing the three PBFT phases used throughout the system.
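Because `Msg` is a frozen dataclass, two messages with identical fields compare equal and are hashable, which is what lets the vote-tracking sets later in the tutorial deduplicate votes. A standalone sketch (re-declaring the same dataclass for self-containment) illustrating this:

```python
import hashlib
from dataclasses import dataclass

@dataclass(frozen=True)
class Msg:
    typ: str
    view: int
    seq: int
    digest: str
    sender: int

dig = hashlib.sha256(b"tx: pay Alice->Bob 5").hexdigest()
m1 = Msg("PREPARE", 0, 1, dig, sender=3)
m2 = Msg("PREPARE", 0, 1, dig, sender=3)

print(m1 == m2)       # value equality: True
print(len({m1, m2}))  # frozen => hashable, deduplicated in a set: 1
```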

class Network:
    def __init__(self, cfg: NetConfig):
        self.cfg = cfg
        self.nodes: Dict[int, "Node"] = {}

    def register(self, node: "Node"):
        self.nodes[node.nid] = node

    async def send(self, dst: int, msg: Msg):
        if random.random() < self.cfg.drop_prob:
            return

        d = random.uniform(self.cfg.min_delay_ms, self.cfg.max_delay_ms) / 1000.0
        await asyncio.sleep(d)

        if random.random() < self.cfg.reorder_prob:
            await asyncio.sleep(random.uniform(0.0, 0.02))

        await self.nodes[dst].inbox.put(msg)

    async def broadcast(self, src: int, msg: Msg):
        tasks = []
        for nid in self.nodes.keys():
            tasks.append(asyncio.create_task(self.send(nid, msg)))
        await asyncio.gather(*tasks)

We implement an asynchronous network layer that simulates real-world message delivery with delays, reordering, and occasional drops. We register nodes dynamically and use asyncio tasks to broadcast messages across the simulated network. This non-deterministic communication behavior directly affects consensus latency and robustness.
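The delivery model reduces to a simple pattern: each send sleeps for a random delay (possibly dropping the message) and then pushes onto the receiver's `asyncio.Queue`. A self-contained toy version of that pattern (function names here are illustrative, not the simulator's API):

```python
import asyncio
import random

async def send(queue: asyncio.Queue, msg, min_d=0.001, max_d=0.005, drop_prob=0.0):
    # Simulate loss, then latency, then enqueue at the receiver.
    if random.random() < drop_prob:
        return
    await asyncio.sleep(random.uniform(min_d, max_d))
    await queue.put(msg)

async def demo():
    q = asyncio.Queue()
    # Fan out five concurrent sends, as a broadcast would.
    await asyncio.gather(*(send(q, i) for i in range(5)))
    got = []
    while not q.empty():
        got.append(q.get_nowait())
    return got

received = asyncio.run(demo())
print(sorted(received))  # [0, 1, 2, 3, 4] (arrival order may differ)
```

With `drop_prob=0.0` every message arrives, but arrival order depends on the random delays, which is exactly the reordering the consensus protocol must tolerate.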

@dataclass
class NodeConfig:
    n: int
    f: int
    primary_id: int = 0
    view: int = 0
    timeout_s: float = 2.0


class Node:
    def __init__(self, nid: int, net: Network, cfg: NodeConfig, byzantine: bool = False):
        self.nid = nid
        self.net = net
        self.cfg = cfg
        self.byzantine = byzantine

        self.inbox: asyncio.Queue[Msg] = asyncio.Queue()

        self.preprepare_seen: Dict[int, str] = {}
        self.prepare_votes: Dict[Tuple[int, str], Set[int]] = {}
        self.commit_votes: Dict[Tuple[int, str], Set[int]] = {}

        self.committed: Dict[int, str] = {}
        self.running = True

    @property
    def f(self) -> int:
        return self.cfg.f

    def _q_prepare(self) -> int:
        return 2 * self.f + 1

    def _q_commit(self) -> int:
        return 2 * self.f + 1

    @staticmethod
    def digest_of(payload: str) -> str:
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

We define the configuration and internal state of each PBFT node participating in the protocol. We initialize data structures for tracking pre-prepare, prepare, and commit votes, supporting both honest and Byzantine behavior. We also implement the quorum threshold logic and deterministic digest generation used for request validation.
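The prepare and commit quorums are both 2f + 1, which in an n = 3f + 1 cluster guarantees that any quorum contains at least f + 1 honest nodes and that any two quorums intersect in at least one honest node. A small sketch tabulating these values (the `quorum` helper is ours):

```python
# Sketch: quorum sizes for n = 3f + 1 clusters. Any two 2f+1 quorums
# overlap in at least f + 1 nodes, of which at least one is honest.
def quorum(f: int) -> int:
    return 2 * f + 1

for f in range(1, 5):
    n = 3 * f + 1
    overlap = 2 * quorum(f) - n  # minimum intersection of two quorums
    print(f"n={n:2d}  f={f}  quorum={quorum(f)}  min_overlap={overlap}")
```

The minimum overlap of two quorums is 2(2f + 1) - n = f + 1, so at least one honest node sits in both, which is the core of PBFT's safety argument.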

    async def propose(self, payload: str, seq: int):
        if self.nid != self.cfg.primary_id:
            raise ValueError("Only the primary can propose in this simplified simulator.")

        if not self.byzantine:
            dig = self.digest_of(payload)
            msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, msg)
            return

        # Byzantine primary equivocates: a different digest for every peer.
        for dst in self.net.nodes.keys():
            variant = f"{payload}::to={dst}::salt={random.randint(0, 10**9)}"
            dig = self.digest_of(variant)
            msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
            await self.net.send(dst, msg)

    async def handle_preprepare(self, msg: Msg):
        seq = msg.seq
        dig = msg.digest

        if self.byzantine:
            if random.random() < 0.5:
                return
            fake_dig = dig if random.random() < 0.5 else self.digest_of(dig + "::fake")
            out = Msg(PREPARE, msg.view, seq, fake_dig, self.nid)
            await self.net.broadcast(self.nid, out)
            return

        if seq not in self.preprepare_seen:
            self.preprepare_seen[seq] = dig
            out = Msg(PREPARE, msg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, out)

    async def handle_prepare(self, msg: Msg):
        seq, dig = msg.seq, msg.digest
        key = (seq, dig)
        voters = self.prepare_votes.setdefault(key, set())
        voters.add(msg.sender)

        if self.byzantine:
            return

        if self.preprepare_seen.get(seq) != dig:
            return

        if len(voters) >= self._q_prepare():
            out = Msg(COMMIT, msg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, out)

    async def handle_commit(self, msg: Msg):
        seq, dig = msg.seq, msg.digest
        key = (seq, dig)
        voters = self.commit_votes.setdefault(key, set())
        voters.add(msg.sender)

        if self.byzantine:
            return

        if self.preprepare_seen.get(seq) != dig:
            return

        if seq in self.committed:
            return

        if len(voters) >= self._q_commit():
            self.committed[seq] = dig

We implement the core PBFT protocol logic, including proposal handling and the pre-prepare and prepare phases. We explicitly model Byzantine equivocation by allowing malicious nodes to send conflicting digests to different peers. We advance the protocol to the commit phase once the required prepare quorum is reached.
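To see why equivocation alone cannot produce two conflicting quorums, consider a toy count with n = 10 and f = 3 (the numbers are illustrative): even if all Byzantine nodes vote for both digests, the honest votes split between the two digests cannot give each one 2f + 1 = 7 supporters.

```python
# Toy illustration: n = 10, f = 3, quorum = 7, honest = 7.
# Try every split of the honest votes between two digests and check
# whether both digests can reach a quorum (Byzantine nodes vote twice).
n, f = 10, 3
quorum = 2 * f + 1
honest = n - f

both_quorums_possible = False
for h1 in range(honest + 1):
    h2 = honest - h1
    if h1 + f >= quorum and h2 + f >= quorum:
        both_quorums_possible = True

print(both_quorums_possible)  # False
```

Each digest would need at least quorum - f = 4 honest votes, so two quorums would need 8 honest nodes, one more than exist: safety holds even under full equivocation.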

    async def run(self):
        while self.running:
            msg = await self.inbox.get()
            if msg.typ == PREPREPARE:
                await self.handle_preprepare(msg)
            elif msg.typ == PREPARE:
                await self.handle_prepare(msg)
            elif msg.typ == COMMIT:
                await self.handle_commit(msg)

    def stop(self):
        self.running = False


def pbft_params(n: int) -> int:
    return (n - 1) // 3


async def run_single_consensus(
    n: int,
    malicious: int,
    net_cfg: NetConfig,
    payload: str = "tx: pay Alice->Bob 5",
    seq: int = 1,
    timeout_s: float = 2.0,
    seed: Optional[int] = None
) -> Dict[str, object]:
    if seed is not None:
        random.seed(seed)

    f_max = pbft_params(n)
    f = f_max

    net = Network(net_cfg)
    cfg = NodeConfig(n=n, f=f, primary_id=0, view=0, timeout_s=timeout_s)

    mal_set = set(random.sample(range(n), k=min(malicious, n)))
    nodes: List[Node] = []
    for i in range(n):
        node = Node(i, net, cfg, byzantine=(i in mal_set))
        net.register(node)
        nodes.append(node)

    tasks = [asyncio.create_task(node.run()) for node in nodes]

    t0 = time.perf_counter()
    await nodes[cfg.primary_id].propose(payload, seq)

    honest = [node for node in nodes if not node.byzantine]
    target = max(1, len(honest))

    committed_honest = 0
    latency = None

    async def poll_commits():
        nonlocal committed_honest, latency
        while True:
            committed_honest = sum(1 for node in honest if seq in node.committed)
            if committed_honest >= target:
                latency = time.perf_counter() - t0
                return
            await asyncio.sleep(0.005)

    try:
        await asyncio.wait_for(poll_commits(), timeout=timeout_s)
        success = True
    except asyncio.TimeoutError:
        success = False
        latency = None

    for node in nodes:
        node.stop()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    digest_set = set(node.committed.get(seq) for node in honest if seq in node.committed)
    agreed = (len(digest_set) == 1) if success else False

    return {
        "n": n,
        "f": f,
        "malicious": malicious,
        "mal_set": mal_set,
        "success": success,
        "latency_s": latency,
        "honest_committed": committed_honest,
        "honest_total": len(honest),
        "agreed_digest": agreed,
    }

We complete the PBFT state machine by processing commit messages and finalizing decisions once commit quorums are satisfied. We run the node event loop to continuously process incoming messages asynchronously, and we include lifecycle controls to safely stop nodes after each experiment run.
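The shutdown path relies on a standard asyncio pattern: cancel each worker task, then `gather` with `return_exceptions=True` so the resulting `CancelledError` is collected rather than raised. A self-contained sketch of just that pattern (names here are illustrative):

```python
import asyncio

async def worker(q: asyncio.Queue):
    # Blocks forever on the queue, like a node's run() loop with an empty inbox.
    while True:
        await q.get()

async def shutdown_demo() -> bool:
    q = asyncio.Queue()
    task = asyncio.create_task(worker(q))
    await asyncio.sleep(0.01)      # let the worker start and block
    task.cancel()
    # return_exceptions=True turns the cancellation into a return value.
    results = await asyncio.gather(task, return_exceptions=True)
    return isinstance(results[0], asyncio.CancelledError)

ok = asyncio.run(shutdown_demo())
print(ok)  # True
```

Without `return_exceptions=True`, the `gather` itself would raise and the cleanup loop would abort on the first cancelled node.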

async def latency_sweep(
    n: int = 10,
    max_malicious: Optional[int] = None,
    trials_per_point: int = 5,
    timeout_s: float = 2.0,
    net_cfg: Optional[NetConfig] = None,
    seed: int = 7
):
    if net_cfg is None:
        net_cfg = NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05)

    if max_malicious is None:
        max_malicious = n

    results = []
    random.seed(seed)

    for m in range(0, max_malicious + 1):
        latencies = []
        successes = 0
        agreements = 0

        for t in range(trials_per_point):
            out = await run_single_consensus(
                n=n,
                malicious=m,
                net_cfg=net_cfg,
                timeout_s=timeout_s,
                seed=seed + 1000 * m + t
            )
            results.append(out)
            if out["success"]:
                successes += 1
                latencies.append(out["latency_s"])
                if out["agreed_digest"]:
                    agreements += 1

        avg_lat = sum(latencies) / len(latencies) if latencies else None
        print(
            f"malicious={m:2d} | success={successes}/{trials_per_point} "
            f"| avg_latency={avg_lat if avg_lat is not None else 'NA'} "
            f"| digest_agreement={agreements}/{successes if successes else 1}"
        )

    return results


def plot_latency(results: List[Dict[str, object]], trials_per_point: int):
    by_m = {}
    for r in results:
        m = r["malicious"]
        by_m.setdefault(m, []).append(r)

    xs, ys = [], []
    success_rate = []
    for m in sorted(by_m.keys()):
        group = by_m[m]
        lats = [g["latency_s"] for g in group if g["latency_s"] is not None]
        succ = sum(1 for g in group if g["success"])
        xs.append(m)
        ys.append(sum(lats) / len(lats) if lats else float("nan"))
        success_rate.append(succ / len(group))

    plt.figure()
    plt.plot(xs, ys, marker="o")
    plt.xlabel("Number of malicious (Byzantine) nodes")
    plt.ylabel("Consensus latency (seconds), avg over successes")
    plt.title("PBFT Simulator: Latency vs Malicious Nodes")
    plt.grid(True)
    plt.show()

    plt.figure()
    plt.plot(xs, success_rate, marker="o")
    plt.xlabel("Number of malicious (Byzantine) nodes")
    plt.ylabel("Success rate")
    plt.title("PBFT Simulator: Success Rate vs Malicious Nodes")
    plt.ylim(-0.05, 1.05)
    plt.grid(True)
    plt.show()


async def main():
    n = 10
    trials = 6
    f = pbft_params(n)
    print(f"n={n} => PBFT theoretical max f = floor((n-1)/3) = {f}")
    print("Theory: safety/liveness typically assumed when malicious <= f and timing assumptions hold.\n")

    results = await latency_sweep(
        n=n,
        max_malicious=min(n, f + 6),
        trials_per_point=trials,
        timeout_s=2.0,
        net_cfg=NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05),
        seed=11
    )
    plot_latency(results, trials)


# Top-level await works in a notebook; in a plain script, use asyncio.run(main()).
await main()

We orchestrate the experiments by sweeping across different numbers of malicious nodes and collecting latency statistics. We aggregate the results to analyze consensus success rates and visualize system behavior with plots. We then run the full experiment pipeline and observe how PBFT degrades as the number of Byzantine faults approaches and exceeds the theoretical limit.
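The aggregation step itself is simple: group trial records by malicious count and average latency over successful trials only, so timeouts do not drag the mean. A standalone sketch with hand-made illustrative records (the field names match the simulator's result dict; the numbers are fabricated for demonstration):

```python
# Sketch of the aggregation used by plot_latency: group by malicious count,
# then average latency over successful trials only.
results = [
    {"malicious": 0, "success": True,  "latency_s": 0.10},
    {"malicious": 0, "success": True,  "latency_s": 0.12},
    {"malicious": 4, "success": False, "latency_s": None},  # timed out
]

by_m = {}
for r in results:
    by_m.setdefault(r["malicious"], []).append(r)

avg = {
    m: sum(g["latency_s"] for g in grp if g["success"])
       / max(1, sum(1 for g in grp if g["success"]))
    for m, grp in by_m.items()
}
print(avg)
```

Keeping failed trials out of the latency average (but in the success rate) is what makes the two plots tell different stories as the fault bound is crossed.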

In conclusion, we gained hands-on insight into how PBFT behaves beyond textbook guarantees and how adversarial pressure affects both latency and liveness in practice. We observed how quorum thresholds enforce safety, why consensus breaks down once Byzantine nodes exceed the tolerated bound, and how asynchronous networks amplify these effects. This implementation provides a practical foundation for experimenting with more advanced distributed-systems ideas, such as view changes, leader rotation, or authenticated messaging, and builds intuition for the design trade-offs that underpin modern blockchain and distributed-trust systems.



