A Coding Implementation of a Real-Time In-Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, and Pydantic

In this notebook, we demonstrate how to build a fully in-memory "sensor alert" pipeline in Google Colab using FastStream, a high-performance, Python-native stream processing framework, and its integration with RabbitMQ. By leveraging faststream.rabbit's RabbitBroker and TestRabbitBroker, we simulate a message broker without needing external infrastructure. We orchestrate four distinct stages: ingestion and validation, normalization, monitoring and alert generation, and archiving, each defined with Pydantic models (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python's asyncio powers the asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also use the standard logging module for traceable pipeline execution and pandas for inspecting the final results, making it easy to visualize the archived alerts in a DataFrame.

!pip install -q faststream[rabbit] nest_asyncio

We install FastStream with its RabbitMQ integration, which provides the core stream-processing framework and broker connectors, along with the nest_asyncio package that enables nested asyncio event loops in environments such as Colab. All of this is done while keeping output minimal with the -q flag.

import nest_asyncio, asyncio, logging
nest_asyncio.apply()

We import the nest_asyncio, asyncio, and logging modules and call nest_asyncio.apply() to patch Python's event loop so that nested asynchronous tasks can run inside environments such as Colab or Jupyter notebooks without errors. The logging import lets us instrument the pipeline with detailed runtime logs.

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")

We configure Python's built-in logging to emit INFO-level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named "sensor_pipeline" to emit structured logs throughout the streaming pipeline.

from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List

We bring in FastStream's core FastStream class alongside its RabbitMQ connectors (RabbitBroker for real brokers and TestRabbitBroker for in-memory testing), Pydantic's BaseModel, Field, and validator for declarative data validation, pandas for tabular inspection of the results, and Python's List type for annotating our in-memory archive.

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app    = FastStream(broker)

We instantiate a RabbitBroker pointed at a (local) RabbitMQ server via its AMQP URL, then create a FastStream application bound to that broker, establishing the messaging backbone for the pipeline stages.

class RawSensorData(BaseModel):
    sensor_id: str       = Field(..., examples=["sensor_1"])
    reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])
   
    @validator("sensor_id")
    def must_start_with_sensor(cls, v):
        if not v.startswith("sensor_"):
            raise ValueError("sensor_id must start with 'sensor_'")
        return v


class NormalizedData(BaseModel):
    sensor_id: str
    reading_kelvin: float


class AlertData(BaseModel):
    sensor_id: str
    reading_kelvin: float
    alert: bool

These Pydantic models define the schema for each stage: RawSensorData enforces input validity (e.g., a bounded reading range and a sensor_ prefix), NormalizedData carries the Celsius-to-Kelvin conversion, and AlertData encapsulates the final alert payload (including a Boolean flag), ensuring a type-safe data flow.
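
As a quick illustration (a minimal sketch, not part of the pipeline itself), we can see how the validators reject out-of-range or badly named readings; the sample values below are made up for demonstration:

from pydantic import ValidationError

try:
    RawSensorData(sensor_id="thermo_9", reading_celsius=22.0)   # missing the required "sensor_" prefix
except ValidationError as err:
    print(err)

try:
    RawSensorData(sensor_id="sensor_9", reading_celsius=500.0)  # violates the le=150 upper bound
except ValidationError as err:
    print(err)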

# In-memory store that will collect every finalized alert
archive: List[AlertData] = []


@broker.subscriber("sensor_input")
@broker.publisher("normalized_input")
async def ingest_and_validate(raw: RawSensorData) -> dict:
    logger.info(f"Ingested raw data: {raw.json()}")
    return raw.dict()


@broker.subscriber("normalized_input")
@broker.publisher("sensor_alert")
async def normalize(data: dict) -> dict:
    norm = NormalizedData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_celsius"] + 273.15
    )
    logger.info(f"Normalized to Kelvin: {norm.json()}")
    return norm.dict()


ALERT_THRESHOLD_K = 323.15  # 50 °C expressed in Kelvin

# Stage 3: flag readings that exceed the alert threshold
@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
    alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
    alert = AlertData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_kelvin"],
        alert=alert_flag
    )
    logger.info(f"Monitor result: {alert.json()}")
    return alert.dict()


@broker.subscriber("archive_topic")
async def archive_data(payload: dict):
    rec = AlertData(**payload)
    archive.append(rec)
    logger.info(f"Archived: {rec.json()}")

An in-memory archive list collects all finalized alerts, while four asynchronous functions, wired together via @broker.subscriber / @broker.publisher, form the pipeline stages. These functions ingest and validate raw sensor inputs, convert Celsius to Kelvin, check readings against an alert threshold, and finally archive each alert record, emitting logs at every step for full traceability.
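
To make the threshold concrete: ALERT_THRESHOLD_K = 323.15 corresponds to 50 °C, and the comparison is strict, so only readings above 50 °C raise an alert. A minimal standalone sketch (independent of the broker, using the same sample values as the demo below) of the conversion and comparison:

# Sanity-check the same Celsius-to-Kelvin conversion and strict threshold used by monitor()
for celsius in [45.2, 75.1, 50.0]:
    kelvin = celsius + 273.15
    print(f"{celsius} °C -> {kelvin:.2f} K, alert={kelvin > ALERT_THRESHOLD_K}")
# Only 75.1 °C exceeds the threshold; 50.0 °C sits exactly at it and does not trigger an alert.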

async def main():
    readings = [
        {"sensor_id": "sensor_1", "reading_celsius": 45.2},
        {"sensor_id": "sensor_2", "reading_celsius": 75.1},
        {"sensor_id": "sensor_3", "reading_celsius": 50.0},
    ]
    async with TestRabbitBroker(broker) as tb:
        for r in readings:
            await tb.publish(r, "sensor_input")
        await asyncio.sleep(0.1)
       
    df = pd.DataFrame([a.dict() for a in archive])
    print("\nFinal Archived Alerts:")
    display(df)


asyncio.run(main())

Finally, the main coroutine publishes a set of test sensor readings to the in-memory TestRabbitBroker, pauses briefly so every pipeline stage can run, and then collects the resulting alert records from the archive into a pandas DataFrame for easy viewing and verification of the end-to-end alert flow. At the end, asyncio.run(main()) kicks off the entire async demo in Colab.

In conclusion, this tutorial demonstrates how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a robust foundation for sensor monitoring, ETL tasks, or event-driven workflows. You can move seamlessly from this in-memory demo to production by swapping in a live broker URL (RabbitMQ, Kafka, NATS, or Redis) and running faststream run under Uvicorn or your preferred ASGI server, unlocking scalable, maintainable stream processing in any Python environment.
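
As a rough illustration of that switch (a sketch only: the hostname, credentials, and module name pipeline.py are placeholder assumptions, not values from this tutorial), the in-memory test block is replaced by a real connection and the app is typically launched from the FastStream CLI instead of TestRabbitBroker:

# pipeline.py -- same handlers as in the notebook, but pointed at a live RabbitMQ instance
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://user:password@rabbitmq.example.com:5672/")  # placeholder broker URL
app = FastStream(broker)
# ... @broker.subscriber / @broker.publisher handlers from the notebook go here ...

Then start it from the command line:

faststream run pipeline:app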


Check out the Colab Notebook for the full, runnable version of this tutorial.



Sana Hassan, a consulting intern at MarkTechPost and a dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.
