legco_ai_assistant/backend/app/services/video_service.py

36 lines
1.4 KiB
Python

from pathlib import Path
from fastapi import HTTPException
class VideoService:
def __init__(self, upload_dir: str, max_size_mb: int, supported_formats: list[str]):
self.upload_dir = Path(upload_dir)
self.max_size_bytes = max_size_mb * 1024 * 1024
self.supported_formats = supported_formats
self.upload_dir.mkdir(parents=True, exist_ok=True)
def validate_video(self, filename: str | None, content_type: str | None, size_bytes: int) -> None:
if not filename:
raise HTTPException(status_code=400, detail="No file selected")
ext = Path(filename).suffix.lower()
if ext not in self.supported_formats:
raise HTTPException(
status_code=400,
detail=f"Unsupported format: {ext}. Supported: {', '.join(self.supported_formats)}",
)
if size_bytes > self.max_size_bytes:
raise HTTPException(
status_code=413,
detail=f"File exceeds {self.max_size_bytes // 1024 // 1024}MB limit",
)
def get_video_path(self, video_id: str) -> Path:
candidates = list(self.upload_dir.glob(f"{video_id}.*"))
if not candidates:
raise HTTPException(status_code=404, detail="Video not found")
return candidates[0]
def delete_video(self, video_id: str) -> None:
for p in self.upload_dir.glob(f"{video_id}.*"):
p.unlink()