Coverage for apps/inners/use_cases/document_processor/partition_document_processor.py: 72%

67 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-22 19:03 +0000

1import io 

2import shutil 

3from pathlib import Path 

4from typing import List 

5from uuid import UUID 

6 

7from pypdf import PdfReader, PdfWriter 

8from starlette.datastructures import State 

9from unstructured.documents.elements import Element 

10from unstructured.partition.auto import partition 

11from unstructured.partition.html import partition_html 

12from unstructured.partition.text import partition_text 

13from unstructured.partition.utils.constants import PartitionStrategy 

14 

15from apps.inners.exceptions import use_case_exception 

16from apps.inners.models.daos.document import Document 

17from apps.inners.models.dtos.constants.document_type_constant import DocumentTypeConstant 

18from apps.inners.models.dtos.contracts.responses.managements.documents.file_document_response import \ 

19 FileDocumentResponse 

20from apps.inners.models.dtos.contracts.responses.managements.documents.text_document_response import \ 

21 TextDocumentResponse 

22from apps.inners.models.dtos.contracts.responses.managements.documents.web_document_response import WebDocumentResponse 

23from apps.inners.use_cases.managements.document_management import DocumentManagement 

24from apps.inners.use_cases.managements.file_document_management import FileDocumentManagement 

25from apps.inners.use_cases.managements.text_document_management import TextDocumentManagement 

26from apps.inners.use_cases.managements.web_document_management import WebDocumentManagement 

27 

28 

class PartitionDocumentProcessor:
    """Partition stored documents into lists of ``unstructured`` elements.

    A document is first looked up through the matching management use case
    (file, text or web) and then handed to the corresponding ``unstructured``
    partition function.
    """

    def __init__(
            self,
            document_management: DocumentManagement,
            file_document_management: FileDocumentManagement,
            text_document_management: TextDocumentManagement,
            web_document_management: WebDocumentManagement,
    ):
        """Store the management use cases used to look documents up."""
        self.document_management = document_management
        self.file_document_management = file_document_management
        self.text_document_management = text_document_management
        self.web_document_management = web_document_management

    @staticmethod
    def split_pdf_page(file_data: bytes, start_page: int, end_page: int) -> bytes:
        """Return a new PDF containing only pages ``start_page``..``end_page``.

        Args:
            file_data: Raw bytes of the source PDF.
            start_page: First page to keep, 1-based.
            end_page: Last page to keep, 1-based and inclusive.

        Returns:
            The bytes of a PDF holding the selected pages. The result has no
            pages when the range selects nothing (for example when
            ``end_page < start_page``).
        """
        # Clamp both bounds at zero: a negative slice bound would silently
        # count from the END of the document (e.g. start_page=0 would select
        # from the last page) instead of meaning "from the first page" /
        # "nothing".
        first_index = max(start_page - 1, 0)
        last_index = max(end_page, 0)

        # The context managers guarantee both buffers are released even when
        # pypdf raises on a malformed input. The input buffer must stay open
        # until the writer has finished writing.
        with io.BytesIO(file_data) as input_file_io, io.BytesIO() as output_file_io:
            input_pdf_reader = PdfReader(input_file_io)
            output_pdf_writer = PdfWriter()
            for page in input_pdf_reader.pages[first_index:last_index]:
                output_pdf_writer.add_page(page)
            output_pdf_writer.write(output_file_io)

            return output_file_io.getvalue()

    async def partition_file(self, state: State, found_document: Document,
                             partition_strategy: str = PartitionStrategy.AUTO) -> List[Element]:
        """Partition a file document into elements.

        Args:
            state: Request state forwarded to the file document lookup.
            found_document: Document whose ``id`` identifies the file document.
            partition_strategy: Strategy passed to ``unstructured``'s
                ``partition``; defaults to ``PartitionStrategy.AUTO``.

        Returns:
            The elements produced by ``partition`` for the stored file data.
        """
        found_file_document: FileDocumentResponse = await self.file_document_management.find_one_by_id_with_authorization(
            state=state,
            id=found_document.id
        )
        file_document_repository = self.file_document_management.file_document_repository
        file_data: bytes = file_document_repository.get_object_data(
            object_name=found_file_document.file_name
        )
        extracted_image_path: Path = file_document_repository.file_path / "assets" / found_file_document.file_data_hash
        # Creating the directory first guarantees that rmtree never fails on a
        # missing path; the net effect is that the parent directories exist and
        # any previously extracted assets for this file hash are removed.
        # NOTE(review): presumably `partition` recreates the output directory
        # when it writes images - confirm.
        extracted_image_path.mkdir(exist_ok=True, parents=True)
        shutil.rmtree(extracted_image_path)

        # `partition` below is the module-level function imported from
        # `unstructured.partition.auto`, not the method of the same name:
        # class attributes are not in scope inside a method body.
        with io.BytesIO(file_data) as file_io:
            elements: List[Element] = partition(
                file=file_io,
                extract_image_block_types=["Image"],
                extract_image_block_output_dir=str(extracted_image_path),
                strategy=partition_strategy,
                hi_res_model_name="yolox",
            )

        return elements

    async def partition_text(self, state: State, found_document: Document) -> List[Element]:
        """Partition a text document into elements.

        Args:
            state: Request state forwarded to the text document lookup.
            found_document: Document whose ``id`` identifies the text document.

        Returns:
            The elements produced from the document's ``text_content``.
        """
        found_text_document: TextDocumentResponse = await self.text_document_management.find_one_by_id_with_authorization(
            state=state,
            id=found_document.id
        )

        # Module-level `partition_text` from `unstructured.partition.text`,
        # not this method.
        return partition_text(
            text=found_text_document.text_content,
        )

    async def partition_web(self, state: State, found_document: Document) -> List[Element]:
        """Partition a web document into elements.

        Args:
            state: Request state forwarded to the web document lookup.
            found_document: Document whose ``id`` identifies the web document.

        Returns:
            The elements produced from the page at the document's ``web_url``.
        """
        found_web_document: WebDocumentResponse = await self.web_document_management.find_one_by_id_with_authorization(
            state=state,
            id=found_document.id
        )

        # NOTE(review): ssl_verify=False disables TLS certificate verification
        # for the fetch of a stored URL. Kept as-is to preserve behaviour, but
        # this should be revisited (or made configurable) before relying on
        # the fetched content.
        return partition_html(
            url=found_web_document.web_url,
            ssl_verify=False
        )

    async def partition(self, state: State, document_id: UUID, file_partition_strategy: str) -> List[Element]:
        """Partition the document with the given id according to its type.

        Args:
            state: Request state forwarded to the document lookups.
            document_id: Id of the document to partition.
            file_partition_strategy: Strategy used when the document is a
                file; ignored for text and web documents.

        Returns:
            The elements produced by the type-specific partition method.

        Raises:
            use_case_exception.DocumentTypeNotSupported: If the document's
                type is not FILE, TEXT or WEB.
        """
        found_document: Document = await self.document_management.find_one_by_id_with_authorization(
            state=state,
            id=document_id
        )
        document_type_id = found_document.document_type_id

        if document_type_id == DocumentTypeConstant.FILE:
            return await self.partition_file(
                state=state,
                found_document=found_document,
                partition_strategy=file_partition_strategy
            )
        if document_type_id == DocumentTypeConstant.TEXT:
            return await self.partition_text(
                state=state,
                found_document=found_document
            )
        if document_type_id == DocumentTypeConstant.WEB:
            return await self.partition_web(
                state=state,
                found_document=found_document
            )

        raise use_case_exception.DocumentTypeNotSupported()