135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
import datetime
|
||
from functools import wraps
|
||
|
||
from flask import g, jsonify, request
|
||
|
||
import jwt
|
||
from app.configs.default import SECRET_KEY
|
||
from flask_httpauth import HTTPTokenAuth, HTTPAuth
|
||
|
||
# generate_auth_token (defined below) issues a token that is valid for 60*60 seconds by default
|
||
from app.exts import db
|
||
|
||
|
||
def generate_auth_token(user_name, expiration=3600):
    """Build a signed token carrying ``user_name`` and an expiry time.

    Args:
        user_name: Value stored under the ``"user_name"`` claim.
        expiration: Lifetime in seconds, counted from the current UTC time
            (default 3600).

    Returns:
        The value produced by ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using HS256.
    """
    now_utc = datetime.datetime.now(tz=datetime.timezone.utc)
    lifetime = datetime.timedelta(seconds=expiration)
    claims = {
        "user_name": user_name,
        "exp": now_utc + lifetime,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm="HS256")
|
||
|
||
|
||
# 重写 HTTPTokenAuth
|
||
class HTTPTokenAuthReReturn(HTTPTokenAuth):
    """HTTPTokenAuth variant that answers auth failures with a JSON body.

    Differences from the parent class, as implemented here:

    * the default error handler returns
      ``{"data": {}, "message": <message>, "code": <status>}`` together with
      the HTTP status, and
    * ``login_required`` additionally treats the strings ``'BadSignature'``
      and ``'SignatureExpired'`` returned by ``self.authenticate`` as failed
      authentication and forwards a matching message to the error handler.
    """

    def __init__(self, scheme=None, realm=None, header=None):
        """Initialise the parent and install the JSON error handler.

        Args:
            scheme: Authorization scheme name; ``None`` (or any falsy value)
                falls back to ``"Bearer"``.
            realm: Forwarded to the parent unchanged.
            header: Forwarded to the parent unchanged.
        """
        # HTTPTokenAuth defaults its scheme to "Bearer"; forwarding a literal
        # None would override that default, so substitute it here.
        super().__init__(scheme or "Bearer", realm, header)

        # Replacement for the default auth error response. Unlike the stock
        # one-argument handler it also receives a message.
        def default_auth_error(status, message):
            return jsonify(data={}, message=message, code=status), status

        # Register the handler defined above; a differently named function
        # could be passed here instead.
        super().error_handler(default_auth_error)

    def _send_auth_error(self, status, message):
        """Invoke the registered error handler with the arity it supports.

        Tries ``(status, message)`` first (the handler installed by
        ``__init__``), then ``(status,)`` (the signature Flask-HTTPAuth
        documents for ``error_handler``), then no arguments.
        """
        try:
            return self.auth_error_callback(status, message)
        except TypeError:
            pass
        try:
            return self.auth_error_callback(status)
        except TypeError:
            return self.auth_error_callback()

    # Override of login_required
    def login_required(self, f=None, role=None, optional=None):
        """Decorate a view so that it requires token authentication.

        Can be applied bare (``f`` is the view) or called with ``role`` /
        ``optional`` to obtain a decorator.

        Args:
            f: The view function when used as a bare decorator.
            role: Forwarded to ``self.authorize``.
            optional: When truthy, a failed check does not stop the request;
                the view is still called.

        Returns:
            The wrapped view, or a decorator when ``f`` is not given.

        Raises:
            ValueError: If ``f`` is given together with ``role`` or
                ``optional``.
        """
        if f is not None and \
                (role is not None or optional is not None):  # pragma: no cover
            raise ValueError(
                'role and optional are the only supported arguments')

        def login_required_internal(f):
            @wraps(f)
            def decorated(*args, **kwargs):
                auth = self.get_auth()

                # OPTIONS requests skip the authentication checks entirely
                # (presumably to let CORS preflight through - confirm).
                if request.method != 'OPTIONS':
                    password = self.get_auth_password(auth)

                    status = None  # HTTP status of a failed check, if any
                    message = None
                    user = self.authenticate(auth, password)
                    # Besides False/None, the verify_token callback may
                    # signal failure with one of two sentinel strings.
                    if user in (False, None, 'BadSignature',
                                'SignatureExpired'):
                        status = 401
                        if user == 'BadSignature':
                            message = "Bad Signature"
                            # The sentinel is truthy; never let it be stored
                            # as the current user when optional=True.
                            user = None
                        elif user == 'SignatureExpired':
                            message = "Signature Expired"
                            user = None
                    elif not self.authorize(role, user, auth):
                        status = 403
                        message = "Forbidden"
                    if not optional and status:
                        # Clear TCP receive buffer of any pending data
                        request.data
                        return self._send_auth_error(status, message)

                    g.flask_httpauth_user = user if user is not True \
                        else auth.username if auth else None
                return f(*args, **kwargs)

            return decorated

        if f:
            return login_required_internal(f)
        return login_required_internal
