Gloe & FastAPI
Let’s create an example that demonstrates how to use Gloe with FastAPI to build a pipeline that processes and stores an e-commerce order in an HTTP server.
Setup
Ensure you have the necessary packages installed:
pip install gloe fastapi
The following imports are needed for the rest of the code:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from gloe import transformer, async_transformer
from gloe.utils import forward
Define the Models
We will define the Pydantic models for the order, user, and processed order:
class Product(BaseModel):
    product_id: int
    quantity: int


class Order(BaseModel):
    id: int
    user_id: int
    items: list[Product]


class User(BaseModel):
    id: int
    name: str
    email: str
    shipping_address: str


class OrderItem(BaseModel):
    product_id: int
    name: str
    quantity: int
    price: float


class ProcessedOrder(BaseModel):
    order_id: int
    user: User
    items: list[OrderItem]
    total_amount: float
Define the Transformers
The transformers below are used to process the order data.
The first transformer validates the order items, ensuring that the products exist:
@async_transformer
async def validate_order_items(order: Order) -> Order:
    """Validates order data, ensuring items exist."""
    item_ids = [item.product_id for item in order.items]
    try:
        _ = await ProductService.get_by_ids(item_ids)
    except ProductNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
    return order
Important
All the services (ProductService, UserService, and OrderService), as well as the ProductNotFoundError exception, are assumed to be available and implemented elsewhere.
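To run the example locally, the services need some implementation. The following is a minimal in-memory sketch, not the real services: the method names come from the calls in this tutorial, while the data, the ProductRecord type, and the choice to make every method a coroutine are assumptions made for illustration. Because UserService.get is a coroutine here, its caller has to await it.

```python
import asyncio
from dataclasses import dataclass


class ProductNotFoundError(Exception):
    """Raised when one or more requested product IDs are unknown."""


@dataclass
class ProductRecord:
    # Hypothetical catalog entry; the tutorial only reads `name` and `price`.
    id: int
    name: str
    price: float


# Hypothetical in-memory data standing in for a real database.
_PRODUCTS = {
    1: ProductRecord(id=1, name="Keyboard", price=50.0),
    2: ProductRecord(id=2, name="Mouse", price=20.0),
}
_USERS = {
    7: {
        "id": 7,
        "name": "Ada",
        "email": "ada@example.com",
        "shipping_address": "1 Main St",
    },
}
_ORDERS = []


class ProductService:
    @staticmethod
    async def get(product_id):
        """Return one product or raise ProductNotFoundError."""
        if product_id not in _PRODUCTS:
            raise ProductNotFoundError(f"Product with ID {product_id} not found.")
        return _PRODUCTS[product_id]

    @staticmethod
    async def get_by_ids(product_ids):
        """Return all requested products, failing if any ID is unknown."""
        missing = [pid for pid in product_ids if pid not in _PRODUCTS]
        if missing:
            raise ProductNotFoundError(f"Products not found: {missing}")
        return [_PRODUCTS[pid] for pid in product_ids]


class UserService:
    @staticmethod
    async def get(user_id):
        """Return the user's fields as a dict, or None for an unknown user."""
        return _USERS.get(user_id)


class OrderService:
    @staticmethod
    async def store(order):
        """Append the order to the in-memory list and return it unchanged."""
        _ORDERS.append(order)
        return order
```

The user is returned as a plain dict with the same fields as the User model, so Pydantic can validate it when it is placed into a ProcessedOrder; a real service would more likely return the model itself.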
The second one fetches user data based on the user ID from the order:
@async_transformer
async def fetch_user_data(order: Order) -> User:
    """Fetches user data based on user ID."""
    # Awaited like the other service calls; UserService.get is assumed to be a coroutine.
    user_data = await UserService.get(order.user_id)
    if not user_data:
        raise HTTPException(
            status_code=404,
            detail=f"User with ID {order.user_id} not found.",
        )
    return user_data
The third transformer enriches the order with product details:
@async_transformer
async def enrich_order_with_product(order: Order) -> list[OrderItem]:
    """Adds product details to the order."""
    enriched_items = []
    for item in order.items:
        product = await ProductService.get(item.product_id)
        enriched_item = OrderItem(
            product_id=item.product_id,
            name=product.name,
            quantity=item.quantity,
            price=product.price,
        )
        enriched_items.append(enriched_item)
    return enriched_items
The fourth transformer creates the final processed order object:
@transformer
def create_processed_order(
    order: Order, user: User, items: list[OrderItem]
) -> ProcessedOrder:
    """Creates the final processed order object."""
    total_amount = sum(item.price * item.quantity for item in items)
    processed_order = ProcessedOrder(
        order_id=order.id, user=user, items=items, total_amount=total_amount
    )
    return processed_order
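As a quick check of the total_amount computation, here is the same expression applied to two made-up items. The Item dataclass is a hypothetical stand-in for OrderItem that keeps only the two fields the sum uses.

```python
from dataclasses import dataclass


@dataclass
class Item:
    # Hypothetical stand-in for OrderItem, reduced to the fields used in the sum.
    price: float
    quantity: int


items = [Item(price=50.0, quantity=2), Item(price=20.0, quantity=1)]

# Same expression as in create_processed_order: price times quantity, summed over items.
total_amount = sum(item.price * item.quantity for item in items)
print(total_amount)  # 120.0
```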
The final transformer saves the order to the database:
@async_transformer
async def save_order(order: ProcessedOrder) -> ProcessedOrder:
    """Saves the order to the database."""
    return await OrderService.store(order)
Create the Pipeline
With the transformers defined, we can compose the order-processing flow that our FastAPI app will call with the order data:
process_order = (
    validate_order_items
    >> (
        forward(),
        fetch_user_data,
        enrich_order_with_product,
    )
    >> create_processed_order
    >> save_order
)
Let’s break down the pipeline:
- The validate_order_items transformer is used first to validate the order items.
- Then, a parallel gateway is called with three branches:
  - The first branch is only a forward, which passes the order from the previous transformer on to the next.
  - The fetch_user_data transformer is placed on the second branch to fetch the user data based on the user ID from the order.
  - In the last branch, the enrich_order_with_product transformer is used to enrich the order with product details.
- The next transformer, create_processed_order, receives the order, user, and items data from the branches to create the final processed order object.
- Finally, the save_order transformer is appended to save the order to the database.
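The data flow described in the steps above can be written out by hand in plain Python. This is only a sketch of how values move between the stages: it does not use Gloe, the stage functions are simplified stand-ins with made-up data, and it says nothing about how Gloe schedules the branches internally.

```python
import asyncio


async def validate(order):
    # Stand-in for validate_order_items: returns the order unchanged.
    return order


async def fetch_user(order):
    # Stand-in for fetch_user_data, with a made-up user.
    return {"id": order["user_id"], "name": "Ada"}


async def enrich(order):
    # Stand-in for enrich_order_with_product, with a made-up fixed price.
    return [{**item, "price": 10.0} for item in order["items"]]


def build(order, user, items):
    # Stand-in for create_processed_order.
    total = sum(item["price"] * item["quantity"] for item in items)
    return {"order_id": order["id"], "user": user, "items": items, "total_amount": total}


async def save(processed):
    # Stand-in for save_order.
    return processed


async def process_order_by_hand(order):
    validated = await validate(order)
    # Every branch receives the same validated order; the results are
    # collected in branch order: (order, user, items).
    branches = (validated, await fetch_user(validated), await enrich(validated))
    processed = build(*branches)
    return await save(processed)


order = {"id": 1, "user_id": 7, "items": [{"product_id": 1, "quantity": 3}]}
result = asyncio.run(process_order_by_hand(order))
print(result["total_amount"])  # 30.0
```

The first element of the tuple plays the role of the forward branch: it carries the validated order through untouched so that the step after the gateway still has access to it.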
Call the Pipeline
app = FastAPI()


@app.post("/orders", response_model=ProcessedOrder)
async def create_order(order: Order):
    # The pipeline contains async transformers, so its result must be awaited.
    return await process_order(order)
Plot the Pipeline
Finally, we can visualize the pipeline using the .to_image() method:
process_order.to_image('process-order-pipeline.png')
