Files
freepik-api/app/services/file_manager.py

68 lines
1.9 KiB
Python
Raw Permalink Normal View History

import logging
import os
import shutil
import time
from pathlib import Path
from urllib.parse import urlsplit

import httpx

from app.config import settings
logger = logging.getLogger(__name__)
def ensure_directories():
    """Create the configured output and temp directories if they are missing."""
    for directory in (settings.output_dir, settings.temp_dir):
        # exist_ok makes this safe to call repeatedly.
        os.makedirs(directory, exist_ok=True)
def _filename_from_url(url: str) -> str:
    """Return a local filename derived from the path component of *url*.

    Falls back to ``'result'`` when the path has no usable last segment, and
    appends ``'.png'`` when the name carries no extension.
    """
    # Only the path names the resource. The query string and fragment may
    # themselves contain '/', so splitting the raw URL on '/' can pick up a
    # piece of the query instead of the real filename.
    name = os.path.basename(urlsplit(url).path)
    # '.' and '..' are directory references, not filenames.
    if name in ('', '.', '..'):
        name = 'result'
    if '.' not in name:
        name += '.png'
    return name


async def download_result(task_id: str, url: str) -> str:
    """Download a result file from a URL and store it locally.

    The file is written to ``<settings.output_dir>/<task_id>/<filename>``,
    where the filename is taken from the last path segment of *url*.

    Args:
        task_id: Name of the per-task subdirectory under the output directory.
        url: Location of the file to fetch.

    Returns:
        The path of the file that was written.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    task_dir = os.path.join(settings.output_dir, task_id)
    os.makedirs(task_dir, exist_ok=True)
    output_path = os.path.join(task_dir, _filename_from_url(url))

    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.get(url)
        # Fail before touching the disk so an error page is never saved
        # as if it were the result.
        response.raise_for_status()

    with open(output_path, 'wb') as f:
        f.write(response.content)

    logger.info('Downloaded result for %s: %s', task_id, output_path)
    return output_path
def get_result_path(task_id: str) -> str | None:
    """Get the path to a task's result file.

    Args:
        task_id: Name of the per-task subdirectory under the output directory.

    Returns:
        The path of the first regular file (by name) in the task directory,
        or ``None`` if the directory does not exist or holds no files.
    """
    task_dir = os.path.join(settings.output_dir, task_id)
    try:
        # os.listdir returns entries in arbitrary order; sort so the same
        # file is picked every time when the directory has several entries.
        names = sorted(os.listdir(task_dir))
    except (FileNotFoundError, NotADirectoryError):
        return None

    for name in names:
        candidate = os.path.join(task_dir, name)
        # Skip subdirectories: callers expect a file path.
        if os.path.isfile(candidate):
            return candidate
    return None
def cleanup_old_outputs() -> None:
    """Remove output directories older than auto_cleanup_hours.

    A task directory is removed, with everything inside it, when its
    modification time is older than ``settings.auto_cleanup_hours`` hours.
    A directory that cannot be removed is logged and skipped so that one
    failure does not stop the rest of the sweep.
    """
    output_dir = Path(settings.output_dir)
    if not output_dir.exists():
        return

    cutoff = time.time() - (settings.auto_cleanup_hours * 3600)
    removed = 0
    for task_dir in output_dir.iterdir():
        try:
            if not task_dir.is_dir() or task_dir.stat().st_mtime >= cutoff:
                continue
            # rmtree also handles nested subdirectories, which a plain
            # unlink-each-entry loop would fail on.
            shutil.rmtree(task_dir)
        except OSError as exc:
            # The directory may have vanished or be in use; keep going.
            logger.warning('Failed to clean up %s: %s', task_dir, exc)
            continue
        removed += 1

    if removed:
        logger.info('Cleaned up %d old output directories', removed)