Aiosetups -
def get(self, name: str): return self._resources.get(name) async def init_postgres(): # Imagine asyncpg.connect(...) return "conn": "fake pg connection"
async def main(): db = await init_db() redis = await init_redis() http_client = await init_http() # ... cleanup code scattered you use a setup registry. # aiosetups.py import asyncio from contextlib import asynccontextmanager from typing import Any, AsyncGenerator, Callable, Dict, List class AsyncSetup: """Register and manage async initializers and cleaners.""" def init (self): self._init_tasks: List[Callable[[], Any]] = [] self._cleanup_tasks: List[Callable[[], Any]] = [] self._resources: Dict[str, Any] = {}
async def init_redis(): return "client": "fake redis" aiosetups
async def cleanup_all(self): for task in self._cleanup_tasks: await task()
async def setup_all(self): for task in self._init_tasks: await task() def get(self, name: str): return self
def add(self, name: str, init_func, cleanup_func=None): async def _init(): self._resources[name] = await init_func() self._init_tasks.append(_init) if cleanup_func: async def _clean(): await cleanup_func(self._resources[name]) self._cleanup_tasks.insert(0, _clean) # reverse order cleanup
# ... your app logic here ...
| Package | Purpose | |----------------|--------------------------------------| | async-lru | Async caching | | aiofiles | Async file I/O | | aioredis | Redis client | | asyncpg | PostgreSQL driver | | anyio / asyncio | Low-level async primitives |