|
||
|
||
|
||
# Module-level auth object using the "Bearer" scheme; the decorators further
# down in this file (get_user_roles, verify_token) register callbacks on it.
auth = HTTPTokenAuthReReturn(scheme="Bearer")
|
||
|
||
|
||
# 获取用户权限
|
||
@auth.get_user_roles
def get_user_roles(user):
    """Return the role(s) of the user identified by ``user["token"]``.

    Args:
        user: Mapping-like object exposing the raw token under ``"token"``.

    Returns:
        ``role_key`` of the object returned by ``get_role_token`` when it has
        that attribute; otherwise the returned value itself.
    """
    the_role = get_role_token(user["token"])
    # get_role_token() hands back whatever get_role_username() returns, which
    # in this file is a plain list with no role_key attribute. Pass such a
    # value through instead of raising AttributeError.
    return getattr(the_role, "role_key", the_role)
|
||
|
||
|
||
# 验证token
|
||
@auth.verify_token
def verify_token(token):
    """Check a JWT against ``SECRET_KEY`` (HS256) and report the outcome.

    Args:
        token: The token string to validate.

    Returns:
        ``True`` when decoding succeeds, ``"SignatureExpired"`` when
        ``jwt.ExpiredSignatureError`` is raised, and ``"BadSignature"`` for
        any other ``jwt.PyJWTError``. Strings are used instead of ``False``
        so that ``login_required`` can choose a matching error message.
    """
    try:
        # The raw token and its decoded claims are deliberately not printed:
        # a bearer token is a credential and must not end up on stdout.
        jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        print("token过期")
        # Custom string instead of False
        return "SignatureExpired"
    except jwt.PyJWTError:
        print("token错误")
        # Custom string instead of False
        return "BadSignature"
    return True
|
||
|
||
|
||
# 解析token不加密部分
|
||
def token_parse(token):
    """Decode a token's payload without verifying its signature.

    Args:
        token: The token, optionally containing the text ``'Bearer '``,
            which is removed before decoding. Non-string values are
            converted with ``str()`` first.

    Returns:
        The result of ``jwt.decode`` with signature verification disabled.
    """
    bare_token = str(token).replace('Bearer ', '')
    # NOTE(review): the signature is not checked here, so the result must
    # only be trusted for tokens that were validated elsewhere.
    return jwt.decode(bare_token, options={"verify_signature": False})
|
||
|
||
|
||
# 根据 username 获取角色
|
||
def get_role_username(username: str):
    """Look up the role for ``username``.

    Currently a stub: ``username`` is ignored and a new empty list is
    returned on every call.
    """
    return list()
|
||
|
||
|
||
# Get the role from a token (using the user_name stored in its payload)
|
||
def get_role_token(token: str):
    """Resolve the role of the user named in a token.

    The token is parsed with ``token_parse`` and its ``"user_name"`` entry is
    handed to ``get_role_username``, whose result is returned as is.
    """
    claims = token_parse(token)
    return get_role_username(claims["user_name"])
|