Coverage for apps/inners/use_cases/managements/text_document_management.py: 75%
55 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
1import hashlib
2import uuid
3from typing import List
4from uuid import UUID
6from starlette.datastructures import State
8from apps.inners.models.daos.document import Document
9from apps.inners.models.daos.text_document import TextDocument
10from apps.inners.models.dtos.constants.document_type_constant import DocumentTypeConstant
11from apps.inners.models.dtos.contracts.requests.managements.text_documents.create_one_body import CreateOneBody
12from apps.inners.models.dtos.contracts.requests.managements.text_documents.patch_one_body import PatchOneBody
13from apps.inners.models.dtos.contracts.responses.managements.documents.text_document_response import \
14 TextDocumentResponse
15from apps.inners.use_cases.managements.document_management import DocumentManagement
16from apps.outers.repositories.text_document_repository import TextDocumentRepository
class TextDocumentManagement:
    """Use case for text documents.

    A text document is handled here as two records that share one id: a
    ``Document`` (name, description, type, owning account) and a
    ``TextDocument`` (text content and its hash). The methods below combine
    both records into a single ``TextDocumentResponse``.
    """

    def __init__(
            self,
            document_management: DocumentManagement,
            text_document_repository: TextDocumentRepository,
    ):
        """Store the collaborators used by every operation.

        Args:
            document_management: Use case handling the ``Document`` half.
            text_document_repository: Repository handling the ``TextDocument`` half.
        """
        self.document_management: DocumentManagement = document_management
        self.text_document_repository: TextDocumentRepository = text_document_repository

    @staticmethod
    def _hash_text_content(text_content: str) -> str:
        """Return the hexadecimal SHA-256 digest of ``text_content``."""
        return hashlib.sha256(text_content.encode()).hexdigest()

    @staticmethod
    def _to_response(document: Document, text_document: TextDocument) -> TextDocumentResponse:
        """Merge a document and its text document into one response object."""
        return TextDocumentResponse(
            id=document.id,
            name=document.name,
            description=document.description,
            document_type_id=document.document_type_id,
            account_id=document.account_id,
            text_content=text_document.text_content,
            text_content_hash=text_document.text_content_hash
        )

    async def find_many_with_authorization_and_pagination(
            self,
            state: State,
            page_position: int,
            page_size: int
    ) -> List[TextDocumentResponse]:
        """Return one page of the authorized account's text documents.

        Args:
            state: Request state providing ``session`` and
                ``authorized_session.account_id``.
            page_position: Page position forwarded to the repository.
            page_size: Page size forwarded to the repository.

        Returns:
            One response per text document of the page, in the order the
            text document repository returned them.

        Raises:
            ValueError: If the documents fetched for the page do not match
                the text documents one-to-one.
        """
        account_id = state.authorized_session.account_id
        found_text_documents: List[
            TextDocument
        ] = await self.text_document_repository.find_many_by_account_id_with_pagination(
            session=state.session,
            account_id=account_id,
            page_position=page_position,
            page_size=page_size
        )
        found_documents: List[
            Document
        ] = await self.document_management.document_repository.find_many_by_id_and_account_id(
            session=state.session,
            ids=[found_text_document.id for found_text_document in found_text_documents],
            account_id=account_id
        )

        # Same failure type the former ``zip(..., strict=True)`` produced on a
        # length mismatch.
        if len(found_documents) != len(found_text_documents):
            raise ValueError(
                f"Found {len(found_documents)} documents for "
                f"{len(found_text_documents)} text documents."
            )

        # The two queries are independent and nothing visible here guarantees
        # that they return rows in the same order, so rows are matched by id
        # instead of by position.
        documents_by_id: dict[UUID, Document] = {
            found_document.id: found_document for found_document in found_documents
        }
        found_text_document_responses: List[TextDocumentResponse] = []
        for found_text_document in found_text_documents:
            found_document = documents_by_id.get(found_text_document.id)
            if found_document is None:
                raise ValueError(
                    f"No document found for text document id {found_text_document.id}."
                )
            found_text_document_responses.append(
                self._to_response(found_document, found_text_document)
            )

        return found_text_document_responses

    async def find_one_by_id_with_authorization(self, state: State, id: UUID) -> TextDocumentResponse:
        """Return the text document ``id`` of the authorized account.

        Args:
            state: Request state providing ``session`` and
                ``authorized_session.account_id``.
            id: Identifier shared by the document and the text document.

        Returns:
            The combined document and text document data.
        """
        found_document: Document = await self.document_management.find_one_by_id_with_authorization(
            state=state,
            id=id
        )
        found_text_document: TextDocument = await self.text_document_repository.find_one_by_id_and_account_id(
            session=state.session,
            id=id,
            account_id=state.authorized_session.account_id
        )

        return self._to_response(found_document, found_text_document)

    async def create_one(self, state: State, body: CreateOneBody) -> TextDocumentResponse:
        """Create a document and its text document from ``body``.

        The document gets a freshly generated UUID and the text document
        reuses that id. The content hash is the SHA-256 of ``body.text_content``.

        Args:
            state: Request state providing ``session``.
            body: Name, description, account id and text content to store.

        Returns:
            The combined data of the two created records.
        """
        document_creator: Document = Document(
            id=uuid.uuid4(),
            name=body.name,
            description=body.description,
            document_type_id=DocumentTypeConstant.TEXT,
            account_id=body.account_id
        )
        created_document: Document = self.document_management.create_one_raw(
            state=state,
            document_creator=document_creator
        )
        text_document_creator: TextDocument = TextDocument(
            id=created_document.id,
            text_content=body.text_content,
            text_content_hash=self._hash_text_content(body.text_content)
        )
        created_text_document: TextDocument = self.create_one_raw(
            state=state,
            text_document_creator=text_document_creator
        )

        return self._to_response(created_document, created_text_document)

    def create_one_raw(self, state: State, text_document_creator: TextDocument) -> TextDocument:
        """Persist ``text_document_creator`` through the repository and return the result."""
        created_text_document: TextDocument = self.text_document_repository.create_one(
            session=state.session,
            text_document_creator=text_document_creator
        )

        return created_text_document

    async def patch_one_by_id_with_authorization(
            self,
            state: State,
            id: UUID,
            body: PatchOneBody
    ) -> TextDocumentResponse:
        """Patch the document and text document ``id`` with the values in ``body``.

        The content hash is recomputed as the SHA-256 of ``body.text_content``.

        Args:
            state: Request state providing ``session`` and
                ``authorized_session.account_id``.
            id: Identifier shared by the document and the text document.
            body: New name, description, account id and text content.

        Returns:
            The combined data of the two patched records.
        """
        document_patcher: Document = Document(
            id=id,
            name=body.name,
            description=body.description,
            document_type_id=DocumentTypeConstant.TEXT,
            account_id=body.account_id
        )
        patched_document: Document = await self.document_management.patch_one_by_id_raw_with_authorization(
            state=state,
            id=id,
            document_patcher=document_patcher
        )
        text_document_patcher: TextDocument = TextDocument(
            id=id,
            text_content=body.text_content,
            text_content_hash=self._hash_text_content(body.text_content)
        )
        patched_text_document: TextDocument = await self.patch_one_by_id_raw_with_authorization(
            state=state,
            id=id,
            text_document_patcher=text_document_patcher
        )

        return self._to_response(patched_document, patched_text_document)

    async def patch_one_by_id_raw_with_authorization(
            self,
            state: State,
            id: UUID,
            text_document_patcher: TextDocument
    ) -> TextDocument:
        """Patch the text document ``id`` of the authorized account through the repository."""
        patched_text_document: TextDocument = await self.text_document_repository.patch_one_by_id_and_account_id(
            session=state.session,
            id=id,
            account_id=state.authorized_session.account_id,
            text_document_patcher=text_document_patcher
        )

        return patched_text_document

    async def delete_one_by_id_with_authorization(self, state: State, id: UUID) -> TextDocumentResponse:
        """Delete the text document ``id`` and then its document.

        Args:
            state: Request state providing ``session`` and
                ``authorized_session.account_id``.
            id: Identifier shared by the document and the text document.

        Returns:
            The combined data of the two deleted records.
        """
        # The text document is removed before the document; presumably it
        # references the document row -- confirm against the schema.
        deleted_text_document: TextDocument = await self.text_document_repository.delete_one_by_id_and_account_id(
            session=state.session,
            id=id,
            account_id=state.authorized_session.account_id
        )
        deleted_document: Document = await self.document_management.delete_one_by_id_with_authorization(
            state=state,
            id=id
        )

        # Both records were deleted by the same ``id``, so the response id is
        # taken from the document, as in every other operation of this class.
        return self._to_response(deleted_document, deleted_text_document)