RODY/app/services/TokenAuthService.py

135 lines
4.3 KiB
Python
Raw Normal View History

2022-11-04 17:37:08 +08:00
import datetime
from functools import wraps
from flask import g, jsonify, request
import jwt
from app.configs.default import SECRET_KEY
from flask_httpauth import HTTPTokenAuth, HTTPAuth
# 生成token,有效时间为 60*60 秒
from app.exts import db
def generate_auth_token(user_name, expiration=3600):
reset_token = jwt.encode(
{
"user_name": user_name,
"exp": datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(seconds=expiration)
},
SECRET_KEY,
algorithm="HS256"
)
return reset_token
# 重写 HTTPTokenAuth
class HTTPTokenAuthReReturn(HTTPTokenAuth):
def __init__(self, scheme=None, realm=None, header=None):
super().__init__(scheme, realm, header)
# 重写default_auth_error, 或者也可以重新定义一个函数
# def default_auth_error(status):
# return jsonify(data={}, message="token 错误!", code=status), status
def default_auth_error(status, message):
return jsonify(data={}, message=message, code=status), status
# 如果重新定义函数的话,这里就传入新定义的函数名
super().error_handler(default_auth_error)
# 重写 login_required
def login_required(self, f=None, role=None, optional=None):
if f is not None and \
(role is not None or optional is not None): # pragma: no cover
raise ValueError(
'role and optional are the only supported arguments')
def login_required_internal(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = self.get_auth()
if request.method != 'OPTIONS':
password = self.get_auth_password(auth)
status = None # 添加状态信息
message = None
user = self.authenticate(auth, password)
# 这里判断verify_token的返回值是否是这里的一员
if user in (False, None, 'BadSignature', 'SignatureExpired'):
status = 401
if user == 'BadSignature':
message = "Bad Signature"
elif user == 'SignatureExpired':
message = "Signature Expired"
elif not self.authorize(role, user, auth):
status = 403
message = "Forbidden"
if not optional and status:
# Clear TCP receive buffer of any pending data
request.data
try:
# 因为之前重写了default_auth_error所以多传入一个message
return self.auth_error_callback(status, message)
except TypeError:
return self.auth_error_callback()
g.flask_httpauth_user = user if user is not True \
else auth.username if auth else None
return f(*args, **kwargs)
return decorated
if f:
return login_required_internal(f)
return login_required_internal
auth = HTTPTokenAuthReReturn(scheme="Bearer")
# 获取用户权限
@auth.get_user_roles
def get_user_roles(user):
the_role = get_role_token(user["token"])
return the_role.role_key
# 验证token
@auth.verify_token
def verify_token(token):
try:
print(token)
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
print(data)
except jwt.ExpiredSignatureError:
print("token过期")
# 这里不用False而是用自定义字符串
return "SignatureExpired"
except jwt.PyJWTError:
print("token错误")
# 这里不用False而是用自定义字符串
return "BadSignature"
return True
# 解析token不加密部分
def token_parse(token):
the_token = str.replace(str(token), 'Bearer ', '')
decoded = jwt.decode(the_token, options={"verify_signature": False})
return decoded
# 根据 username 获取角色
def get_role_username(username: str):
return []
# 根据 username 获取角色
def get_role_token(token: str):
user_info = token_parse(token)
username = user_info["user_name"]
the_role = get_role_username(username)
return the_role