Coverage for apps/inners/use_cases/document_processor/partition_document_processor.py: 72%
67 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
1import io
2import shutil
3from pathlib import Path
4from typing import List
5from uuid import UUID
7from pypdf import PdfReader, PdfWriter
8from starlette.datastructures import State
9from unstructured.documents.elements import Element
10from unstructured.partition.auto import partition
11from unstructured.partition.html import partition_html
12from unstructured.partition.text import partition_text
13from unstructured.partition.utils.constants import PartitionStrategy
15from apps.inners.exceptions import use_case_exception
16from apps.inners.models.daos.document import Document
17from apps.inners.models.dtos.constants.document_type_constant import DocumentTypeConstant
18from apps.inners.models.dtos.contracts.responses.managements.documents.file_document_response import \
19 FileDocumentResponse
20from apps.inners.models.dtos.contracts.responses.managements.documents.text_document_response import \
21 TextDocumentResponse
22from apps.inners.models.dtos.contracts.responses.managements.documents.web_document_response import WebDocumentResponse
23from apps.inners.use_cases.managements.document_management import DocumentManagement
24from apps.inners.use_cases.managements.file_document_management import FileDocumentManagement
25from apps.inners.use_cases.managements.text_document_management import TextDocumentManagement
26from apps.inners.use_cases.managements.web_document_management import WebDocumentManagement
class PartitionDocumentProcessor:
    """Partition a stored document (file, text, or web) into ``unstructured`` elements.

    The processor resolves a document through the management use cases it is
    constructed with and dispatches to the partitioner matching the document's
    type.

    Note: the methods ``partition`` and ``partition_text`` share their names
    with functions imported from ``unstructured`` at module level. Inside the
    method bodies the bare names resolve to the imported module-level
    functions, not to these methods (class scope is not searched from within
    a method body).
    """

    def __init__(
            self,
            document_management: DocumentManagement,
            file_document_management: FileDocumentManagement,
            text_document_management: TextDocumentManagement,
            web_document_management: WebDocumentManagement,
    ):
        """Store the management use cases used to look documents up.

        Args:
            document_management: Resolves the generic document record.
            file_document_management: Resolves file documents and exposes the
                file repository used to read the stored bytes.
            text_document_management: Resolves text documents.
            web_document_management: Resolves web documents.
        """
        self.document_management = document_management
        self.file_document_management = file_document_management
        self.text_document_management = text_document_management
        self.web_document_management = web_document_management

    @staticmethod
    def split_pdf_page(file_data: bytes, start_page: int, end_page: int) -> bytes:
        """Return a new PDF holding pages ``start_page`` to ``end_page`` of the input.

        Args:
            file_data: Raw bytes of the source PDF.
            start_page: First page to keep, 1-based.
            end_page: Last page to keep, 1-based and inclusive.

        Returns:
            The serialized bytes of a PDF containing only the selected pages.

        The page numbers are not validated; values outside the document follow
        ordinary Python slice semantics (e.g. a ``start_page`` below 1 yields a
        negative slice start).
        """
        # Context managers guarantee both buffers are closed even when pypdf
        # raises while reading or writing.
        with io.BytesIO(file_data) as input_file_io, io.BytesIO() as output_file_io:
            input_pdf_reader = PdfReader(input_file_io)
            output_pdf_writer = PdfWriter()
            # Convert the 1-based inclusive range into a 0-based half-open slice.
            for page in input_pdf_reader.pages[start_page - 1:end_page]:
                output_pdf_writer.add_page(page)
            output_pdf_writer.write(output_file_io)
            return output_file_io.getvalue()

    async def partition_file(self, state: State, found_document: Document,
                             partition_strategy: str = PartitionStrategy.AUTO) -> List[Element]:
        """Partition the stored file behind ``found_document``.

        Args:
            state: Request state forwarded to the file document lookup.
            found_document: Document record whose ``id`` identifies the file
                document.
            partition_strategy: Strategy forwarded to ``unstructured``'s
                ``partition``; defaults to ``PartitionStrategy.AUTO``.

        Returns:
            The elements produced by ``unstructured.partition.auto.partition``.
        """
        found_file_document: FileDocumentResponse = await self.file_document_management.find_one_by_id_with_authorization(
            state=state,
            id=found_document.id
        )
        file_repository = self.file_document_management.file_document_repository
        file_data: bytes = file_repository.get_object_data(
            object_name=found_file_document.file_name
        )
        # Extracted images go to <repository path>/assets/<file data hash>.
        extracted_image_path: Path = file_repository.file_path / "assets" / found_file_document.file_data_hash
        # Create the directory first so that rmtree never fails on a missing
        # path; the net effect is that output from a previous run is removed
        # and the directory itself no longer exists afterwards.
        # NOTE(review): presumably `partition` recreates the directory when it
        # writes images - confirm against the unstructured version in use.
        extracted_image_path.mkdir(exist_ok=True, parents=True)
        shutil.rmtree(extracted_image_path)
        return partition(
            file=io.BytesIO(file_data),
            extract_image_block_types=["Image"],
            extract_image_block_output_dir=str(extracted_image_path),
            strategy=partition_strategy,
            hi_res_model_name="yolox",
        )

    async def partition_text(self, state: State, found_document: Document) -> List[Element]:
        """Partition the text content of the text document behind ``found_document``.

        Args:
            state: Request state forwarded to the text document lookup.
            found_document: Document record whose ``id`` identifies the text
                document.

        Returns:
            The elements produced by ``unstructured.partition.text.partition_text``.
        """
        found_text_document: TextDocumentResponse = await self.text_document_management.find_one_by_id_with_authorization(
            state=state,
            id=found_document.id
        )
        # The bare name below is the imported unstructured function, not this method.
        return partition_text(
            text=found_text_document.text_content,
        )

    async def partition_web(self, state: State, found_document: Document) -> List[Element]:
        """Partition the page at the URL of the web document behind ``found_document``.

        Args:
            state: Request state forwarded to the web document lookup.
            found_document: Document record whose ``id`` identifies the web
                document.

        Returns:
            The elements produced by ``unstructured.partition.html.partition_html``.
        """
        found_web_document: WebDocumentResponse = await self.web_document_management.find_one_by_id_with_authorization(
            state=state,
            id=found_document.id
        )
        # NOTE(review): ssl_verify=False is passed through unchanged from the
        # original code; it looks like it disables TLS certificate verification
        # for the fetch - confirm this is intentional before relying on it for
        # untrusted URLs.
        return partition_html(
            url=found_web_document.web_url,
            ssl_verify=False
        )

    async def partition(self, state: State, document_id: UUID, file_partition_strategy: str) -> List[Element]:
        """Look a document up by id and partition it according to its type.

        Args:
            state: Request state forwarded to every lookup.
            document_id: Identifier of the document to partition.
            file_partition_strategy: Strategy used when the document is a
                file; ignored for text and web documents.

        Returns:
            The elements produced by the type-specific partitioner.

        Raises:
            use_case_exception.DocumentTypeNotSupported: If the document's
                type is none of FILE, TEXT, or WEB.
        """
        found_document: Document = await self.document_management.find_one_by_id_with_authorization(
            state=state,
            id=document_id
        )
        document_type_id = found_document.document_type_id
        if document_type_id == DocumentTypeConstant.FILE:
            return await self.partition_file(
                state=state,
                found_document=found_document,
                partition_strategy=file_partition_strategy
            )
        if document_type_id == DocumentTypeConstant.TEXT:
            return await self.partition_text(
                state=state,
                found_document=found_document
            )
        if document_type_id == DocumentTypeConstant.WEB:
            return await self.partition_web(
                state=state,
                found_document=found_document
            )
        raise use_case_exception.DocumentTypeNotSupported()