
In this tutorial, we build a fully functional event-driven workflow using Kombu, treating messaging as a core architectural capability. We walk step by step through setting up exchanges, routing keys, background workers, and concurrent producers, which lets us observe a real distributed system in action. As we implement each component, we see how clean message flow, asynchronous processing, and routing patterns give us the same power that production microservices rely on every day. Check out the FULL CODES.

!pip install kombu


import threading
import time
import logging
import uuid
import datetime
import sys


from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin


logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True
)
logger = logging.getLogger(__name__)


BROKER_URL = "memory://localhost/"

We begin by installing Kombu, importing dependencies, and configuring logging so we can clearly see every message flowing through the system. We also set the in-memory broker URL, which lets us run everything locally in Colab without needing RabbitMQ. This setup forms the foundation of our distributed messaging workflow.
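Because Kombu abstracts the transport layer, the same code can later point at a real broker simply by changing the connection URL. The sketch below lists a few common transport URLs; the hostnames and credentials are illustrative placeholders, not part of this tutorial's code:

```python
# Illustrative transport URLs for kombu.Connection.
# Hostnames, ports, and credentials here are placeholders.
BROKERS = {
    'memory':   'memory://localhost/',                # in-process, ideal for notebooks and tests
    'rabbitmq': 'amqp://guest:guest@localhost:5672//',  # a real AMQP broker
    'redis':    'redis://localhost:6379/0',             # Redis-backed transport
}
```

Swapping `BROKER_URL` for one of these is the only change needed to move the tutorial off the in-memory transport.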

media_exchange = Exchange('media_exchange', type='topic', durable=True)


task_queues = [
   Queue('video_queue', media_exchange, routing_key='video.#'),
   Queue('audit_queue', media_exchange, routing_key='#'),
]

We define a topic exchange so we can flexibly route messages using wildcard patterns. We also create two queues: one dedicated to video-related tasks and an audit queue that listens to everything. Using topic routing, we can precisely control how messages flow through the system.
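To make the wildcard semantics concrete, here is a plain-Python sketch of how AMQP-style topic patterns match routing keys: `*` matches exactly one dot-separated word, while `#` matches zero or more. The `topic_matches` helper is our own illustration of the semantics, not Kombu's internal matcher:

```python
def topic_matches(pattern, routing_key):
    """Illustrative AMQP-style topic matching:
    '*' matches exactly one dot-separated word, '#' matches zero or more."""
    def match(p, k):
        if not p:
            return not k              # both exhausted -> match
        head, rest = p[0], p[1:]
        if head == '#':
            # '#' may consume zero or more words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == '*' or head == k[0]:
            return match(rest, k[1:])
        return False
    return match(pattern.split('.'), routing_key.split('.'))
```

This explains the queue bindings above: `video.#` catches only `video.upload`, while `#` on the audit queue catches every event.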

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.should_stop = False


    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues=self.queues,
                     callbacks=[self.on_message],
                     accept=['json'],
                     prefetch_count=1)
        ]


    def on_message(self, body, message):
        routing_key = message.delivery_info['routing_key']
        payload_id = body.get('id', 'unknown')


        logger.info(f"\n⚡ RECEIVED MSG via key: [{routing_key}]")
        logger.info(f"   Payload ID: {payload_id}")

        try:
            if 'video' in routing_key:
                self.process_video(body)
            elif 'audit' in routing_key:
                logger.info("   🔍 [Audit] Logging event...")

            message.ack()
            logger.info("   ✅ ACKNOWLEDGED")


        except Exception as e:
            logger.error(f"   ❌ ERROR: {e}")


    def process_video(self, body):
        logger.info("   ⚙️  [Processor] Transcoding video (simulating work...)")
        time.sleep(0.5)

We implement a custom worker using Kombu's ConsumerMixin so it can run in a background thread. In the message callback, we inspect the routing key, invoke the appropriate processing function, and acknowledge the message. This worker architecture gives us clean, concurrent message consumption with full control.
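The try/ack pattern above is what gives us at-least-once delivery: the broker only discards a message after `ack()`, and an unacknowledged or requeued message is eligible for redelivery. Here is a minimal plain-Python sketch of that decision logic, with a hypothetical `StubMessage` standing in for Kombu's message object:

```python
class StubMessage:
    """Hypothetical stand-in for a broker message, tracking ack/requeue state."""
    def __init__(self, body):
        self.body = body
        self.state = 'pending'

    def ack(self):
        self.state = 'acked'      # broker may now discard the message

    def requeue(self):
        self.state = 'requeued'   # broker will redeliver it later


def handle(message, processor):
    """Process, then ack on success; requeue on failure."""
    try:
        processor(message.body)
        message.ack()
    except Exception:
        message.requeue()         # leave it for another delivery attempt
    return message.state
```

The tutorial's worker only logs errors instead of requeueing, but the shape of the decision is the same.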

def publish_messages(connection):
    producer = Producer(connection)

    tasks = [
        ('video.upload', {'file': 'movie.mp4'}),
        ('user.login', {'user': 'admin'}),
    ]


    logger.info("\n🚀 PRODUCER: Starting to publish messages...")

    for r_key, data in tasks:
        data['id'] = str(uuid.uuid4())[:8]

        logger.info(f"📤 SENDING: {r_key} -> {data}")

        producer.publish(
            data,
            exchange=media_exchange,
            routing_key=r_key,
            serializer="json"
        )
        time.sleep(1.5)


    logger.info("🏁 PRODUCER: Done.")

We now build a producer that sends structured JSON payloads into the exchange with different routing keys. We generate a unique ID for each event and observe how it is routed to the matching queues. This mirrors real-world microservice event publishing, where producers and consumers stay decoupled.
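Under the hood, `serializer="json"` means each payload crosses the broker as JSON bytes. The hypothetical `build_event` helper below sketches that wire format together with the short-ID scheme used above; it is illustrative, not a Kombu API:

```python
import json
import uuid

def build_event(routing_key, data):
    """Hypothetical helper: what a published event looks like once serialized."""
    payload = dict(data, id=str(uuid.uuid4())[:8])   # short 8-char ID, as in the tutorial
    return routing_key, json.dumps(payload).encode('utf-8')

key, body = build_event('video.upload', {'file': 'movie.mp4'})
```

On the consuming side, `accept=['json']` tells Kombu to decode exactly this content type back into a dict before our callback runs.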

def run_example():
    with Connection(BROKER_URL) as conn:
        worker = Worker(conn, task_queues)
        worker_thread = threading.Thread(target=worker.run)
        worker_thread.daemon = True
        worker_thread.start()

        logger.info("✅ SYSTEM: Worker thread started.")
        time.sleep(1)


        try:
            publish_messages(conn)
            time.sleep(2)
        except KeyboardInterrupt:
            pass
        finally:
            worker.should_stop = True
            logger.info("\n👋 SYSTEM: Execution complete.")


if __name__ == "__main__":
    run_example()

We start the worker in a background thread and fire the producer from the main thread. This structure gives us a miniature distributed system running in Colab. By watching the logs, we see messages published → routed → consumed → acknowledged, completing the full event-processing lifecycle.
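To see why this structure works, the same publish → route → consume → acknowledge lifecycle can be sketched with just the standard library, where `queue.Queue` stands in for the broker and `task_done()` plays the role of the acknowledgment. This is a simplified analogue, not the Kombu version:

```python
import queue
import threading

def run_mini_pipeline():
    # queue.Queue stands in for the broker; "routing" is choosing the queue.
    video_q, audit_q = queue.Queue(), queue.Queue()
    processed = []
    lock = threading.Lock()

    def worker(q, name):
        while True:
            msg = q.get()
            if msg is None:           # sentinel: shut this worker down
                q.task_done()
                return
            with lock:
                processed.append((name, msg['id']))
            q.task_done()             # the ack analogue

    threads = [
        threading.Thread(target=worker, args=(video_q, 'video')),
        threading.Thread(target=worker, args=(audit_q, 'audit')),
    ]
    for t in threads:
        t.start()

    # "Publish": route each event to the queue its key selects.
    video_q.put({'id': 'evt-0', 'key': 'video.upload'})
    audit_q.put({'id': 'evt-1', 'key': 'user.login'})

    for q in (video_q, audit_q):
        q.put(None)                   # one sentinel per worker
        q.join()                      # blocks until every put was task_done'd
    for t in threads:
        t.join()
    return processed
```

What Kombu adds on top of this skeleton is exactly what the tutorial exercised: declarative routing via exchanges and patterns, pluggable transports, and serialization.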

In conclusion, we orchestrated a dynamic, distributed task-routing pipeline that processes real-time events with clarity and precision. We saw how Kombu abstracts away the complexity of messaging systems while still giving us fine-grained control over routing, consumption, and worker concurrency. Watching messages move from producer to exchange to queue to worker gives us a deeper appreciation for event-driven system design, and we are now well equipped to scale this foundation into robust microservices, background processors, and enterprise-grade workflows.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among readers.
