Coverage for apps/outers/datastores/one_datastore.py: 68%
38 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
1import traceback
2from typing import Any
4import asyncpg
5import sqlalchemy
6from sqlalchemy import exc
7from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
8from sqlmodel.ext.asyncio.session import AsyncSession
10from apps.inners.exceptions import datastore_exception
11from apps.outers.settings.one_datastore_setting import OneDatastoreSetting
14class OneDatastore:
16 def __init__(
17 self,
18 one_datastore_setting: OneDatastoreSetting
19 ):
20 self.one_datastore_setting: OneDatastoreSetting = one_datastore_setting
21 self.engine: AsyncEngine = create_async_engine(
22 url=self.one_datastore_setting.URL,
23 isolation_level="SERIALIZABLE"
24 )
26 def get_session(self):
27 session = AsyncSession(
28 bind=self.engine
29 )
30 return session
32 async def retryable(self, handler, max_retries: int = 10) -> Any:
33 retry_count: int = 0
34 while retry_count <= max_retries:
35 session: AsyncSession = self.get_session()
36 try:
37 await session.begin()
38 result: Any = await handler(session)
39 await session.commit()
40 await session.close()
41 return result
42 except sqlalchemy.exc.DBAPIError as exception:
43 await session.rollback()
44 await session.close()
45 if exception.orig.pgcode == asyncpg.exceptions.SerializationError.sqlstate:
46 retry_count += 1
47 continue
48 except Exception:
49 await session.rollback()
50 await session.close()
51 traceback.print_exc()
52 raise datastore_exception.HandlerError()
54 raise datastore_exception.MaxRetriesExceeded()