Files
Ballet-Production-Suite/backend/main.py

168 lines
6.3 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import Session, select
from .database import create_db_and_tables, get_session
from . import models, auth
from datetime import timedelta
app = FastAPI(title="Ballet Production Suite API")
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.get("/")
def read_root():
return {"message": "Welcome to the Ballet Production Suite API"}
# --- Authentication ---
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), session: Session = Depends(get_session)):
user = session.exec(select(models.User).where(models.User.username == form_data.username)).first()
if not user or not auth.verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/users/", response_model=models.User)
def create_user(user: models.User, session: Session = Depends(get_session)):
user.hashed_password = auth.get_password_hash(user.hashed_password)
session.add(user)
session.commit()
session.refresh(user)
return user
# --- Materials ---
@app.post("/materials/", response_model=models.Material)
def create_material(material: models.Material, session: Session = Depends(get_session)):
session.add(material)
session.commit()
session.refresh(material)
return material
@app.get("/materials/", response_model=list[models.Material])
def read_materials(session: Session = Depends(get_session)):
materials = session.exec(select(models.Material)).all()
return materials
# --- Products ---
@app.post("/products/", response_model=models.Product)
def create_product(product: models.Product, session: Session = Depends(get_session)):
session.add(product)
session.commit()
session.refresh(product)
return product
@app.get("/products/", response_model=list[dict])
def read_products(session: Session = Depends(get_session)):
products = session.exec(select(models.Product)).all()
result = []
for p in products:
# Calculate cost based on materials
total_cost = p.base_price
for link in p.materials:
total_cost += link.quantity_required * link.material.cost_per_unit
p_dict = p.dict()
p_dict["total_cost"] = total_cost
result.append(p_dict)
return result
@app.post("/products/{product_id}/materials/")
def add_material_to_product(product_id: int, material_id: int, quantity: float, session: Session = Depends(get_session)):
link = models.ProductMaterialLink(product_id=product_id, material_id=material_id, quantity_required=quantity)
session.add(link)
session.commit()
return {"message": "Material added to product"}
# --- Clients ---
@app.post("/clients/", response_model=models.Client)
def create_client(client: models.Client, session: Session = Depends(get_session)):
session.add(client)
session.commit()
session.refresh(client)
return client
@app.get("/clients/", response_model=list[models.Client])
def read_clients(session: Session = Depends(get_session)):
clients = session.exec(select(models.Client)).all()
return clients
# --- Orders ---
@app.post("/orders/", response_model=models.Order)
def create_order(order_data: dict, session: Session = Depends(get_session)):
# This is a simplified version for demonstration.
# In a real app, you'd use a Pydantic schema for the request body.
client_id = order_data.get("client_id")
product_ids = order_data.get("product_ids", []) # List of strings or dicts
db_order = models.Order(client_id=client_id, status=models.OrderStatus.QUOTATION)
session.add(db_order)
session.commit()
session.refresh(db_order)
for p_id in product_ids:
link = models.OrderProductLink(order_id=db_order.id, product_id=p_id, quantity=1)
session.add(link)
session.commit()
session.refresh(db_order)
return db_order
@app.get("/orders/", response_model=list[models.Order])
def read_orders(session: Session = Depends(get_session)):
orders = session.exec(select(models.Order)).all()
return orders
@app.get("/orders/{order_id}", response_model=models.Order)
def read_order(order_id: int, session: Session = Depends(get_session)):
order = session.get(models.Order, order_id)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order
# --- Production ---
@app.get("/production/", response_model=list[models.ProductionStep])
def read_production_steps(session: Session = Depends(get_session)):
steps = session.exec(select(models.ProductionStep)).all()
return steps
@app.patch("/production/{step_id}", response_model=models.ProductionStep)
def update_production_step(step_id: int, status: str, session: Session = Depends(get_session)):
step = session.get(models.ProductionStep, step_id)
if not step:
raise HTTPException(status_code=404, detail="Step not found")
step.status = status
if status == "In Progress":
step.started_at = datetime.utcnow()
elif status == "Completed":
step.completed_at = datetime.utcnow()
session.add(step)
session.commit()
session.refresh(step)
return step
@app.get("/production/cutting-sheet")
def get_cutting_sheet(session: Session = Depends(get_session)):
# Group material requirements for all orders in 'Corte' status
orders_in_corte = session.exec(select(models.Order).where(models.Order.status == "Corte")).all()
cutting_sheet = {} # material_name -> total_quantity
for order in orders_in_corte:
for product_link in order.products:
product = product_link.product
for mat_link in product.materials:
mat_name = mat_link.material.name
qty = mat_link.quantity_required * product_link.quantity
cutting_sheet[mat_name] = cutting_sheet.get(mat_name, 0) + qty
return cutting_sheet