Coverage for apps/outers/datastores/one_datastore.py: 68%

38 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-22 19:03 +0000

1import traceback 

2from typing import Any 

3 

4import asyncpg 

5import sqlalchemy 

6from sqlalchemy import exc 

7from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine 

8from sqlmodel.ext.asyncio.session import AsyncSession 

9 

10from apps.inners.exceptions import datastore_exception 

11from apps.outers.settings.one_datastore_setting import OneDatastoreSetting 

12 

13 

14class OneDatastore: 

15 

16 def __init__( 

17 self, 

18 one_datastore_setting: OneDatastoreSetting 

19 ): 

20 self.one_datastore_setting: OneDatastoreSetting = one_datastore_setting 

21 self.engine: AsyncEngine = create_async_engine( 

22 url=self.one_datastore_setting.URL, 

23 isolation_level="SERIALIZABLE" 

24 ) 

25 

26 def get_session(self): 

27 session = AsyncSession( 

28 bind=self.engine 

29 ) 

30 return session 

31 

32 async def retryable(self, handler, max_retries: int = 10) -> Any: 

33 retry_count: int = 0 

34 while retry_count <= max_retries: 

35 session: AsyncSession = self.get_session() 

36 try: 

37 await session.begin() 

38 result: Any = await handler(session) 

39 await session.commit() 

40 await session.close() 

41 return result 

42 except sqlalchemy.exc.DBAPIError as exception: 

43 await session.rollback() 

44 await session.close() 

45 if exception.orig.pgcode == asyncpg.exceptions.SerializationError.sqlstate: 

46 retry_count += 1 

47 continue 

48 except Exception: 

49 await session.rollback() 

50 await session.close() 

51 traceback.print_exc() 

52 raise datastore_exception.HandlerError() 

53 

54 raise datastore_exception.MaxRetriesExceeded()