Coverage for apps/outers/interfaces/deliveries/middlewares/authorization_middleware.py: 69%

52 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-22 19:03 +0000

1from datetime import datetime, timezone 

2from typing import List 

3 

4import regex 

5from starlette import status 

6from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint 

7from starlette.requests import Request 

8from starlette.responses import Response 

9 

10from apps.inners.exceptions import repository_exception 

11from apps.inners.models.daos.session import Session 

12from apps.inners.models.dtos.content import Content 

13from apps.inners.use_cases.managements.session_management import SessionManagement 

14 

15 

16class AuthorizationMiddleware(BaseHTTPMiddleware): 

17 def __init__( 

18 self, 

19 app, 

20 session_management: SessionManagement 

21 ): 

22 super().__init__(app) 

23 self.session_management: SessionManagement = session_management 

24 self.unauthorized_path_patterns = [ 

25 r".*\/docs.*", 

26 r".*\/openapi.json.*", 

27 r".*\/authentications\/logins.*", 

28 r".*\/authentications\/registers.*", 

29 r".*\/authorizations\/refreshes.*" 

30 ] 

31 

32 def is_unauthorized_path(self, path: str) -> bool: 

33 pattern: str = "|".join(self.unauthorized_path_patterns) 

34 return regex.match(pattern, path) is not None 

35 

36 async def dispatch( 

37 self, 

38 request: Request, 

39 call_next: RequestResponseEndpoint 

40 ) -> Response: 

41 content: Content[Session] = Content( 

42 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

43 message=f"{self.__class__.__name__}.{self.dispatch.__name__}: Failed.", 

44 data=None 

45 ) 

46 

47 if self.is_unauthorized_path(request.url.path): 

48 response: Response = await call_next(request) 

49 return response 

50 

51 authorization_header = request.headers.get("Authorization") 

52 if not authorization_header: 

53 content.status_code = status.HTTP_401_UNAUTHORIZED 

54 content.message += f" {self.__class__.__name__}.{self.dispatch.__name__}: Authorization header is missing." 

55 return content.to_response() 

56 

57 authorization: List[str] = authorization_header.split(" ") 

58 

59 if len(authorization) != 2: 

60 content.status_code = status.HTTP_401_UNAUTHORIZED 

61 content.message += f" {self.__class__.__name__}.{self.dispatch.__name__}: Authorization is invalid." 

62 return content.to_response() 

63 

64 if authorization[0] != "Bearer": 

65 content.status_code = status.HTTP_401_UNAUTHORIZED 

66 content.message += f" {self.__class__.__name__}.{self.dispatch.__name__}: Authorization scheme is invalid." 

67 return content.to_response() 

68 

69 access_token: str = authorization[1] 

70 

71 try: 

72 found_session: [Session] = await self.session_management.find_one_by_access_token( 

73 state=request.state, 

74 access_token=access_token 

75 ) 

76 except repository_exception.NotFound as exception: 

77 content.status_code = status.HTTP_404_NOT_FOUND 

78 content.message += f" {exception.caller.class_name}.{exception.caller.function_name}: {exception.__class__.__name__}" 

79 return content.to_response() 

80 

81 if found_session.access_token_expired_at < datetime.now(tz=timezone.utc): 

82 content.status_code = status.HTTP_401_UNAUTHORIZED 

83 content.message += f" {self.__class__.__name__}.{self.dispatch.__name__}: Access token is expired." 

84 return content.to_response() 

85 

86 request.state.authorized_session = found_session 

87 response: Response = await call_next(request) 

88 

89 return response