Coverage for apps/outers/interfaces/deliveries/middlewares/authorization_middleware.py: 69%
52 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-22 19:03 +0000
1from datetime import datetime, timezone
2from typing import List
4import regex
5from starlette import status
6from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
7from starlette.requests import Request
8from starlette.responses import Response
10from apps.inners.exceptions import repository_exception
11from apps.inners.models.daos.session import Session
12from apps.inners.models.dtos.content import Content
13from apps.inners.use_cases.managements.session_management import SessionManagement
16class AuthorizationMiddleware(BaseHTTPMiddleware):
17 def __init__(
18 self,
19 app,
20 session_management: SessionManagement
21 ):
22 super().__init__(app)
23 self.session_management: SessionManagement = session_management
24 self.unauthorized_path_patterns = [
25 r".*\/docs.*",
26 r".*\/openapi.json.*",
27 r".*\/authentications\/logins.*",
28 r".*\/authentications\/registers.*",
29 r".*\/authorizations\/refreshes.*"
30 ]
32 def is_unauthorized_path(self, path: str) -> bool:
33 pattern: str = "|".join(self.unauthorized_path_patterns)
34 return regex.match(pattern, path) is not None
36 async def dispatch(
37 self,
38 request: Request,
39 call_next: RequestResponseEndpoint
40 ) -> Response:
41 content: Content[Session] = Content(
42 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
43 message=f"{self.__class__.__name__}.{self.dispatch.__name__}: Failed.",
44 data=None
45 )
47 if self.is_unauthorized_path(request.url.path):
48 response: Response = await call_next(request)
49 return response
51 authorization_header = request.headers.get("Authorization")
52 if not authorization_header:
53 content.status_code = status.HTTP_401_UNAUTHORIZED
54 content.message += f" {self.__class__.__name__}.{self.dispatch.__name__}: Authorization header is missing."
55 return content.to_response()
57 authorization: List[str] = authorization_header.split(" ")
59 if len(authorization) != 2:
60 content.status_code = status.HTTP_401_UNAUTHORIZED
61 content.message += f" {self.__class__.__name__}.{self.dispatch.__name__}: Authorization is invalid."
62 return content.to_response()
64 if authorization[0] != "Bearer":
65 content.status_code = status.HTTP_401_UNAUTHORIZED
66 content.message += f" {self.__class__.__name__}.{self.dispatch.__name__}: Authorization scheme is invalid."
67 return content.to_response()
69 access_token: str = authorization[1]
71 try:
72 found_session: [Session] = await self.session_management.find_one_by_access_token(
73 state=request.state,
74 access_token=access_token
75 )
76 except repository_exception.NotFound as exception:
77 content.status_code = status.HTTP_404_NOT_FOUND
78 content.message += f" {exception.caller.class_name}.{exception.caller.function_name}: {exception.__class__.__name__}"
79 return content.to_response()
81 if found_session.access_token_expired_at < datetime.now(tz=timezone.utc):
82 content.status_code = status.HTTP_401_UNAUTHORIZED
83 content.message += f" {self.__class__.__name__}.{self.dispatch.__name__}: Access token is expired."
84 return content.to_response()
86 request.state.authorized_session = found_session
87 response: Response = await call_next(request)
89 return response