FastAPI 使用JWT认证的中间件
FastAPI 使用JWT认证的中间件
FastAPI 自带的中间件还是太少，JWT 认证需要单独开发。Starlette 本身提供了认证相关的实现，只需要自定义一个 AuthenticationBackend 即可。本次我们使用中间件的方式解析 JWT 令牌，获取 payload 里面的用户信息。
私有定义的payload内容格式如下
{
    "usid": "SkDQBhEjUfygRSeEBech",  // UUID Short
    "uname": "test user name",       // Username
    "mid": "700010001"               // Member ID
}
调用代码
# Register the JWT backend with Starlette's AuthenticationMiddleware.
# FastAPI is provided by `fastapi`; AuthenticationMiddleware lives in
# `starlette.middleware.authentication`.
app = FastAPI()
app.add_middleware(
    AuthenticationMiddleware,
    backend=JWTAuthenticationBackend(secret_key="YOUR_SECRET_KEY"),
)
完整的代码
import jwt
from starlette.authentication import (
    AuthenticationBackend,
    AuthenticationError,
    BaseUser,
    AuthCredentials,
    UnauthenticatedUser,
)


class JWTUser(BaseUser):
    """Authenticated user built from the claims of a decoded JWT.

    Keeps the raw token and the full decoded payload alongside the three
    claims the backends extract (short user id, member number, user name).
    """

    def __init__(self, user_id_short: str, member_number: str, user_name: str,
                 token: str, payload: dict) -> None:
        self.user_name = user_name
        self.user_id_short = user_id_short
        self.member_number = member_number
        self.token = token
        self.payload = payload

    @property
    def is_authenticated(self) -> bool:
        return True

    @property
    def display_name(self) -> str:
        # Member name handling; open question: mask with '*'?
        return self.user_name

    @property
    def display_user_id_short(self) -> str:
        # Member id handling.
        return self.user_id_short

    @property
    def display_member_number(self) -> str:
        # Member number handling; open question: mask with '*'?
        return self.member_number


def _authenticate_token(token: str, *, secret_key: str, algorithm,
                        user_id_field: str, member_number_field: str,
                        user_name_field: str):
    """Decode ``token`` and return the ``(AuthCredentials, JWTUser)`` pair.

    Shared by both backends so that the HTTP and WebSocket paths validate
    tokens identically.

    :raises AuthenticationError: if the token fails verification or its
        payload lacks one of the configured claim fields.
    """
    # PyJWT expects a list of allowed algorithms; a bare string would be
    # checked with ``in`` and therefore behave as a substring match.
    algorithms = [algorithm] if isinstance(algorithm, str) else list(algorithm)
    try:
        payload = jwt.decode(token, key=secret_key, algorithms=algorithms)
    except jwt.InvalidTokenError as e:
        raise AuthenticationError(str(e)) from e

    try:
        user = JWTUser(
            user_id_short=payload[user_id_field],
            member_number=payload[member_number_field],
            user_name=payload[user_name_field],
            token=token,
            payload=payload,
        )
    except KeyError as e:
        # A validly signed token without the expected claims is still an
        # authentication failure, not a server error.
        raise AuthenticationError(
            f'Token payload is missing required field {e.args[0]}'
        ) from e

    return AuthCredentials(["jwt_authenticated"]), user


class JWTAuthenticationBackend(AuthenticationBackend):
    """Authentication backend that reads a JWT from the Authorization header.

    The header is expected to look like ``<prefix> <token>``; the prefix
    (default ``JWT``) is compared case-insensitively.
    """

    def __init__(self, secret_key: str, algorithm: str = 'HS256',
                 prefix: str = 'JWT', user_name_field: str = 'uname',
                 user_id_field: str = 'usid',
                 member_number_field: str = 'mid'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.prefix = prefix
        self.user_name_field = user_name_field
        self.user_id_field = user_id_field
        self.member_number_field = member_number_field

    @classmethod
    def get_token_from_header(cls, authorization: str, prefix: str):
        """Parse the Authorization header and return only the token.

        :param authorization: raw value of the Authorization header
        :param prefix: expected scheme, compared case-insensitively
        :return: the token part of the header
        :raises AuthenticationError: if the header is not exactly
            ``<scheme> <token>`` or the scheme does not match ``prefix``
        """
        try:
            scheme, token = authorization.split()
        except ValueError:
            raise AuthenticationError(
                'Could not separate Authorization scheme and token')
        if scheme.lower() != prefix.lower():
            raise AuthenticationError(
                f'Authorization scheme {scheme} is not supported')
        return token

    async def authenticate(self, request):
        """Authenticate a request from its Authorization header.

        Returns ``None`` when the header is absent, otherwise the
        ``(AuthCredentials, JWTUser)`` pair for a valid token.

        :raises AuthenticationError: on a malformed header, an invalid
            token, or a payload missing a configured field.
        """
        if "Authorization" not in request.headers:
            return None

        auth = request.headers["Authorization"]
        token = self.get_token_from_header(authorization=auth,
                                           prefix=self.prefix)
        return _authenticate_token(
            token,
            secret_key=self.secret_key,
            algorithm=self.algorithm,
            user_id_field=self.user_id_field,
            member_number_field=self.member_number_field,
            user_name_field=self.user_name_field,
        )


class JWTWebSocketAuthenticationBackend(AuthenticationBackend):
    """Authentication backend that reads a JWT from a query parameter.

    The token is taken from the query parameter named by
    ``query_param_name`` (default ``jwt``).
    """

    def __init__(self, secret_key: str, algorithm: str = 'HS256',
                 query_param_name: str = 'jwt',
                 user_name_field: str = 'uname',
                 user_id_field: str = 'usid',
                 member_number_field: str = 'mid'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.query_param_name = query_param_name
        self.user_name_field = user_name_field
        self.user_id_field = user_id_field
        self.member_number_field = member_number_field

    async def authenticate(self, request):
        """Authenticate a connection from its query-string token.

        Returns empty credentials with an ``UnauthenticatedUser`` when the
        query parameter is absent, otherwise the
        ``(AuthCredentials, JWTUser)`` pair for a valid token.

        :raises AuthenticationError: on an invalid token or a payload
            missing a configured field.
        """
        if self.query_param_name not in request.query_params:
            return AuthCredentials(), UnauthenticatedUser()

        token = request.query_params[self.query_param_name]
        return _authenticate_token(
            token,
            secret_key=self.secret_key,
            algorithm=self.algorithm,
            user_id_field=self.user_id_field,
            member_number_field=self.member_number_field,
            user_name_field=self.user_name_field,
        )
Recent Comments