Coverage for apps/outers/repositories/document_process_repository.py: 72%

43 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-22 19:03 +0000

1from typing import List 

2from uuid import UUID 

3 

4import sqlalchemy 

5from sqlalchemy import exc 

6from sqlalchemy.engine import ScalarResult 

7from sqlalchemy.orm import aliased 

8from sqlmodel import select 

9from sqlmodel.ext.asyncio.session import AsyncSession 

10 

11from apps.inners.exceptions import repository_exception 

12from apps.inners.models.daos.document import Document 

13from apps.inners.models.daos.document_process import DocumentProcess 

14 

15 

class DocumentProcessRepository:
    """Data-access helpers for ``DocumentProcess`` rows scoped to one account.

    Every lookup joins ``Document`` twice (once as the initial document and
    once as the final document of the process) and requires both to carry the
    requested ``account_id``. The caller supplies the ``AsyncSession``; no
    method in this class flushes or commits it.
    """

    def __init__(self):
        """Create the repository; it keeps no state of its own."""

    async def find_many_by_account_id_with_pagination(
            self,
            session: AsyncSession,
            account_id: UUID,
            page_position: int,
            page_size: int
    ) -> List[DocumentProcess]:
        """Return one page of the document processes owned by ``account_id``.

        Args:
            session: Session used to execute the query.
            account_id: Account that must own both the initial and the final
                document of every returned process.
            page_position: 1-based page number; the row offset is
                ``page_size * (page_position - 1)``.
            page_size: Maximum number of rows in the page.

        Returns:
            The rows of the requested page ordered by ``DocumentProcess.id``;
            an empty list when the page has no rows.
        """
        initial_document = aliased(Document)
        final_document = aliased(Document)
        statement = (
            select(DocumentProcess)
            .join(initial_document, initial_document.id == DocumentProcess.initial_document_id)
            .join(final_document, final_document.id == DocumentProcess.final_document_id)
            .where(initial_document.account_id == account_id)
            .where(final_document.account_id == account_id)
            # LIMIT/OFFSET without ORDER BY lets the database return rows in
            # any order, so consecutive pages could overlap or skip rows.
            .order_by(DocumentProcess.id)
            .limit(page_size)
            .offset(page_size * (page_position - 1))
        )
        found_document_process_result: ScalarResult = await session.exec(statement)

        return list(found_document_process_result.all())

    async def find_one_by_id_and_accound_id(self, session: AsyncSession, id: UUID, account_id: UUID) -> DocumentProcess:
        """Return the document process ``id`` when it belongs to ``account_id``.

        The misspelt name ("accound") is kept for backward compatibility;
        ``find_one_by_id_and_account_id`` is the correctly spelt entry point
        and delegates here.

        Args:
            session: Session used to execute the query.
            id: Primary key of the document process.
            account_id: Account that must own both the initial and the final
                document of the process.

        Returns:
            The single matching document process.

        Raises:
            repository_exception.NotFound: If no row matches.
        """
        initial_document = aliased(Document)
        final_document = aliased(Document)
        statement = (
            select(DocumentProcess)
            .join(initial_document, initial_document.id == DocumentProcess.initial_document_id)
            .join(final_document, final_document.id == DocumentProcess.final_document_id)
            .where(DocumentProcess.id == id)
            .where(initial_document.account_id == account_id)
            .where(final_document.account_id == account_id)
            .limit(1)
        )
        found_document_process_result: ScalarResult = await session.exec(statement)

        # Only ``one()`` raises NoResultFound, so the try covers nothing else.
        try:
            return found_document_process_result.one()
        except sqlalchemy.exc.NoResultFound as error:
            raise repository_exception.NotFound() from error

    async def find_one_by_id_and_account_id(self, session: AsyncSession, id: UUID, account_id: UUID) -> DocumentProcess:
        """Correctly spelt alias of ``find_one_by_id_and_accound_id``.

        Delegates to the legacy-named method so that subclasses overriding the
        legacy name keep affecting every lookup.

        Raises:
            repository_exception.NotFound: If no row matches.
        """
        return await self.find_one_by_id_and_accound_id(
            session=session,
            id=id,
            account_id=account_id
        )

    def create_one(self, session: AsyncSession, document_process_creator: DocumentProcess) -> DocumentProcess:
        """Add ``document_process_creator`` to the session and return it.

        Only ``session.add`` is called; nothing is flushed or committed here.

        Args:
            session: Session the new object is added to.
            document_process_creator: Object to persist.

        Returns:
            The same ``document_process_creator`` instance.

        Raises:
            repository_exception.IntegrityError: If ``session.add`` raises
                ``sqlalchemy.exc.IntegrityError``.
        """
        try:
            session.add(document_process_creator)
        except sqlalchemy.exc.IntegrityError as error:
            # NOTE(review): ``add`` only registers the object as pending, so
            # constraint violations normally surface at flush/commit instead;
            # confirm the caller translates the error there as well.
            raise repository_exception.IntegrityError() from error

        return document_process_creator

    async def patch_one_by_id_and_account_id(self, session: AsyncSession, id: UUID, account_id: UUID,
                                             document_process_patcher: DocumentProcess) -> DocumentProcess:
        """Apply the non-``None`` fields of the patcher to an owned process.

        Args:
            session: Session used to look the process up.
            id: Primary key of the document process to patch.
            account_id: Account that must own the process.
            document_process_patcher: Carrier of the new values; fields that
                are ``None`` are left out of the patch.

        Returns:
            The patched document process.

        Raises:
            repository_exception.NotFound: If no row matches.
        """
        found_document_process: DocumentProcess = await self.find_one_by_id_and_accound_id(
            session=session,
            id=id,
            account_id=account_id
        )
        # NOTE(review): ``.dict()`` is the Pydantic v1 spelling; confirm the
        # installed SQLModel/Pydantic version before switching to ``model_dump``.
        found_document_process.patch_from(document_process_patcher.dict(exclude_none=True))
        return found_document_process

    async def delete_one_by_id_and_account_id(self, session: AsyncSession, id: UUID,
                                              account_id: UUID) -> DocumentProcess:
        """Mark an owned document process for deletion and return it.

        Args:
            session: Session used for the lookup and the delete.
            id: Primary key of the document process to delete.
            account_id: Account that must own the process.

        Returns:
            The document process that was passed to ``session.delete``.

        Raises:
            repository_exception.NotFound: If no row matches.
        """
        found_document_process: DocumentProcess = await self.find_one_by_id_and_accound_id(
            session=session,
            id=id,
            account_id=account_id
        )
        await session.delete(found_document_process)
        return found_document_process