SpringCloud+Tornado基于jwt实现请求安全校验功能
项目背景
在实际项目中,Tornado
项目作为一个微服务纳入SpringCloud
体系,该过程中涉及到Tornado
与Spring
体系的安全验证,也就是权限调用校验,在该项目中Tornado
是通过SpringCloud
中的Feign
调用的,经过一系列实验,最后选用jwt
来实现这个权限效验的过程。
实现思路
用户进行登陆认证(后台微服务),认证成功后调用Tornado
项目的认证接口生成token
,该值返回到后台微服务保存在会话中,下一次请求时带上该token
值让服务器进行校验,校验成功则返回正常的响应,否则返回错误信息。
项目结构
common - authServer.py是认证接口 common - basicServer.py是示例接口 handlers - baseHandler.py中放了两种校验方式,会在basicServer.py的调用示例中贴出 utils - jwtUtils.py是生成与校验token的 utils - responseUtils.py是返回结果工具类
具体实现
jwtUtils.py
# -*- coding:utf-8 -*- import jwt import datetime from jwt import exceptions from utils.responseUtils import JsonUtils JWT_SALT = '1qazxdr5' def create_token(payload, timeout=12): """ 创建token :param payload: 例如:{'user_id':1,'username':'xxx@xxx.xx'}用户信息 :param timeout: token的过期时间,默认20分钟 :return: """ headers = { 'typ': 'jwt', 'alg': 'HS256' } payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout) result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8') return result def parse_payload(token): """ 对token进行校验并获取payload :param token: :return: """ try: verified_payload = jwt.decode(token, JWT_SALT, True) print(verified_payload) return JsonUtils.success('认证通过') except exceptions.ExpiredSignatureError: return JsonUtils.noAuth('token已失效') except jwt.DecodeError: return JsonUtils.noAuth('token认证失败') except jwt.InvalidTokenError: return JsonUtils.noAuth('非法的token')
baseHandler.py
# -*- coding:utf-8 -*- import functools import json import tornado.web from utils import jwtUtils from utils.responseUtils import JsonUtils # 方式一:authenticated 装饰器 def authenticated(method): @functools.wraps(method) def wrapper(self, *args, **kwargs): """ 这里调用的是 current_user 的 get 方法(property装饰), """ # 通过token请求头传递token head = self.request.headers token = head.get("token", "") if not token: self.write(JsonUtils.noAuth("未获取到token请求头")) self.set_header('Content-Type', 'application/json') return result = json.loads(jwtUtils.parse_payload(token)) # 将json解码 print(result) token_msg = json.dumps(result) if result['sta'] != '00': self.write(token_msg) self.set_header('Content-Type', 'application/json') return return method(self, *args, **kwargs) return wrapper # 方式二:进行预设 继承tornado的RequestHandler class BaseHandler(tornado.web.RequestHandler): def prepare(self): super(BaseHandler, self).prepare() def set_default_headers(self): super().set_default_headers() # 进行token校验,继承上面的BaseHandler class TokenHandler(BaseHandler): def prepare(self): # 通过token请求头传递token head = self.request.headers token = head.get("token","") if not token: self.authMsg = json.dumps(JsonUtils.noAuth("未获取到token请求头")) result = json.loads(jwtUtils.parse_payload(token)) # 将json解码 print(result) if result['sta'] != '00': self.isAuth = False else: self.isAuth = True self.authMsg = json.dumps(result)
authServer.py
import tornado.web from utils import jwtUtils from utils.responseUtils import JsonUtils class authHandler(tornado.web.RequestHandler): def post(self, *args, **kwargs): """ 安全认证接口 :param args: :param kwargs: :return: """ username = self.get_argument("username") print("authHandler:" + username) if not username: self.write(JsonUtils.error("参数异常")) else: token = jwtUtils.create_token({"username": username}) print("token:" + token) self.write(JsonUtils.success(token)) self.set_header('Content-Type', 'application/json')
basicServer.py
import tornado.web import json from pandas.core.frame import DataFrame from handlers import baseHandler from utils.responseUtils import JsonUtils from handlers.baseHandler import authenticated class StringHandler(baseHandler.TokenHandler, tornado.web.RequestHandler): """ *** TokenHandler验证,对应baseHandler.py中的方式二 *** """ def get(self): username = self.get_argument('username', 'Hello') # 权限认证通过 if self.isAuth: self.write(JsonUtils.success(username)) else: self.write(self.authMsg)) self.set_header('Content-Type', 'application/json') class TestHandler(tornado.web.RequestHandler): """ *** authenticated验证,对应baseHandler.py中的方式一 *** """ @authenticated def post(self): username = self.get_argument('username', 'Hello') self.write(JsonUtils.success(username)) self.set_header('Content-Type', 'application/json')
responseUtils.py
from tornado.escape import json_encode, utf8 class JsonUtils(object): @staticmethod def success(response): """ 正确返回 :param response: 返回结果 :return: string, {"message": "ok", "sta": "00", "data": } """ return json_encode({"message": "ok", "sta": "00", "data": response}) @staticmethod def info(message): """ 提示返回 :param message: 提示信息 :return: string, """ return json_encode({"message": str(message), "sta": "99001", "data": None}) @staticmethod def error(message): """ 错误返回 :param message: 错误信息 :return: string, """ return json_encode({"message": str(message), "sta": "9999", "data": None}) @staticmethod def noAuth(message): """ 无权限返回 :param message: 错误信息 :return: string, """ return json_encode({"message": str(message), "sta": "403", "data": None})
下面是一些调用的结果图示:
.end
到此这篇关于SpringCloud+Tornado基于jwt实现请求安全校验的文章就介绍到这了,更多相关SpringCloud+Tornado实现请求安全校验内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java阻塞队列必看类:BlockingQueue快速了解大体框架和实现思路
这篇文章主要介绍了Java阻塞队列必看类:BlockingQueue快速了解大体框架和实现思路,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-10-10
最新评论