Coverage for apps/outers/datastores/four_datastore.py: 71%

41 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-22 19:03 +0000

1import uuid 

2from typing import Any, Optional, Dict, Union 

3 

4from langchain_community.vectorstores.milvus import Milvus 

5from pymilvus import connections 

6 

7from apps.outers.settings.four_datastore_setting import FourDatastoreSetting 

8 

9 

class FourDatastore:
    """Access point for the Milvus datastore described by a ``FourDatastoreSetting``.

    On construction the instance resolves a pymilvus connection alias for the
    configured connection arguments (reusing a matching registered connection
    when one is found, otherwise opening a new one) and stores it in
    ``self.alias``.
    """

    def __init__(
            self,
            four_datastore_setting: FourDatastoreSetting
    ):
        """Store the settings and resolve the connection alias.

        Args:
            four_datastore_setting: Settings object that provides the
                ``DS_FOUR_HOST``, ``DS_FOUR_PORT``, ``DS_FOUR_USER`` and
                ``DS_FOUR_PASSWORD`` attributes read by
                ``get_connection_args``.
        """
        self.four_datastore_setting: FourDatastoreSetting = four_datastore_setting
        # NOTE: this may open a connection as a side effect of construction.
        self.alias: str = self.get_connection_alias(self.get_connection_args())

    @staticmethod
    def _resolve_address(connection_args: Dict[str, Any]) -> Optional[str]:
        """Derive the address string from the connection arguments.

        Precedence is ``host`` + ``port``, then ``uri``, then ``address``.

        Raises:
            ValueError: If ``uri`` is used and does not start with
                ``https://`` or ``http://``.
        """
        host: Optional[str] = connection_args.get("host", None)
        port = connection_args.get("port", None)
        uri: Optional[str] = connection_args.get("uri", None)
        address: Optional[str] = connection_args.get("address", None)

        if host is not None and port is not None:
            return str(host) + ":" + str(port)

        if uri is not None:
            for scheme in ("https://", "http://"):
                if uri.startswith(scheme):
                    # Strip only the leading scheme; splitting on the scheme
                    # text would truncate a URI that contains it again later.
                    return uri[len(scheme):]
            # Interpolate the URI into the message; passing it as a second
            # exception argument would leave the "%s" placeholder unformatted.
            raise ValueError(f"Invalid Milvus URI: {uri}.")

        # Either the explicit address or None when nothing usable was given.
        return address

    def get_connection_alias(self, connection_args: Dict[str, Any]) -> str:
        """Return a connection alias that matches ``connection_args``.

        Registered connections are scanned for an entry with a truthy second
        element whose stored ``address`` and ``user`` equal the requested
        ones; the first match's alias is returned. When nothing matches (or
        no address can be derived) a new connection is opened under a random
        hex alias and that alias is returned.

        Args:
            connection_args: Keyword arguments for ``connections.connect``;
                the ``host``, ``port``, ``uri``, ``address`` and ``user``
                keys are inspected here.

        Returns:
            The alias of the reused or newly created connection.

        Raises:
            ValueError: If a ``uri`` is used and lacks an ``http://`` or
                ``https://`` prefix.
        """
        given_address = self._resolve_address(connection_args)

        user = connection_args.get("user", None)
        # A missing user is compared as the empty string.
        tmp_user = user if user is not None else ""

        if given_address is not None:
            for con in connections.list_connections():
                # con[0] is used as the alias; con[1] must be truthy for the
                # entry to be considered reusable.
                addr = connections.get_connection_addr(con[0])
                if (
                        con[1]
                        and ("address" in addr)
                        and (addr["address"] == given_address)
                        and ("user" in addr)
                        and (addr["user"] == tmp_user)
                ):
                    return con[0]

        alias = uuid.uuid4().hex
        connections.connect(alias=alias, **connection_args)
        return alias

    def get_vector_store(self, *args: Any, **kwargs: Any) -> Milvus:
        """Build a ``Milvus`` vector store using this datastore's connection arguments.

        Args:
            *args: Positional arguments forwarded to ``Milvus`` unchanged.
            **kwargs: Keyword arguments forwarded to ``Milvus`` unchanged.
                Supplying ``connection_args`` here raises ``TypeError``
                because that keyword is always set by this method.

        Returns:
            A new ``Milvus`` instance.
        """
        return Milvus(
            *args,
            connection_args=self.get_connection_args(),
            **kwargs
        )

    def get_connection_args(self) -> Dict[str, Any]:
        """Return a fresh dict of connection arguments read from the settings.

        Returns:
            A dict with the ``host``, ``port``, ``user`` and ``password``
            keys, taken from the corresponding ``DS_FOUR_*`` settings.
        """
        return {
            "host": self.four_datastore_setting.DS_FOUR_HOST,
            "port": self.four_datastore_setting.DS_FOUR_PORT,
            "user": self.four_datastore_setting.DS_FOUR_USER,
            "password": self.four_datastore_setting.DS_FOUR_PASSWORD,
        }