Coverage for apps/outers/datastores/three_datastore.py: 92%
13 statements
Generated by coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
1from typing import List
3from minio import Minio
5from apps.outers.settings.three_datastore_setting import ThreeDatastoreSetting
class ThreeDatastore:
    """MinIO-backed datastore that makes sure its buckets exist.

    Constructing an instance builds a ``Minio`` client from the supplied
    settings and immediately calls :meth:`prepare_buckets` for every name in
    ``bucket_names``, so construction already talks to the client
    (``bucket_exists`` / ``make_bucket``).
    """

    def __init__(
        self,
        three_datastore_setting: ThreeDatastoreSetting
    ):
        """Build the client and create any bucket that is still missing.

        Args:
            three_datastore_setting: Settings object; this constructor reads
                its ``DS_THREE_HOST``, ``DS_THREE_PORT``, ``DS_THREE_USER``
                and ``DS_THREE_PASSWORD`` attributes.
        """
        self.three_datastore_setting: ThreeDatastoreSetting = three_datastore_setting
        self.client: Minio = Minio(
            endpoint=f"{self.three_datastore_setting.DS_THREE_HOST}:{self.three_datastore_setting.DS_THREE_PORT}",
            access_key=self.three_datastore_setting.DS_THREE_USER,
            secret_key=self.three_datastore_setting.DS_THREE_PASSWORD,
            # NOTE(review): TLS is switched off unconditionally here; confirm
            # this is intended for every environment this class runs in.
            secure=False,
        )
        # Buckets this datastore expects to find; created on start-up if absent.
        self.bucket_names: List[str] = [
            "research-assistant-backend.file-documents"
        ]
        self.prepare_buckets(self.bucket_names)

    def prepare_buckets(self, bucket_names: List[str]) -> None:
        """Create every bucket in ``bucket_names`` that does not exist yet.

        The existence check and the creation are two separate calls, so
        another process may create the same bucket in between. A failed
        creation is therefore only treated as an error when the bucket is
        still missing afterwards.

        Args:
            bucket_names: Names of the buckets that must exist. An empty list
                is a no-op.

        Raises:
            Exception: Whatever ``make_bucket`` raised, re-raised unchanged,
                when the bucket still does not exist after the failed attempt.
        """
        for bucket_name in bucket_names:
            if self.client.bucket_exists(bucket_name):
                continue
            try:
                self.client.make_bucket(bucket_name)
            except Exception:
                # Caught broadly because the client's error type is not
                # imported in this module. Losing a creation race is fine;
                # anything else propagates exactly as it was raised.
                if not self.client.bucket_exists(bucket_name):
                    raise