pip install djangorestframework-simplejwt
pip install django-filter
pip install coreapi
pip install drf-yasg
1、添加drf_yasg到settings.py
# settings.py excerpt: register Django REST framework and drf_yasg next to the
# project's own app. The bare `...` line is a placeholder for entries omitted
# from this excerpt.
INSTALLED_APPS = [
...
'rest_framework',
'drf_yasg',
'cmdb.apps.CmdbConfig',
]
2、配置路由
from rest_framework import permissions
from drf_yasg.views import get_schema_view
from drf_yasg import openapi
# Build the drf_yasg schema view; it is mounted under 'apidoc/' in urlpatterns.
schema_view = get_schema_view(
# Metadata describing the API (title, version, description, contact, license).
openapi.Info(
title="DevOps运维平台",
default_version='v1',
description="DevOps运维平台 接口文档",
terms_of_service="",
contact=openapi.Contact(email="qqing_lai@hotmail.com"),
license=openapi.License(name="BSD License"),
),
# NOTE(review): public=True presumably lists every endpoint regardless of the
# requesting user's permissions - confirm against the drf_yasg documentation.
public=True,
# AllowAny is set explicitly for the documentation view.
# NOTE(review): presumably needed so the global RbacPermission default does not
# block the docs page - confirm.
permission_classes=(permissions.AllowAny,),
)
...
urlpatterns = [
path('apidoc/', schema_view.with_ui('swagger',
...
]
# drf配置
REST_FRAMEWORK = {
# 自定义分页
'DEFAULT_PAGINATION_CLASS': 'common.extends.pagination.CustomPagination',
'PAGE_SIZE': 20,
# 用户登陆认证方式
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_simplejwt.authentication.JWTAuthentication',
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication',
),
# 全局权限拦截
'DEFAULT_PERMISSION_CLASSES': (
'common.extends.permissions.RbacPermission',
),
'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.coreapi.AutoSchema',
'DEFAULT_FILTER_BACKENDS': (
'django_filters.rest_framework.DjangoFilterBackend',
'rest_framework.filters.SearchFilter',
'rest_framework.filters.OrderingFilter',
),
'DEFAULT_RENDERER_CLASSES': ['rest_framework.renderers.JSONRenderer',
'rest_framework.renderers.BrowsableAPIRenderer'] if DEBUG else [
'rest_framework.rend
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author : Charles Lai
@file : pagination.py
@time : 2023/03/26 15:22
@contact : qqing_lai@hotmail.com
'''
# here put the import lib
from rest_framework.pagination import PageNumberPagination, LimitOffsetPagination
from rest_framework.response import Response
from rest_framework import status
class CustomPagination(PageNumberPagination):
    """Page-number pagination that replies with the project's response envelope."""

    def get_paginated_response(self, data):
        """Wrap one page of serialized ``data`` in the standard envelope.

        The page content is placed under ``data`` as ``list``/``total``/``next``/
        ``previous``; the envelope always reports success with HTTP 200.
        """
        page_payload = {
            'list': data,
            'total': self.page.paginator.count,
            'next': self.get_next_link(),
            'previous': self.get_previous_link(),
        }
        envelope = {
            'data': page_payload,
            'success': True,
            'errorCode': 0,
            'errorMessage': None,
        }
        return Response(envelope, status=status.HTTP_200_OK)
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author : Charles Lai
@file : viewsets.py
@time : 2023/03/26 14:42
@contact : qqing_lai@hotmail.com
'''
# here put the import lib
import inspect
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from rest_framework import viewsets
from rest_framework import pagination
from rest_framework.settings import api_settings
from rest_framework.filters import OrderingFilter
from django.db.models.query import QuerySet
from django.db.models import ProtectedError
from django.core.cache import cache
import pytz
import logging
logger = logging.getLogger(__name__)
def ops_response(data, success=True, errorCode=0, errorMessage=None, status=status.HTTP_200_OK):
    """
    Wrap ``data`` in the project's standard response envelope.

    The response body is
    ``{'data': ..., 'success': ..., 'errorCode': ..., 'errorMessage': ...}``
    and ``status`` is used as the HTTP status code.

    For list data, ``data`` has the shape::

        {
            list: [
            ],
            current?: number,
            pageSize?: number,
            total?: number,
        }
    """
    # Inside this function ``status`` is the parameter, not the imported
    # rest_framework module; the default was resolved at definition time.
    envelope = {
        'data': data,
        'success': success,
        'errorCode': errorCode,
        'errorMessage': errorMessage,
    }
    return Response(envelope, status=status)
class AutoModelViewSet(viewsets.ModelViewSet):
    """
    A viewset that provides default `create()`, `retrieve()`, `update()`,
    `partial_update()`, `destroy()` and `list()` actions, all of which reply
    with the ``ops_response`` envelope.

    Subclass hooks:
      * ``permission_classes_by_action`` - ``{action: [permission classes]}``
        overriding ``permission_classes`` for the listed actions.
      * ``serializer_<caller>_class`` - serializer used when ``get_serializer``
        is called from the method named ``<caller>`` (e.g. ``serializer_list_class``).
      * ``extend_filter(queryset)`` - extra filtering applied in ``get_queryset``.
    """
    permission_classes = [IsAuthenticated]
    permission_classes_by_action = {}
    filter_backends = (OrderingFilter, )

    def __init__(self, *args, **kwargs):
        """Refuse to instantiate when ``queryset``/``serializer_class`` are missing."""
        # NOTE(review): DRF's GenericAPIView appears to define both attributes
        # (as None) on the base class, so these hasattr checks may never fire -
        # confirm before relying on them.
        if not hasattr(self, 'queryset'):
            raise AttributeError('必须定义 类属性 queryset')
        if not hasattr(self, 'serializer_class'):
            raise AttributeError('必须定义 类属性 serializer_class')
        super().__init__(*args, **kwargs)

    def get_serializer(self, *args, **kwargs):
        """
        Instantiate a serializer, choosing the class from the calling method.

        If the view defines ``serializer_{caller}_class`` - where ``caller`` is
        the name of the function that called ``get_serializer`` (for example
        ``serializer_list_class`` for ``list``) - that class is used; otherwise
        ``get_serializer_class()`` decides.

        :param args: positional arguments forwarded to the serializer
        :param kwargs: keyword arguments forwarded to the serializer; ``context``
            is always replaced with ``get_serializer_context()``
        :return: the serializer instance
        """
        # Read only the caller's frame; inspect.stack() would build FrameInfo
        # records for the whole stack on every call.
        frame = inspect.currentframe()
        caller = frame.f_back if frame is not None else None
        call_func_name = caller.f_code.co_name if caller is not None else ''
        del frame, caller  # do not keep frame objects alive longer than needed

        serializer_class = getattr(self, f'serializer_{call_func_name}_class', None)
        if not serializer_class:
            serializer_class = self.get_serializer_class()
        kwargs['context'] = self.get_serializer_context()
        return serializer_class(*args, **kwargs)

    def get_object(self):
        """Return the object for the current request (plain delegation to the parent)."""
        return super().get_object()

    def get_permissions(self):
        """Instantiate the permissions for the current action.

        Uses ``permission_classes_by_action[self.action]`` when present and
        falls back to ``permission_classes`` otherwise.
        """
        classes = self.permission_classes_by_action.get(self.action, self.permission_classes)
        return [permission() for permission in classes]

    def get_permission_from_role(self, request):
        """Return the distinct ``permissions__method`` values of the user's roles.

        Returns an empty list when the lookup raises AttributeError or TypeError
        (for example a user object without ``roles``).
        """
        try:
            perms = request.user.roles.values(
                'permissions__method',
            ).distinct()
            return [p['permissions__method'] for p in perms]
        except (AttributeError, TypeError):
            return []

    def extend_filter(self, queryset):
        """Hook for subclasses to narrow ``queryset``; the default returns it unchanged."""
        return queryset

    def get_queryset(self):
        """Return ``self.queryset`` after ``extend_filter``, with duplicates removed."""
        assert self.queryset is not None, (
            "'%s' should either include a `queryset` attribute, "
            "or override the `get_queryset()` method."
            % self.__class__.__name__
        )
        queryset = self.extend_filter(self.queryset)
        if isinstance(queryset, QuerySet):
            # .all() yields a fresh QuerySet so results are re-evaluated per request.
            queryset = queryset.all()
        return queryset.distinct()

    @staticmethod
    def _requested_page_size(request):
        """Return the ``page_size`` query parameter as a positive int, else None."""
        raw = request.query_params.get('page_size')
        try:
            size = int(raw)
        except (TypeError, ValueError):
            return None
        return size if size > 0 else None

    def create(self, request, *args, **kwargs):
        """Create an object and reply with the envelope.

        Validation errors yield ``errorCode`` 40000; an exception while saving
        yields ``errorCode`` 50000 with HTTP 500.
        """
        # Best-effort normalisation of 'name': trim spaces and replace inner
        # spaces with '-'. Payloads without a usable 'name' are left as they are.
        try:
            request.data['name'] = request.data['name'].strip(' ').replace(' ', '-')
        except Exception as e:
            logger.debug('exception %s', e)
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return ops_response({}, success=False, errorCode=40000, errorMessage=serializer.errors)
        try:
            self.perform_create(serializer)
        except Exception as e:
            return ops_response({}, success=False, errorCode=50000, errorMessage=str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        return ops_response(serializer.data)

    def list(self, request, pk=None, *args, **kwargs):
        """Return the filtered queryset, paginated when a paginator is configured.

        A valid positive ``page_size`` query parameter overrides the page size
        for this request only; invalid values are ignored.
        """
        queryset = self.filter_queryset(self.get_queryset())
        page_size = self._requested_page_size(request)
        if page_size is not None and self.paginator is not None:
            # Set the size on this view's own paginator instance instead of the
            # shared PageNumberPagination class, so concurrent requests cannot
            # affect each other.
            self.paginator.page_size = page_size
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(queryset, many=True)
        return ops_response({'list': serializer.data, 'total': queryset.count()})

    def update(self, request, *args, **kwargs):
        """Update an object (full or partial) and reply with the envelope.

        Validation errors yield ``errorCode`` 40000; an exception while saving
        yields ``errorCode`` 50000.
        """
        instance = self.get_object()
        partial = kwargs.pop('partial', False)
        # Same best-effort 'name' normalisation as in create().
        try:
            request.data['name'] = request.data['name'].strip(' ').replace(' ', '-')
        except Exception as e:
            logger.warning(f'不包含name字段: {str(e)}')
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        if not serializer.is_valid():
            return ops_response({}, success=False, errorCode=40000, errorMessage=serializer.errors)
        try:
            self.perform_update(serializer)
        except Exception as e:
            # NOTE(review): create() answers this case with HTTP 500, while
            # update()/destroy() keep the default 200 - confirm which is intended.
            return ops_response({}, success=False, errorCode=50000, errorMessage=str(e))
        if getattr(instance, '_prefetched_objects_cache', None):
            # Drop the prefetch cache so stale related objects are not reused.
            instance._prefetched_objects_cache = {}
        return ops_response(serializer.data)

    def retrieve(self, request, *args, **kwargs):
        """Return a single serialized object inside the envelope."""
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        return ops_response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        """Delete an object and reply with the envelope.

        ``ProtectedError`` yields ``errorCode`` 40300; any other exception is
        logged and yields ``errorCode`` 50000.
        """
        instance = self.get_object()
        try:
            self.perform_destroy(instance)
        except ProtectedError:
            # Related rows still reference this object, so it must not be deleted.
            return ops_response({}, success=False, errorCode=40300, errorMessage='存在关联数据,禁止删除!')
        except Exception as e:
            logger.exception(f'删除数据发生错误 {e}, {e.__class__}')
            return ops_response({}, success=False, errorCode=50000, errorMessage=f'删除异常: {str(e)}')
        return ops_response('删除成功')
class AutoModelParentViewSet(AutoModelViewSet):
    """AutoModelViewSet variant for models that have a ``parent`` relation."""

    def get_queryset(self):
        """Return the queryset; plain listings only contain rows without a parent.

        The ``parent__isnull=True`` filter is applied for the ``list`` action
        when the request carries no ``search`` query parameter.
        """
        assert self.queryset is not None, (
            "'%s' should either include a `queryset` attribute, "
            "or override the `get_queryset()` method."
            % self.__class__.__name__
        )
        qs = self.extend_filter(self.queryset)
        top_level_only = (
            self.action == 'list'
            and not self.request.query_params.get('search')
        )
        if top_level_only:
            qs = qs.filter(parent__isnull=True)
        if isinstance(qs, QuerySet):
            qs = qs.all()
        return qs.distinct()
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author : Charles Lai
@file : permissions.py
@time : 2023/03/26 15:27
@contact : qqing_lai@hotmail.com
'''
# here put the import lib
from rest_framework.permissions import BasePermission
from config import platform
import logging
logger = logging.getLogger(__name__)
class RbacPermission(BasePermission):
    """
    Custom role-based permission driven by each view's ``perms_map``.
    """

    @classmethod
    def check_is_admin(cls, request):
        """Return whether the user is authenticated and holds the '管理员' role."""
        return request.user.is_authenticated and request.user.roles.filter(name='管理员').exists()

    @classmethod
    def get_permission_from_role(cls, request):
        """Return the distinct ``permissions__method`` values of the user's roles.

        Returns an empty list when the user object has no ``roles`` attribute.
        """
        try:
            perms = request.user.roles.values(
                'permissions__method',
            ).distinct()
            return [p['permissions__method'] for p in perms]
        except AttributeError:
            return []

    def _has_permission(self, request, view):
        """
        Decide whether the request may access ``view``.

        Permission points come from ``view.perms_map`` and are matched against
        the HTTP request method.

        1. Default format
            perms_map = (
                {'*': ('admin', '管理员')},
                {'*': ('k8s_all', 'k8s管理')},
                {'get': ('k8s_list', '查看k8s')},
                {'post': ('k8s_create', '创建k8s')},
                {'put': ('k8s_edit', '编辑k8s')},
                {'delete': ('k8s_delete', '删除k8s')}
            )
        2. Custom action format
            perms_map = (
                {'get_test_data': ('get_test_data', '获取测试数据')},
            )
            Here the key is ``{http method}_{custom ViewSet action}``.

        :param request: rest_framework request object
        :param view: rest_framework view object
        :return: True to allow the request, False to deny it
        """
        _method = request._request.method.lower()
        url_whitelist = platform['whitelist'] if platform else []
        path_info = request.path_info
        for item in url_whitelist:
            url = item['url']
            # NOTE(review): this is a substring match, so a whitelisted URL also
            # allows any path that merely contains it - confirm this is intended.
            if url in path_info:
                logger.debug(f'请求地址 {path_info} 命中白名单 {url}, 放行')
                return True
        is_superuser = request.user.is_superuser
        # Superusers are always allowed.
        if is_superuser:
            logger.debug(f'用户 {request.user} 是超级管理员, 放行 is_superuser = {is_superuser}')
            return True
        is_admin = RbacPermission.check_is_admin(request)
        perms = self.get_permission_from_role(request)
        # Not an admin and no permission points at all: deny immediately.
        if not is_admin and not perms:
            logger.debug(f'用户 {request.user} 不是管理员 且 权限列表为空, 直接拒绝')
            return False
        # Views without a permission map are denied. The attribute must be
        # checked before it is read, otherwise such views raise AttributeError.
        if not hasattr(view, 'perms_map'):
            logger.debug(f'未配置权限映射的视图一律禁止访问 {view}')
            return False
        perms_map = view.perms_map
        # Tolerate views that have no ``action`` attribute.
        action = getattr(view, 'action', None)
        _custom_method = f'{_method}_{action}' if action else None
        for i in perms_map:
            logger.debug(f'perms_map item === {i}')
            for method, alias in i.items():
                # Admins pass when the map carries the {'*': ('admin', ...)}
                # marker: every method of this ViewSet is then allowed.
                if is_admin and (method == '*' and alias[0] == 'admin'):
                    logger.debug('管理员判断通过, 放行')
                    return True
                # Holding a module-wide permission allows every method of the module.
                if method == '*' and alias[0] in perms:
                    logger.debug('模块管理权限 判断通过, 放行')
                    return True
                # Custom actions, e.g.
                # {'get_test_data': ('get_test_data', '获取测试数据')},
                # {'*_test_data': ('get_test_data', '获取测试数据')},
                if _custom_method and alias[0] in perms and (_custom_method == method or method == f'*_{action}'):
                    logger.debug('自定义action权限 判断通过, 放行')
                    return True
                # Permission for one HTTP method of the ViewSet, e.g.
                # {'get': ('workflow_list', '查看工单')},
                if _method == method and alias[0] in perms:
                    logger.debug(f'{method}方法权限 判断通过, 放行')
                    return True
        logger.debug(f'{path_info} 没有符合条件的, 则默认禁止访问')
        return False

    def has_permission(self, request, view):
        """Return the result of ``_has_permission`` for this request and view."""
        # TODO: record denied operations here (the original only had a no-op placeholder).
        return self._has_permission(request, view)
class AdminPermission(BasePermission):
    """Allow a request only when ``RbacPermission.check_is_admin`` accepts it."""

    def has_permission(self, request, view):
        """Return True for admins as judged by ``RbacPermission.check_is_admin``, else False."""
        return True if RbacPermission.check_is_admin(request) else False
class ObjPermission(BasePermission):
    """
    Object-level permission control for password management.
    """

    def has_object_permission(self, request, view, obj):
        """Allow access for holders of the 'admin' permission or the object's owner.

        :param request: rest_framework request object
        :param view: rest_framework view object (unused)
        :param obj: object being accessed; its ``uid_id`` is compared with the user's id
        :return: True when access is granted, False otherwise
        """
        perms = RbacPermission.get_permission_from_role(request)
        if 'admin' in perms:
            return True
        user_id = request.user.id
        # A user without an id must not match an object without an owner
        # (None == None); deny explicitly instead of returning None implicitly.
        return user_id is not None and user_id == obj.uid_id
我们之前已经完成的view_cmdb.py文件里,有几处需要更新:
1、将viewsets.ModelViewSet更改成公共视图里的AutoModelViewSet
2、将Response更改成ops_response,确保返回内容格式一致.
(venv) ➜ ydevops-backend cd /home/charles/ydevops-backend ; /usr/bin/env /home/charles/ydevops-backend/venv/bin/python /home/charles/.vscode-server/extensions/ms-python.python-2023.5.10791008/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher 41223 -- /home/charles/ydevops-backend/manage.py runserver 0.0.0.0:9000
Watching for file changes with StatReloader
Performing system checks...
System check identified no issues (0 silenced).
March 26, 2023 - 18:50:30
Django version 4.1.7, using settings 'devops_backend.settings'
Starting development server at http://0.0.0.0:9000/
Quit the server with CONTROL-C.
访问接口文档 http://localhost:9000/apidoc/
在这个页面,我们可以做一些CRUD操作,如查看环境(由于数据量小,可以先把settings.py里的默认PAGE_SIZE改为1)
Okay,今天先到这吧...
页面更新:2024-03-23
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号