Files
onion-arch/app/infrastructure/db/sqlalchemy/repositories/user_repository.py
2026-01-30 16:34:56 +03:00

55 lines
1.7 KiB
Python

from __future__ import annotations
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.users.ports.user_repository import UserRepository
from app.domain.users.entities import User
from app.domain.users.value_objects import Email
from app.infrastructure.db.sqlalchemy.models import UserModel
class SqlAlchemyUserRepository(UserRepository):
    """``UserRepository`` implementation backed by an async SQLAlchemy session.

    Translates between the ``User`` domain entity and the ``UserModel`` ORM
    row. The identifier is stored as its string form and converted back to
    ``UUID`` on load; the e-mail is stored as ``str(user.email)`` and wrapped
    in ``Email`` again on load.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Keep the session that every statement of this repository uses."""
        self._session = session

    async def add(self, user: User) -> None:
        """Register *user* with the session as a new ``UserModel`` row.

        Only ``session.add`` is called here; no flush or commit is issued by
        this method, so persisting the row is left to the code that owns the
        session.
        """
        self._session.add(self._to_model(user))

    async def get_by_id(self, user_id: UUID) -> User | None:
        """Return the user whose id equals *user_id*, or ``None`` if absent."""
        # Ids are stored as strings (see ``_to_model``), so compare as string.
        stmt = select(UserModel).where(UserModel.id == str(user_id))
        result = await self._session.execute(stmt)
        return self._to_entity(result.scalar_one_or_none())

    async def get_by_email(self, email: str) -> User | None:
        """Return the user whose stored e-mail equals *email*, or ``None``.

        The comparison is done by the database on the raw string as given;
        this method applies no normalisation to *email* itself.
        """
        stmt = select(UserModel).where(UserModel.email == email)
        result = await self._session.execute(stmt)
        return self._to_entity(result.scalar_one_or_none())

    @staticmethod
    def _to_model(user: User) -> UserModel:
        """Build the ORM row that represents *user*."""
        return UserModel(
            id=str(user.id),
            email=str(user.email),
            full_name=user.full_name,
            is_active=user.is_active,
            created_at=user.created_at,
        )

    @staticmethod
    def _to_entity(model: UserModel | None) -> User | None:
        """Convert a loaded row into a ``User``; pass ``None`` through.

        Single place for the row -> entity mapping so that both lookup
        methods stay in sync when the entity's fields change.
        """
        if model is None:
            return None
        return User(
            id=UUID(model.id),
            email=Email(model.email),
            full_name=model.full_name,
            is_active=model.is_active,
            created_at=model.created_at,
        )