Coverage for apps/outers/datastores/four_datastore.py: 71%
41 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
1import uuid
2from typing import Any, Optional, Dict, Union
4from langchain_community.vectorstores.milvus import Milvus
5from pymilvus import connections
7from apps.outers.settings.four_datastore_setting import FourDatastoreSetting
10class FourDatastore:
12 def __init__(
13 self,
14 four_datastore_setting: FourDatastoreSetting
15 ):
16 self.four_datastore_setting: FourDatastoreSetting = four_datastore_setting
17 self.alias = self.get_connection_alias(self.get_connection_args())
19 def get_connection_alias(self, connection_args: Dict[str, Any]) -> str:
20 host: Optional[str] = connection_args.get("host", None)
21 port: Optional[Union[str, int]] = connection_args.get("port", None)
22 address: Optional[str] = connection_args.get("address", None)
23 uri: Optional[str] = connection_args.get("uri", None)
24 user = connection_args.get("user", None)
26 if host is not None and port is not None:
27 given_address = str(host) + ":" + str(port)
28 elif uri is not None:
29 if uri.startswith("https://"):
30 given_address = uri.split("https://")[1]
31 elif uri.startswith("http://"):
32 given_address = uri.split("http://")[1]
33 else:
34 raise ValueError("Invalid Milvus URI: %s.", uri)
35 elif address is not None:
36 given_address = address
37 else:
38 given_address = None
40 if user is not None:
41 tmp_user = user
42 else:
43 tmp_user = ""
45 if given_address is not None:
46 for con in connections.list_connections():
47 addr = connections.get_connection_addr(con[0])
48 if (
49 con[1]
50 and ("address" in addr)
51 and (addr["address"] == given_address)
52 and ("user" in addr)
53 and (addr["user"] == tmp_user)
54 ):
55 return con[0]
57 alias = uuid.uuid4().hex
58 connections.connect(alias=alias, **connection_args)
59 return alias
61 def get_vector_store(self, *args: Any, **kwargs: Any) -> Milvus:
62 return Milvus(
63 connection_args=self.get_connection_args(),
64 *args,
65 **kwargs
66 )
68 def get_connection_args(self):
69 return {
70 "host": self.four_datastore_setting.DS_FOUR_HOST,
71 "port": self.four_datastore_setting.DS_FOUR_PORT,
72 "user": self.four_datastore_setting.DS_FOUR_USER,
73 "password": self.four_datastore_setting.DS_FOUR_PASSWORD,
74 }