- FastAPI-based web API service
- OpenCode API client wrapper
- Session manager (synchronous/asynchronous task execution)
- APScheduler-based scheduled task execution
- Complete set of REST API endpoints
80 lines
2.3 KiB
Python
import uuid

from pydantic import BaseModel
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from app.services.session_manager import session_manager
from app.core.logging import logger


class ScheduleTask(BaseModel):
    """A cron-scheduled prompt."""

    id: str
    name: str
    cron: str  # standard 5-field crontab expression
    prompt: str
    enabled: bool = True


class SchedulerService:
    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        # Registry of all known tasks, including disabled ones.
        self.scheduled_tasks: dict[str, ScheduleTask] = {}

    def start(self):
        self.scheduler.start()
        logger.info("Scheduler started")

    def shutdown(self):
        self.scheduler.shutdown()
        logger.info("Scheduler shutdown")

    async def add_schedule(self, task: ScheduleTask) -> ScheduleTask:
        self.scheduled_tasks[task.id] = task

        if task.enabled:
            trigger = CronTrigger.from_crontab(task.cron)
            self.scheduler.add_job(
                self._run_scheduled_task,
                trigger,
                args=[task.id],
                id=task.id,
                # Re-adding a task with the same id replaces its job
                # instead of raising ConflictingIdError.
                replace_existing=True,
            )
            logger.info(f"Added schedule: {task.name} ({task.cron})")

        return task

    async def remove_schedule(self, task_id: str) -> bool:
        task = self.scheduled_tasks.pop(task_id, None)
        if not task:
            return False

        # Disabled tasks never had a job registered; remove_job would raise
        # JobLookupError for them, so check first.
        if self.scheduler.get_job(task_id) is not None:
            self.scheduler.remove_job(task_id)
        logger.info(f"Removed schedule: {task.name}")
        return True

    async def list_schedules(self) -> list[ScheduleTask]:
        return list(self.scheduled_tasks.values())

    async def _run_scheduled_task(self, task_id: str):
        task = self.scheduled_tasks.get(task_id)
        if not task:
            return

        logger.info(f"Running scheduled task: {task.name}")

        # Imported inside the function, as in the original source.
        from app.models.session import CreateSessionRequest, SessionType

        request = CreateSessionRequest(
            type=SessionType.SCHEDULED,
            prompt=task.prompt,
        )
        task_obj = await session_manager.create_task(request)

        try:
            await session_manager.execute_task_async(task_obj.id)
            logger.info(f"Scheduled task {task.name} started")
        except Exception as e:
            logger.error(f"Scheduled task {task.name} failed to start: {e}")


# Module-level singleton shared by the rest of the application.
scheduler_service = SchedulerService()
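
The add/remove lifecycle of the scheduler service can be exercised without APScheduler installed. The following is a minimal sketch, assuming a hypothetical in-memory `FakeScheduler` that mimics only the `add_job`, `get_job`, and `remove_job` calls; `Task`, `Registry`, and `demo` are illustrative names and not part of the original module. It shows one case worth handling on removal: a disabled task is stored in the registry but never registered with the scheduler, so there is no job to remove.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Task:
    # Hypothetical stand-in for ScheduleTask (a pydantic model in the original).
    id: str
    name: str
    cron: str
    prompt: str
    enabled: bool = True


class FakeScheduler:
    """Hypothetical in-memory stand-in for AsyncIOScheduler (illustration only)."""

    def __init__(self):
        self.jobs = {}

    def add_job(self, func, trigger, args=None, id=None):
        self.jobs[id] = (func, trigger, args or [])

    def get_job(self, job_id):
        return self.jobs.get(job_id)

    def remove_job(self, job_id):
        # Raises KeyError for an unknown id, loosely mirroring how
        # APScheduler's remove_job raises JobLookupError.
        del self.jobs[job_id]


class Registry:
    """Simplified version of the add/remove logic, for illustration."""

    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.tasks = {}

    async def add(self, task):
        self.tasks[task.id] = task
        if task.enabled:
            # The cron string stands in for a real trigger object here.
            self.scheduler.add_job(print, task.cron, args=[task.id], id=task.id)
        return task

    async def remove(self, task_id):
        task = self.tasks.pop(task_id, None)
        if task is None:
            return False
        # Only remove a job if one was actually registered.
        if self.scheduler.get_job(task_id) is not None:
            self.scheduler.remove_job(task_id)
        return True


async def demo():
    registry = Registry(FakeScheduler())
    await registry.add(Task("a", "nightly", "0 2 * * *", "summarize logs"))
    await registry.add(Task("b", "paused", "*/5 * * * *", "ping", enabled=False))
    scheduled = sorted(registry.scheduler.jobs)
    results = [await registry.remove(i) for i in ("a", "b", "missing")]
    return scheduled, results


scheduled_ids, removal_results = asyncio.run(demo())
```

Only the enabled task ends up with a job, and removing the disabled task succeeds because the job lookup is guarded. Against the real scheduler, the same guard would rely on `get_job` returning `None` for an unknown id.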