Coverage for apps/outers/repositories/document_process_repository.py: 72%
43 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
1from typing import List
2from uuid import UUID
4import sqlalchemy
5from sqlalchemy import exc
6from sqlalchemy.engine import ScalarResult
7from sqlalchemy.orm import aliased
8from sqlmodel import select
9from sqlmodel.ext.asyncio.session import AsyncSession
11from apps.inners.exceptions import repository_exception
12from apps.inners.models.daos.document import Document
13from apps.inners.models.daos.document_process import DocumentProcess
16class DocumentProcessRepository:
18 def __init__(self):
19 pass
21 async def find_many_by_account_id_with_pagination(
22 self,
23 session: AsyncSession,
24 account_id: UUID,
25 page_position: int,
26 page_size: int
27 ) -> List[DocumentProcess]:
28 initial_document: Document = aliased(Document)
29 final_document: Document = aliased(Document)
30 found_document_process_result: ScalarResult = await session.exec(
31 select(DocumentProcess)
32 .join(initial_document, initial_document.id == DocumentProcess.initial_document_id)
33 .join(final_document, final_document.id == DocumentProcess.final_document_id)
34 .where(initial_document.account_id == account_id)
35 .where(final_document.account_id == account_id)
36 .limit(page_size)
37 .offset(page_size * (page_position - 1))
38 )
39 found_document_processes: List[DocumentProcess] = list(found_document_process_result.all())
41 return found_document_processes
43 async def find_one_by_id_and_accound_id(self, session: AsyncSession, id: UUID, account_id: UUID) -> DocumentProcess:
44 try:
45 initial_document: Document = aliased(Document)
46 final_document: Document = aliased(Document)
47 found_document_process_result: ScalarResult = await session.exec(
48 select(DocumentProcess)
49 .join(initial_document, initial_document.id == DocumentProcess.initial_document_id)
50 .join(final_document, final_document.id == DocumentProcess.final_document_id)
51 .where(DocumentProcess.id == id)
52 .where(initial_document.account_id == account_id)
53 .where(final_document.account_id == account_id)
54 .limit(1)
55 )
56 found_document_process: DocumentProcess = found_document_process_result.one()
57 except sqlalchemy.exc.NoResultFound:
58 raise repository_exception.NotFound()
60 return found_document_process
62 def create_one(self, session: AsyncSession, document_process_creator: DocumentProcess) -> DocumentProcess:
63 try:
64 session.add(document_process_creator)
65 except sqlalchemy.exc.IntegrityError:
66 raise repository_exception.IntegrityError()
68 return document_process_creator
70 async def patch_one_by_id_and_account_id(self, session: AsyncSession, id: UUID, account_id: UUID,
71 document_process_patcher: DocumentProcess) -> DocumentProcess:
72 found_document_process: DocumentProcess = await self.find_one_by_id_and_accound_id(
73 session=session,
74 id=id,
75 account_id=account_id
76 )
77 found_document_process.patch_from(document_process_patcher.dict(exclude_none=True))
78 return found_document_process
80 async def delete_one_by_id_and_account_id(self, session: AsyncSession, id: UUID,
81 account_id: UUID) -> DocumentProcess:
82 found_document_process: DocumentProcess = await self.find_one_by_id_and_accound_id(
83 session=session,
84 id=id,
85 account_id=account_id
86 )
87 await session.delete(found_document_process)
88 return found_document_process