비동기 처리 - async/await와 비동기 SQLAlchemy
FastAPI의 async/await 동작 원리를 이해하고, 동기 SQLAlchemy를 AsyncSession으로 전환하며 httpx로 외부 API를 비동기 호출한다.
FastAPI의 이름에 "Fast"가 붙은 이유가 바로 비동기 처리다. 지금까지 동기 방식으로 작성한 코드를 async/await로 바꾸면 어떤 일이 일어나는지, 그리고 SQLAlchemy도 비동기로 쓰는 방법을 다룬다.
동기 vs 비동기
먼저 차이를 직관적으로 이해하자.
동기 (Synchronous)
──────────────────
요청 A 처리 시작
│
│ DB 쿼리 대기 중... (200ms) ← 이 동안 아무것도 못 함
│
요청 A 처리 완료
│
요청 B 처리 시작 ← A가 끝나야 B를 시작할 수 있음
비동기 (Asynchronous)
──────────────────────
요청 A 처리 시작
│
│ DB 쿼리 보냄 → 기다리는 동안 다른 일 처리
│ │
│ 요청 B 처리 시작
│ 요청 B 처리 완료
│
│ DB 응답 도착
요청 A 처리 완료
비동기는 I/O(DB, 네트워크, 파일) 대기 시간을 낭비하지 않고 다른 요청을 처리한다. 요청이 많을수록 차이가 극적으로 벌어진다.
async/await 기본
import asyncio
# 동기 함수
def get_user_sync(user_id: int):
time.sleep(0.2) # DB 쿼리라고 가정 — 0.2초 블로킹
return {"id": user_id}
# 비동기 함수
async def get_user_async(user_id: int):
await asyncio.sleep(0.2) # 0.2초 동안 다른 코루틴 실행 가능
return {"id": user_id}
async def로 정의한 함수는 **코루틴(coroutine)**이다. await는 "이 작업이 끝날 때까지 기다리되, 그 동안 다른 코루틴을 실행해도 된다"는 의미다.
await 없이 그냥 async def만 쓰면 의미가 없다. 반드시 I/O 대기 지점에 await가 있어야 한다.
FastAPI에서 async 라우터
FastAPI는 async def와 일반 def 라우터를 모두 지원한다.
from fastapi import APIRouter
router = APIRouter()
# 동기 — FastAPI가 별도 스레드에서 실행
@router.get("/users/{user_id}")
def get_user(user_id: int):
return {"id": user_id}
# 비동기 — 이벤트 루프에서 직접 실행
@router.get("/users/{user_id}")
async def get_user(user_id: int):
return {"id": user_id}
언제 async def를 써야 하는가?
| 상황 | 권장 |
|---|---|
| DB 쿼리 (비동기 드라이버 사용) | async def |
| 외부 HTTP API 호출 | async def |
| 단순 계산, CPU 집약적 작업 | def |
동기 라이브러리 사용 (requests 등) | def |
CPU 집약적 작업을 async def로 쓰면 오히려 이벤트 루프를 막아 성능이 나빠진다. I/O 작업에만 async를 쓰자.
비동기 SQLAlchemy
지금까지 쓴 동기 SQLAlchemy를 비동기로 전환한다.
설치
pip install sqlalchemy[asyncio] asyncpg
sqlalchemy[asyncio]— SQLAlchemy 비동기 지원asyncpg— PostgreSQL 비동기 드라이버 (psycopg2 대신)
core/database.py 전환
# app/core/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from app.core.config import get_settings
settings = get_settings()
# postgresql:// → postgresql+asyncpg://
engine = create_async_engine(
settings.database_url.replace("postgresql://", "postgresql+asyncpg://"),
pool_size=5,
max_overflow=10,
pool_recycle=1800,
pool_pre_ping=True,
echo=settings.debug, # debug 모드일 때 SQL 출력
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # commit 후에도 객체 속성 접근 가능
)
class Base(DeclarativeBase):
pass
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
except Exception:
await session.rollback()
raise
expire_on_commit=False — 기본값은 True인데, commit 후 객체에 접근할 때 자동으로 DB를 다시 조회한다. 비동기에서는 이게 예상치 못한 쿼리를 발생시킬 수 있어 False로 설정하는 게 안전하다.
Repository 비동기 전환
# app/users/repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.users.models import User
class UserRepository:
async def save(self, db: AsyncSession, email: str, hashed_password: str) -> User:
user = User(email=email, hashed_password=hashed_password)
db.add(user)
await db.flush()
await db.refresh(user)
return user
async def find_by_email(self, db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def find_by_id(self, db: AsyncSession, user_id: int) -> User | None:
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
비동기 SQLAlchemy에서는 db.query()를 쓰지 않는다. select() 구문을 await db.execute()에 넘기는 방식으로 바뀐다.
| 동기 | 비동기 |
|---|---|
db.query(User).filter(...).first() | await db.execute(select(User).where(...)) |
db.flush() | await db.flush() |
db.commit() | await db.commit() |
db.refresh(obj) | await db.refresh(obj) |
Service 비동기 전환
# app/users/service.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.users.repository import UserRepository
from app.users.exceptions import UserAlreadyExistsException, UserNotFoundException
from app.core.hashing import hash_password
class UserService:
def __init__(self, repo: UserRepository):
self.repo = repo
async def create_user(self, db: AsyncSession, email: str, password: str) -> dict:
if await self.repo.find_by_email(db, email):
raise UserAlreadyExistsException()
hashed = hash_password(password)
user = await self.repo.save(db, email, hashed)
await db.commit()
return {"id": user.id, "email": user.email}
async def get_user(self, db: AsyncSession, user_id: int) -> dict:
user = await self.repo.find_by_id(db, user_id)
if not user:
raise UserNotFoundException(user_id)
return {"id": user.id, "email": user.email}
Router 비동기 전환
# app/users/router.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.users.service import UserService
from app.users.repository import UserRepository
from app.users.schemas import UserCreate
router = APIRouter(prefix="/users", tags=["users"])
def get_user_service() -> UserService:
return UserService(UserRepository())
@router.post("/", status_code=201)
async def create_user(
body: UserCreate,
db: AsyncSession = Depends(get_db),
service: UserService = Depends(get_user_service),
):
return await service.create_user(db, body.email, body.password)
@router.get("/{user_id}")
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db),
service: UserService = Depends(get_user_service),
):
return await service.get_user(db, user_id)
외부 HTTP 요청 — httpx
외부 API를 호출할 때 동기 requests 대신 비동기 httpx를 쓴다.
pip install httpx
import httpx
# 동기 — 이벤트 루프 블로킹
import requests
response = requests.get("https://api.example.com/data")
# 비동기 — 올바른 방식
async def fetch_external_data():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return response.json()
라우터에서 사용 예시:
@router.get("/external")
async def get_external(db: AsyncSession = Depends(get_db)):
async with httpx.AsyncClient() as client:
res = await client.get("https://jsonplaceholder.typicode.com/todos/1")
return res.json()
비동기에서 주의할 점
blocking 함수를 async에서 쓰지 마라
# 잘못된 예 — 이벤트 루프를 블로킹
async def bad_example():
import time
time.sleep(1) # ← 전체 이벤트 루프 멈춤
requests.get("...") # ← 동기 HTTP 클라이언트도 마찬가지
# 올바른 예
async def good_example():
await asyncio.sleep(1) # ← 이벤트 루프 양보
await httpx.get("...") # ← 비동기 HTTP 클라이언트
CPU 집약적 작업(이미지 처리, 머신러닝 등)이 필요하다면 asyncio.run_in_executor로 별도 스레드에서 실행한다.
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
async def heavy_task():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, cpu_intensive_function)
return result
테스트
비동기 코드를 테스트하려면 pytest-asyncio가 필요하다.
pip install pytest-asyncio
# tests/test_users.py
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.mark.asyncio
async def test_create_user():
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post("/users/", json={
"email": "test@example.com",
"password": "password123"
})
assert response.status_code == 201
동기 코드를 유지해도 되는 경우
비동기 전환이 항상 정답은 아니다.
동기가 나을 수 있는 상황
- 팀원이 async/await에 익숙하지 않은 경우
- 사용 중인 라이브러리가 비동기를 지원하지 않는 경우
- 트래픽이 크지 않고 간단한 내부 서비스인 경우
FastAPI는 동기 def 라우터도 내부적으로 스레드 풀에서 실행해 블로킹을 방지한다. 작은 서비스라면 동기로도 충분하다.
변경된 파일 요약
app/
├── core/
│ └── database.py ← create_async_engine, AsyncSession, get_db async 전환
├── users/
│ ├── repository.py ← async def, select() 문법으로 전환
│ ├── service.py ← async def, await 추가
│ └── router.py ← async def, AsyncSession 주입
정리
동기 FastAPI
요청 → DB 대기(블로킹) → 응답
요청 → DB 대기(블로킹) → 응답
요청 → DB 대기(블로킹) → 응답
비동기 FastAPI
요청 → await DB ──────────── 응답
요청 ──── await DB ──── 응답
요청 ──────── await DB → 응답
(세 요청이 DB 대기 시간을 공유해서 더 빠름)
| 구분 | 동기 | 비동기 |
|---|---|---|
| 문법 | def | async def + await |
| DB 드라이버 | psycopg2 | asyncpg |
| SQLAlchemy | Session | AsyncSession |
| HTTP 클라이언트 | requests | httpx |
| 테스트 | pytest | pytest-asyncio |
I/O가 많은 API 서버에서 비동기는 동시 처리량을 크게 높인다. 특히 외부 API 호출이 많거나 DB 쿼리가 많은 서비스일수록 효과가 뚜렷하다.