Coverage for apps/inners/use_cases/managements/session_management.py: 76%

25 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-22 19:03 +0000

1from uuid import UUID 

2 

3from starlette.datastructures import State 

4 

5from apps.inners.models.daos.session import Session 

6from apps.outers.repositories.session_repository import SessionRepository 

7 

8 

class SessionManagement:
    """Use-case layer that forwards session operations to a ``SessionRepository``.

    Each method reads ``state.session`` and passes it to the repository as the
    ``session`` keyword argument, alongside the values supplied by the caller.
    Whatever the repository returns is handed back unchanged.

    NOTE(review): ``state.session`` is presumably the database session stored
    on the request state, distinct from the ``Session`` model returned here --
    confirm against the code that populates ``state``.
    """

    def __init__(
            self,
            session_repository: SessionRepository,
    ):
        """Keep a reference to the repository every method delegates to."""
        self.session_repository: SessionRepository = session_repository

    async def find_one_by_account_id(self, state: State, account_id: UUID) -> Session:
        """Await the repository's ``find_one_by_account_id`` and return its result.

        :param state: object whose ``session`` attribute is forwarded to the repository.
        :param account_id: forwarded as the ``account_id`` keyword argument.
        :return: the value produced by the repository call.
        """
        repository = self.session_repository
        return await repository.find_one_by_account_id(
            session=state.session,
            account_id=account_id,
        )

    async def find_one_by_access_token(self, state: State, access_token: str) -> Session:
        """Await the repository's ``find_one_by_access_token`` and return its result.

        :param state: object whose ``session`` attribute is forwarded to the repository.
        :param access_token: forwarded as the ``access_token`` keyword argument.
        :return: the value produced by the repository call.
        """
        repository = self.session_repository
        return await repository.find_one_by_access_token(
            session=state.session,
            access_token=access_token,
        )

    async def find_one_by_refresh_token(self, state: State, refresh_token: str) -> Session:
        """Await the repository's ``find_one_by_refresh_token`` and return its result.

        :param state: object whose ``session`` attribute is forwarded to the repository.
        :param refresh_token: forwarded as the ``refresh_token`` keyword argument.
        :return: the value produced by the repository call.
        """
        repository = self.session_repository
        return await repository.find_one_by_refresh_token(
            session=state.session,
            refresh_token=refresh_token,
        )

    def create_one_raw(self, state: State, session_creator: Session) -> Session:
        """Call the repository's ``create_one`` and return its result.

        Unlike the other methods of this class, this one is a plain (non-async)
        function and the repository call is not awaited.

        :param state: object whose ``session`` attribute is forwarded to the repository.
        :param session_creator: forwarded as the ``session_creator`` keyword argument.
        :return: the value produced by the repository call.
        """
        repository = self.session_repository
        return repository.create_one(
            session=state.session,
            session_creator=session_creator,
        )

    async def patch_one_by_id_raw(self, state: State, id: UUID, session_patcher: Session) -> Session:
        """Await the repository's ``patch_one_by_id`` and return its result.

        :param state: object whose ``session`` attribute is forwarded to the repository.
        :param id: forwarded as the ``id`` keyword argument.
        :param session_patcher: forwarded as the ``session_patcher`` keyword argument.
        :return: the value produced by the repository call.
        """
        repository = self.session_repository
        return await repository.patch_one_by_id(
            session=state.session,
            id=id,
            session_patcher=session_patcher,
        )

    async def delete_one_by_id(self, state: State, id: UUID) -> Session:
        """Await the repository's ``delete_one_by_id`` and return its result.

        :param state: object whose ``session`` attribute is forwarded to the repository.
        :param id: forwarded as the ``id`` keyword argument.
        :return: the value produced by the repository call.
        """
        repository = self.session_repository
        return await repository.delete_one_by_id(
            session=state.session,
            id=id,
        )