[Python] FastAPI MongoDB 연결 가이드
Intro
안녕하세요 Noah입니다.
MongoDB는 문서 중심 NoSQL 데이터베이스로, 문자열 그대로를 사용하기 때문에 빠르게 확장 가능하며 개발자 친화적인 데이터베이스입니다. FastAPI는 Python 기반 웹 API 프레임워크로, 빠르고 효율적인 API 개발을 지원합니다.
이 두 기술을 함께 사용해 웹 API를 만들어 보겠습니다.
본문
기본 조건
- Python 3.7 이상
- MongoDB 서버 (설치 및 실행)
MongoDB 연결 가이드
아래 로직을 따라 MongoDB와 FastAPI를 연동하여 사용자 정보를 저장하고 조회하는 코드를 작성해보겠습니다.
이 코드는 FastAPI와 MongoDB를 통합하여 간단하고 효율적인 API 서비스를 구축하는 방법을 구현해놨습니다.
main.py
from fastapi import FastAPI, Body
import log_util
from dto import CreateKepcoBillingDailyRequestDto
import service
app = FastAPI()
@app.post("/kepco")
async def create_room(create_dto: CreateKepcoBillingDailyRequestDto = Body(...)):
log_util.info(f"create_room")
response = service.create_kepco(create_dto)
log_util.info(f"create_room success: {response}")
return {"content": True}
@app.get("/kepco")
async def get_rooms():
log_util.info(f"create_dto")
response = service.get_kepco_list()
log_util.info(f"response : {response}")
return {"content": response}
@app.get("/kepco/{kepco_id}")
async def get_room_detail(kepco_id: str):
log_util.info("get_room_detail called")
response = service.get_kepco_by_id(kepco_id)
log_util.info(f"response : {response}")
return {"content": response}
- POST /kepco: 데이터를 생성 생성합니다.
- GET /kepco: 데이터를 검색해 가져옵니다.
- GET /kepco/{kepco_id}: 특정 데이터를 가져옵니다.
dto.py
from typing import Optional
from pydantic import BaseModel
class CreateKepcoBillingDailyRequestDto(BaseModel):
F_AP_QT_ALL: Optional[float]
F_AP_QT_ALL_SUM: Optional[float]
F_AP_QT_B: Optional[float]
F_AP_QT_B_SUM: Optional[float]
F_AP_QT_M: Optional[float]
F_AP_QT_M_SUM: Optional[float]
F_AP_QT_S: Optional[float]
F_AP_QT_S_SUM: Optional[float]
KWH_BILL_ALL: Optional[float]
KWH_BILL_ALL_SUM: Optional[float]
KWH_BILL_B: Optional[float]
KWH_BILL_B_SUM: Optional[float]
KWH_BILL_M: Optional[float]
KWH_BILL_M_SUM: Optional[float]
KWH_BILL_S: Optional[float]
KWH_BILL_S_SUM: Optional[float]
MAX_PWR: Optional[float]
이 파일은 데이터 전송 객체(DTO)를 정의합니다. DTO는 API 요청 및 응답 데이터를 유효성 검사합니다.
- CreateRoomDto: 방 생성에 필요한 데이터의 DTO입니다.
service.py
import dao
import log_util
from dto import CreateKepcoBillingDailyRequestDto
def get_kepco_list():
rooms = dao.find_kepco_list()
log_util.info(f"Retrieved rooms : {rooms}")
return rooms
def get_kepco_by_id(room_id: str):
return dao.find_kepco_by_id(room_id)
def create_kepco(create_dto: CreateKepcoBillingDailyRequestDto):
dao.insert_kepco(create_dto)
유저 생성과 유저 조회 함수를 정의합니다.
- get_rooms: 모든 방을 가져옵니다.
- get_room_by_id: 특정 ID로 방을 가져옵니다.
- create_room: 새로운 방을 생성합니다.
dao.py
from datetime import timedelta, timezone, datetime
from bson import ObjectId
import log_util
from database import client
from dto import CreateKepcoBillingDailyRequestDto
from models import KepcoBillingDaily
collection = 'kepco_billing_daily'
collection_temp = 'kepco_billing_daily_temp'
def find_kepco_list():
log_util.info(f"find_kepco_list START --------------------------------------")
# Mongo 쿼리는 이런 식으로 활용하면 됨
yesterday = "{:%Y-%m-%d %H:%M:%S}".format(datetime.now(timezone(timedelta(hours=9))) - timedelta(days=1))
search_pipeline = [
{"$sort": {"_id": -1}},
{'$match': {'F_AP_QT_ALL': {"$gt": 10}}},
{"$limit": 10}
]
cursor = client[collection].aggregate(search_pipeline)
if not cursor:
return None
result_list = [KepcoBillingDaily(result) for result in cursor]
log_util.info(f"result_list: {result_list}")
return result_list
def find_kepco_by_id(room_id: str):
log_util.info(f"find_kepco_by_id START --------------------------------------")
room_id = ObjectId(room_id)
response = client[collection].find_one({"_id": room_id})
if response:
return KepcoBillingDaily(response)
def insert_kepco(create_dto: CreateKepcoBillingDailyRequestDto):
log_util.info(f"insert_kepco START --------------------------------------")
create_dto = create_dto.model_dump()
# 데이터 삽입
log_util.info(f"create test; {create_dto}")
response = client[collection_temp].insert_one(create_dto)
# 삽입 결과 확인
if response.acknowledged:
log_util.info("데이터 삽입 성공!")
else:
log_util.error("데이터 삽입 실패:", response.error)
위 코드는 FastAPI와 MongoDB를 연동하여 사용자 정보를 저장하고 조회하는 코드입니다.
유저 생성과 유저 조회 함수를 정의합니다. 이때, models에 정의된 User에서 MongoDB의 ‘_id’를 Python Code에서 ‘id’로 변환하여 정상적으로 조회됩니다.
- find_rooms: 모든 방을 가져오는 쿼리를 실행합니다.
- find_room_by_id: 특정 ID로 방을 조회합니다.
- insert_room: 새로운 방을 데이터베이스에 삽입합니다.
models.py
class BaseMongoModel:
def __init__(self, obj: dict):
for key, value in obj.items():
if key == "_id":
self.id = str(value)
elif isinstance(value, float):
self.__setattr__(key, float(value))
elif isinstance(value, bool):
self.__setattr__(key, bool(value))
elif isinstance(value, str):
self.__setattr__(key, str(value))
class KepcoBillingDaily(BaseMongoModel):
id: str = None
F_AP_QT_ALL: float = None
F_AP_QT_ALL_SUM: float = None
F_AP_QT_B: float = None
F_AP_QT_B_SUM: float = None
F_AP_QT_M: float = None
F_AP_QT_M_SUM: float = None
F_AP_QT_S: float = None
F_AP_QT_S_SUM: float = None
KWH_BILL_ALL: float = None
KWH_BILL_ALL_SUM: float = None
KWH_BILL_B: float = None
KWH_BILL_B_SUM: float = None
KWH_BILL_M: float = None
KWH_BILL_M_SUM: float = None
KWH_BILL_S: float = None
KWH_BILL_S_SUM: float = None
MAX_PWR: float = None
model 객체를 생성 시 자동으로 변환이 가능하도록 BaseMongoModel을 상속받아 구현합니다.
- RoomCollection: 방의 모델입니다.
database.py
from pymongo.mongo_client import MongoClient
import certifi
import log_util
host = ''
query_param = 'ssl=true&authSource=admin'
port = 27017
username = ''
password = ''
ca = certifi.where()
uri = f"mongodb://{username}:{password}@{host}/?{query_param}"
dbname = ''
client = MongoClient(uri, tlsCAFile=ca)[dbname]
client.command('ping')
log_util.info("Pinged your deployment. You successfully greenos connected to MongoDB!")
maxPoolSize 옵션은 연결 풀의 최대 크기를 설정합니다. 기본값은 100입니다.
이 파일은 MongoDB와의 연결을 설정합니다. 인증 및 데이터베이스 선택을 처리합니다.
app_test.py
import json
import pytest
from starlette.testclient import TestClient
import log_util
from dto import CreateKepcoBillingDailyRequestDto
from main import app
class TestRoom:
client = TestClient(app)
def test_create_room_success(self):
"""
Test creating a room with valid data.
"""
data = CreateKepcoBillingDailyRequestDto(
F_AP_QT_ALL=1.0,
F_AP_QT_ALL_SUM=1.0,
F_AP_QT_B=1.0,
F_AP_QT_B_SUM=1.0,
F_AP_QT_M=1.0,
F_AP_QT_M_SUM=1.0,
F_AP_QT_S=1.0,
F_AP_QT_S_SUM=1.0,
KWH_BILL_ALL=1.0,
KWH_BILL_ALL_SUM=1.0,
KWH_BILL_B=1.0,
KWH_BILL_B_SUM=1.0,
KWH_BILL_M=1.0,
KWH_BILL_M_SUM=1.0,
KWH_BILL_S=1.0,
KWH_BILL_S_SUM=1.0,
MAX_PWR=1.0
)
response = self.client.post("/kepco", json=data.model_dump()) # json으로 전달해야 현재는 수신 가능
log_util.info(f"test_create_room_success : {response.json()}")
assert response.status_code == 200
def test_get_all_rooms(self):
"""
Test retrieving all rooms.
"""
response = self.client.get("/kepco")
log_util.info(f"test_get_all_rooms : {response.json()}")
assert response.status_code == 200
@pytest.mark.parametrize("id, expected_status_code", [
("000000000000000000000000", False),
("65cd50dec4b04b0b65598952", True)
])
def test_get_room_by_id_with_parametrization(self, id, expected_status_code):
response = self.client.get(f"/kepco/{id}")
content = response.json()
log_util.info(f"test_get_room_by_id_with_parametrization ({id}) : {content['content']} : {expected_status_code}")
assert expected_status_code == (content['content'] is not None)
이 파일은 방 관련 API에 대한 테스트를 정의한 내용입니다.
- test_create_room_success: 새로운 방을 성공적으로 생성하는지 테스트합니다.
- test_get_all_rooms: 모든 방을 성공적으로 검색하는지 테스트합니다.
- test_get_room_by_id_with_parametrization: 특정 ID로 방을 검색하는지 테스트합니다. 파라미터화된 테스트로 유효한 ID와 무효한 ID를 사용하여 테스트합니다.
requirement.txt
fastapi
hypercorn
pymongo
pytest-html
certifi
pydantic
httpx
이 파일에는 프로젝트에 필요한 모든 종속성이 명시되어 있습니다.
테스트 실행
pytest -o log_cli=true
pytest를 사용하여 테스트를 실행합니다. log_cli=true 옵션을 사용하여 테스트 로그를 표시합니다.
log.ini
[loggers]
keys=root,sLogger
[handlers]
keys=consoleHandler,fileHandler
[formatters]
keys=fileFormatter,consoleFormatter
[logger_root]
level=DEBUG
handlers=consoleHandler
[logger_sLogger]
level=DEBUG
handlers=consoleHandler,fileHandler
qualname=sLogger
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=consoleFormatter
args=(sys.stdout,)
[handler_fileHandler]
class=FileHandler
level=DEBUG
formatter=fileFormatter
args=('logfile.log',)
[formatter_fileFormatter]
format=[%(asctime)s] %(levelname)s : %(message)s
datefmt=
[formatter_consoleFormatter]
format=[%(asctime)s] %(levelname)s : %(message)s
datefmt=
log 관련 설정을 모아 둔 파일입니다.
log_util.py
import logging.config
# set up logging
logging.config.fileConfig("log.ini")
logger = logging.getLogger('sLogger')
def info(message):
logger.info(message)
def debug(message):
logger.debug(message)
def error(message):
logger.error(message)
def warning(message):
logger.warning(message)
def critical(message):
logger.critical(message)
RDBMS와의 차이점
-
스키마
MongoDB는 RDBMS와 다르게 스키마가 없습니다. 따라서 데이터베이스에 데이터를 저장할 때 스키마를 정의하지 않아도 됩니다. 또한, MongoDB는 JSON 형태의 데이터를 저장하므로, 데이터베이스에 저장된 데이터를 JSON 형태로 조회할 수 있습니다. 이러한 특징을 활용하여 데이터베이스에 저장된 데이터를 쉽게 관리할 수 있습니다. -
데이터베이스 커넥션 관리
RDBMS (SQLAlchemy 사용 시)
Session Maker 생성: sessionmaker 객체를 만들고 변수에 저장합니다.
Dependency 함수: get_session과 같은 dependency 함수를 만들어 요청마다 sessionmaker를 사용하여 새 세션을 생성합니다.
새로운 연결: 각 요청은 session을 배정받아 관리됩니다.
MongoDB
연결 풀: Motor는 기본적으로 비동기 연결 풀을 사용합니다.
새로운 연결 없음: 요청마다 새 연결을 생성하지 않고 애플리케이션 전체에서 AsyncIOMotorClient 인스턴스를 직접 사용합니다.
효율성: 이 방식은 효율적이며 불필요한 연결 오버헤드가 생기는 것을 방지해줍니다.
글을 마치며
이번 포스트에서는 FastAPI와 MongoDB를 결합하여 API 서비스를 구축하는 방법을 살펴보았습니다.
FastAPI를 사용하여 방 생성 및 조회 API를 작성하고 MongoDB와 연동하여 데이터 관리를 수행했으며, Pydantic을 활용하여 데이터 모델링과 유효성 검사를 간편하게 처리했습니다.
pytest를 활용하여 API 기능을 테스트하고 안정성을 확보했습니다. 이를 통해 FastAPI와 MongoDB를 연결하는 방법에 대해 확인할 수 있었습니다.
여러분도 FastAPI와 MongoDB를 활용하여 다양한 프로젝트를 구현하고 개발 경험을 쌓아나가기를 기대하겠습니다.
다들 즐코하세요~ 🚀