84 lines
3.3 KiB
Python
84 lines
3.3 KiB
Python
|
|
import pytest
|
||
|
|
from unittest.mock import patch, AsyncMock
|
||
|
|
from datetime import datetime
|
||
|
|
from app.models.session import SessionType, TaskStatus, CreateSessionRequest
|
||
|
|
from app.services.session_manager import SessionManager
|
||
|
|
|
||
|
|
|
||
|
|
class TestSessionManager:
    """Async tests covering the task lifecycle exposed by ``SessionManager``.

    Each test body runs with ``app.services.session_manager.opencode_client``
    replaced by the ``mock_opencode_client`` fixture, so the outcomes asserted
    here depend on how that fixture is configured (defined outside this
    class).
    """

    @pytest.fixture(autouse=True)
    def setup(self, mock_settings, mock_opencode_client):
        """Give every test a fresh manager with an empty task table.

        ``mock_settings`` is requested but never read here; presumably it is
        needed only for its side effects -- confirm against the fixture.
        """
        self.mock_client = mock_opencode_client
        self.manager = SessionManager()
        # Start from an empty mapping so tasks never carry over between tests.
        self.manager.tasks = {}

    def _patched_client(self):
        """Return a context manager swapping in the mock opencode client."""
        return patch(
            "app.services.session_manager.opencode_client", self.mock_client
        )

    async def _new_task(self, prompt):
        """Create a task from a request that carries only ``prompt``."""
        return await self.manager.create_task(
            CreateSessionRequest(prompt=prompt)
        )

    @pytest.mark.asyncio
    async def test_create_task(self):
        """A created task echoes the request fields and starts as PENDING."""
        with self._patched_client():
            payload = CreateSessionRequest(
                type=SessionType.EPHEMERAL,
                prompt="test prompt",
            )

            created = await self.manager.create_task(payload)

            assert created.id is not None
            assert created.type == SessionType.EPHEMERAL
            assert created.prompt == "test prompt"
            assert created.status == TaskStatus.PENDING
            assert created.created_at is not None

    @pytest.mark.asyncio
    async def test_execute_task_sync(self):
        """``execute_task`` yields a result and leaves the task COMPLETED."""
        with self._patched_client():
            created = await self._new_task("sync task")

            outcome = await self.manager.execute_task(created.id)

            assert outcome is not None
            refreshed = await self.manager.get_task(created.id)
            assert refreshed.status == TaskStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_execute_task_async(self):
        """After ``execute_task_async`` the task is RUNNING with a session id."""
        with self._patched_client():
            created = await self._new_task("async task")

            await self.manager.execute_task_async(created.id)

            refreshed = await self.manager.get_task(created.id)
            assert refreshed.status == TaskStatus.RUNNING
            assert refreshed.session_id is not None

    @pytest.mark.asyncio
    async def test_abort_task(self):
        """Aborting an asynchronously started task returns True and marks it ABORTED."""
        with self._patched_client():
            created = await self._new_task("abortable task")
            await self.manager.execute_task_async(created.id)

            aborted = await self.manager.abort_task(created.id)

            assert aborted is True
            refreshed = await self.manager.get_task(created.id)
            assert refreshed.status == TaskStatus.ABORTED

    @pytest.mark.asyncio
    async def test_get_task_not_found(self):
        """Looking up an unknown task id returns ``None``."""
        with self._patched_client():
            missing = await self.manager.get_task("nonexistent")
            assert missing is None

    @pytest.mark.asyncio
    async def test_list_tasks(self):
        """``list_tasks`` reports every task created on this manager."""
        with self._patched_client():
            for prompt in ("task 1", "task 2"):
                await self._new_task(prompt)

            listed = await self.manager.list_tasks()
            assert len(listed) == 2