You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

71 lines
2.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import re
from django.conf import settings
from django_redis import get_redis_connection
from rest_framework.permissions import BasePermission
import logging
from ERP_5.utils.cache_permissions import cache_permissions_by_user
logger = logging.getLogger('my')
class RbacPermission(BasePermission):
"""
自定义权限认证
"""
def do_url(self, url):
"""
得到完整的访问URL
"""
base_api = settings.BASE_API
uri = '/' + base_api + '/' + url + '/'
return re.sub('/+', '/', uri) # 防止访问地址中出现重复的/,比如://users/hello/
def has_permission(self, request, view):
"""
判断是否有权限,该函数必须要重写
思路和步骤:
1、获取请求的URL和请求方法methoduser用户对象
2、判断是否白名单的url, 或者用户的角色是admin也直接放行
3、从redis中得到当前登录用户的所有权限
4、判断是否存在权限
"""
request_url = request.path # api/users/login/ == api/users/login/
request_method = request.method
logger.info(f'请求地址:{request_url}, 请求方法: {request_method}')
for safe_url in settings.WHITE_LIST:
if re.match(settings.REGEX_URL.format(url=safe_url), request_url):
# 白名单的url直接返回
return True
user = request.user
role_name_list = user.roles.values_list('name', flat=True)
logger.info(role_name_list)
if 'admin' in role_name_list:
return True
redis_conn = get_redis_connection('default') # 获得默认redis库的连接
if not redis_conn.exists(f'user_{user.id}'): # redis中没有缓存权限数据
cache_permissions_by_user(user)
# 得到所有的权限中key其中key是请求url地址
url_keys = redis_conn.hkeys(f'user_{user.id}')
for url_key in url_keys:
# 查看在redis中是否存在该接口对于的权限
if re.match(settings.REGEX_URL.format(url=self.do_url(url_key.decode())), request_url):
redis_key = url_key.decode() # redis中是字节数据
break
else: # 这行代码可以不加,如果严格规定权限。
return False
permissions = json.loads(redis_conn.hget(f'user_{user.id}', redis_key).decode())
method_hit = False # 同一接口配置不同权限验证
for permission in permissions:
if permission.get('method') == request_method:
method_hit = True
break
return method_hit