Files
XCClaw/app/services/storage.py

41 lines
1.2 KiB
Python

import json
import threading
from pathlib import Path
from typing import Any
from app.core.config import settings
from app.core.logging import logger
class JSONStorage:
def __init__(self):
self._lock = threading.Lock()
self.data_dir = settings.data_dir
self.data_dir.mkdir(parents=True, exist_ok=True)
def _get_file_path(self, key: str) -> Path:
return self.data_dir / f"{key}.json"
def load(self, key: str, default: Any = None) -> Any:
file_path = self._get_file_path(key)
with self._lock:
if not file_path.exists():
return default
try:
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to load {key}: {e}")
return default
def save(self, key: str, data: Any) -> None:
file_path = self._get_file_path(key)
with self._lock:
try:
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Failed to save {key}: {e}")
storage = JSONStorage()