135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
|
import datetime
|
|||
|
from functools import wraps
|
|||
|
|
|||
|
from flask import g, jsonify, request
|
|||
|
|
|||
|
import jwt
|
|||
|
from app.configs.default import SECRET_KEY
|
|||
|
from flask_httpauth import HTTPTokenAuth, HTTPAuth
|
|||
|
|
|||
|
# 生成token,有效时间为 60*60 秒
|
|||
|
from app.exts import db
|
|||
|
|
|||
|
|
|||
|
def generate_auth_token(user_name, expiration=3600):
|
|||
|
reset_token = jwt.encode(
|
|||
|
{
|
|||
|
"user_name": user_name,
|
|||
|
"exp": datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(seconds=expiration)
|
|||
|
},
|
|||
|
SECRET_KEY,
|
|||
|
algorithm="HS256"
|
|||
|
)
|
|||
|
return reset_token
|
|||
|
|
|||
|
|
|||
|
# 重写 HTTPTokenAuth
|
|||
|
class HTTPTokenAuthReReturn(HTTPTokenAuth):
|
|||
|
def __init__(self, scheme=None, realm=None, header=None):
|
|||
|
super().__init__(scheme, realm, header)
|
|||
|
|
|||
|
# 重写default_auth_error, 或者也可以重新定义一个函数
|
|||
|
# def default_auth_error(status):
|
|||
|
# return jsonify(data={}, message="token 错误!", code=status), status
|
|||
|
|
|||
|
def default_auth_error(status, message):
|
|||
|
return jsonify(data={}, message=message, code=status), status
|
|||
|
|
|||
|
# 如果重新定义函数的话,这里就传入新定义的函数名
|
|||
|
super().error_handler(default_auth_error)
|
|||
|
# 重写 login_required
|
|||
|
|
|||
|
def login_required(self, f=None, role=None, optional=None):
|
|||
|
if f is not None and \
|
|||
|
(role is not None or optional is not None): # pragma: no cover
|
|||
|
raise ValueError(
|
|||
|
'role and optional are the only supported arguments')
|
|||
|
|
|||
|
def login_required_internal(f):
|
|||
|
@wraps(f)
|
|||
|
def decorated(*args, **kwargs):
|
|||
|
auth = self.get_auth()
|
|||
|
|
|||
|
if request.method != 'OPTIONS':
|
|||
|
password = self.get_auth_password(auth)
|
|||
|
|
|||
|
status = None # 添加状态信息
|
|||
|
message = None
|
|||
|
user = self.authenticate(auth, password)
|
|||
|
# 这里判断verify_token的返回值是否是这里的一员
|
|||
|
if user in (False, None, 'BadSignature', 'SignatureExpired'):
|
|||
|
status = 401
|
|||
|
if user == 'BadSignature':
|
|||
|
message = "Bad Signature"
|
|||
|
elif user == 'SignatureExpired':
|
|||
|
message = "Signature Expired"
|
|||
|
elif not self.authorize(role, user, auth):
|
|||
|
status = 403
|
|||
|
message = "Forbidden"
|
|||
|
if not optional and status:
|
|||
|
# Clear TCP receive buffer of any pending data
|
|||
|
request.data
|
|||
|
try:
|
|||
|
# 因为之前重写了default_auth_error所以多传入一个message
|
|||
|
return self.auth_error_callback(status, message)
|
|||
|
except TypeError:
|
|||
|
return self.auth_error_callback()
|
|||
|
|
|||
|
g.flask_httpauth_user = user if user is not True \
|
|||
|
else auth.username if auth else None
|
|||
|
return f(*args, **kwargs)
|
|||
|
|
|||
|
return decorated
|
|||
|
|
|||
|
if f:
|
|||
|
return login_required_internal(f)
|
|||
|
return login_required_internal
|
|||
|
|
|||
|
|
|||
|
auth = HTTPTokenAuthReReturn(scheme="Bearer")
|
|||
|
|
|||
|
|
|||
|
# 获取用户权限
|
|||
|
@auth.get_user_roles
|
|||
|
def get_user_roles(user):
|
|||
|
the_role = get_role_token(user["token"])
|
|||
|
return the_role.role_key
|
|||
|
|
|||
|
|
|||
|
# 验证token
|
|||
|
@auth.verify_token
|
|||
|
def verify_token(token):
|
|||
|
try:
|
|||
|
print(token)
|
|||
|
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
|
|||
|
print(data)
|
|||
|
except jwt.ExpiredSignatureError:
|
|||
|
print("token过期")
|
|||
|
# 这里不用False,而是用自定义字符串
|
|||
|
return "SignatureExpired"
|
|||
|
except jwt.PyJWTError:
|
|||
|
print("token错误")
|
|||
|
# 这里不用False,而是用自定义字符串
|
|||
|
return "BadSignature"
|
|||
|
return True
|
|||
|
|
|||
|
|
|||
|
# 解析token不加密部分
|
|||
|
def token_parse(token):
|
|||
|
the_token = str.replace(str(token), 'Bearer ', '')
|
|||
|
decoded = jwt.decode(the_token, options={"verify_signature": False})
|
|||
|
return decoded
|
|||
|
|
|||
|
|
|||
|
# 根据 username 获取角色
|
|||
|
def get_role_username(username: str):
|
|||
|
return []
|
|||
|
|
|||
|
|
|||
|
# 根据 username 获取角色
|
|||
|
def get_role_token(token: str):
|
|||
|
user_info = token_parse(token)
|
|||
|
username = user_info["user_name"]
|
|||
|
the_role = get_role_username(username)
|
|||
|
return the_role
|