从0到1开发自动化运维平台-接口文档分页视图和权限配置

安装依赖

pip install djangorestframework-simplejwt
pip install django-filter
pip install coreapi
pip install drf-yasg

配置swagger接口文档

1、添加drf_yasg到settings.py

INSTALLED_APPS = [
...
    'rest_framework',
    'drf_yasg',
    'cmdb.apps.CmdbConfig',
]

2、配置路由

from rest_framework import permissions
from drf_yasg.views import get_schema_view
from drf_yasg import openapi

schema_view = get_schema_view(
    openapi.Info(
        title="DevOps运维平台",
        default_version='v1',
        description="DevOps运维平台 接口文档",
        terms_of_service="",
        contact=openapi.Contact(email="qqing_lai@hotmail.com"),
        license=openapi.License(name="BSD License"),
    ),
    public=True,
    permission_classes=(permissions.AllowAny,),
)
...
urlpatterns = [
    path('apidoc/', schema_view.with_ui('swagger', 
...
]

restframework配置

# drf配置
REST_FRAMEWORK = {
    # 自定义分页
    'DEFAULT_PAGINATION_CLASS': 'common.extends.pagination.CustomPagination',
    'PAGE_SIZE': 20,
    # 用户登陆认证方式
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'rest_framework_simplejwt.authentication.JWTAuthentication',
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.BasicAuthentication',
    ),
    # 全局权限拦截
    'DEFAULT_PERMISSION_CLASSES': (
        'common.extends.permissions.RbacPermission',
    ),
    'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.coreapi.AutoSchema',
    'DEFAULT_FILTER_BACKENDS': (
        'django_filters.rest_framework.DjangoFilterBackend',
        'rest_framework.filters.SearchFilter',
        'rest_framework.filters.OrderingFilter',
    ),
    'DEFAULT_RENDERER_CLASSES': ['rest_framework.renderers.JSONRenderer',
                                 'rest_framework.renderers.BrowsableAPIRenderer'] if DEBUG else [
        'rest_framework.rend

分页扩展

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author  :   Charles Lai
@file    :   pagination.py
@time    :   2023/03/26 15:22
@contact :   qqing_lai@hotmail.com
'''

# here put the import lib
from rest_framework.pagination import PageNumberPagination, LimitOffsetPagination
from rest_framework.response import Response
from rest_framework import status


class CustomPagination(PageNumberPagination):
    def get_paginated_response(self, data):
        return Response({'data': {'list': data, 'total': self.page.paginator.count, 'next': self.get_next_link(),
            'previous': self.get_previous_link()}, 'success': True, 'errorCode': 0, 'errorMessage': None}, status=status.HTTP_200_OK)

自定义公共视图

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author  :   Charles Lai
@file    :   viewsets.py
@time    :   2023/03/26 14:42
@contact :   qqing_lai@hotmail.com
'''

# here put the import lib
import inspect
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from rest_framework import viewsets
from rest_framework import pagination
from rest_framework.settings import api_settings
from rest_framework.filters import OrderingFilter
from django.db.models.query import QuerySet
from django.db.models import ProtectedError
from django.core.cache import cache
import pytz
import logging

logger = logging.getLogger(__name__)


def ops_response(data, success=True, errorCode=0, errorMessage=None, status=status.HTTP_200_OK):
    """
    返回自定义
    data列表数据格式:{
 list: [
 ],
 current?: number,
 pageSize?: number,
 total?: number,
}
    """
    return Response({'data': data, 'success': success, 'errorCode': errorCode, 'errorMessage': errorMessage}, status=status)


class AutoModelViewSet(viewsets.ModelViewSet):
    """
    A viewset that provides default `create()`, `retrieve()`, `update()`,
    `partial_update()`, `destroy()` and `list()` actions.
    """

    permission_classes = [IsAuthenticated]
    permission_classes_by_action = {}
    filter_backends = (OrderingFilter, )

    def __init__(self, *args, **kwargs):
        if not hasattr(self, 'queryset'):
            raise AttributeError('必须定义 类属性 queryset')

        if not hasattr(self, 'serializer_class'):
            raise AttributeError('必须定义 类属性 serializer_class')

        super().__init__(*args, **kwargs)

    def get_serializer(self, *args, **kwargs):
        """
        重写 get_serializer 类,用来支持自动获取不同的 serializer_class
        例子:  list 方法, 设置一个serializer_list_class, 则调用get_serializer的时候, 优先获取
        命名格式 serializer_{call_func_name}_class
        :param args:
        :param kwargs:
        :return:
        """
        call_func_name = inspect.stack()[1][3]
        serializer_class = getattr(self, f'serializer_{call_func_name}_class', None)
        if not serializer_class:
            serializer_class = self.get_serializer_class()
        kwargs['context'] = self.get_serializer_context()
        return serializer_class(*args, **kwargs)

    def get_object(self):
        return super(AutoModelViewSet, self).get_object()

    def get_permissions(self):
        try:
            return [permission() for permission in self.permission_classes_by_action[self.action]]
        except KeyError:
            return [permission() for permission in self.permission_classes]
        
    def get_permission_from_role(self, request):
        try:
            perms = request.user.roles.values(
                'permissions__method',
            ).distinct()
            return [p['permissions__method'] for p in perms]
        except (AttributeError, TypeError):
            return []

    def extend_filter(self, queryset):
        return queryset

    def get_queryset(self):
        assert self.queryset is not None, (
                "'%s' should either include a `queryset` attribute, "
                "or override the `get_queryset()` method."
                % self.__class__.__name__
        )
        queryset = self.extend_filter(self.queryset)
        if isinstance(queryset, QuerySet):
            queryset = queryset.all()
        return queryset.distinct()

    def create(self, request, *args, **kwargs):
        try:
            request.data['name'] = request.data['name'].strip(' ').replace(' ', '-')
        except BaseException as e:
            logger.debug('exception ', str(e))
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return ops_response({}, success=False, errorCode=40000, errorMessage=serializer.errors)
        try:
            self.perform_create(serializer)
        except BaseException as e:
            return ops_response({}, success=False, errorCode=50000, errorMessage=str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        return ops_response(serializer.data)

    def list(self, request, pk=None, *args, **kwargs):
        queryset = self.filter_queryset(self.get_queryset())
        page_size = request.query_params.get('page_size', None)
        if not page_size:
            page_size = api_settings.PAGE_SIZE
        pagination.PageNumberPagination.page_size = page_size
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(queryset, many=True)
        return ops_response({'list': serializer.data, 'total': queryset.count()})

    def update(self, request, *args, **kwargs):
        instance = self.get_object()
        partial = kwargs.pop('partial', False)
        try:
            request.data['name'] = request.data['name'].strip(' ').replace(' ', '-')
        except BaseException as e:
            logger.warning(f'不包含name字段: {str(e)}')
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        if not serializer.is_valid():
            return ops_response({}, success=False, errorCode=40000, errorMessage=serializer.errors)
        try:
            self.perform_update(serializer)
        except BaseException as e:
            return ops_response({}, success=False, errorCode=50000, errorMessage=str(e))

        if getattr(instance, '_prefetched_objects_cache', None):
            instance._prefetched_objects_cache = {}
        data = {'data': serializer.data, 'status': 'success', 'code': 20000}
        return ops_response(serializer.data)

    def retrieve(self, request, *args, **kwargs):
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        return ops_response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        instance = self.get_object()
        try:
            self.perform_destroy(instance)
        except ProtectedError:
            # 存在关联数据,不可删除
            return ops_response({}, success=False, errorCode=40300, errorMessage='存在关联数据,禁止删除!')
        except BaseException as e:
            logger.exception(f'删除数据发生错误 {e}, {e.__class__}')
            return ops_response({}, success=False, errorCode=50000, errorMessage=f'删除异常: {str(e)}')
        return ops_response('删除成功')


class AutoModelParentViewSet(AutoModelViewSet):

    def get_queryset(self):
        assert self.queryset is not None, (
                "'%s' should either include a `queryset` attribute, "
                "or override the `get_queryset()` method."
                % self.__class__.__name__
        )
        queryset = self.extend_filter(self.queryset)
        if self.action == 'list':
            if not self.request.query_params.get('search'):
                queryset = queryset.filter(parent__isnull=True)
        if isinstance(queryset, QuerySet):
            queryset = queryset.all()
        return queryset.distinct()

自定义权限校验

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author  :   Charles Lai
@file    :   permissions.py
@time    :   2023/03/26 15:27
@contact :   qqing_lai@hotmail.com
'''

# here put the import lib
from rest_framework.permissions import BasePermission
from config import platform

import logging

logger = logging.getLogger(__name__)


class RbacPermission(BasePermission):
    """
    自定义权限
    """

    @classmethod
    def check_is_admin(cls, request):
        return request.user.is_authenticated and request.user.roles.filter(name='管理员').count() > 0

    @classmethod
    def get_permission_from_role(cls, request):
        try:
            perms = request.user.roles.values(
                'permissions__method',
            ).distinct()
            return [p['permissions__method'] for p in perms]
        except AttributeError:
            return []

    def _has_permission(self, request, view):
        """
        权限获取方式
            从 perms_map 中获取, 通过 request.method, http 请求方法来获取对应权限点
            1. 默认格式
                perms_map = (
                    {'*': ('admin', '管理员')},
                    {'*': ('k8s_all', 'k8s管理')},
                    {'get': ('k8s_list', '查看k8s')},
                    {'post': ('k8s_create', '创建k8s')},
                    {'put': ('k8s_edit', '编辑k8s')},
                    {'delete': ('k8s_delete', '删除k8s')}
                )
            2. 自定义方法格式
                perms_map = (
                    {'get_test_data': ('get_test_data', '获取测试数据')},
                )
                此时 格式为  {http请求方法}_{ViewSet自定义action}

        :param request: rest_framework request 对象
        :param view: rest_framework view 对象
        :return:
        """
        _method = request._request.method.lower()
        url_whitelist = platform['whitelist'] if platform else []
        path_info = request.path_info
        for item in url_whitelist:
            url = item['url']
            if url in path_info:
                logger.debug(f'请求地址 {path_info} 命中白名单 {url}, 放行')
                return True
            
        is_superuser = request.user.is_superuser
        # 超级管理员 或者 白名单模式 直接放行
        if is_superuser:
            logger.debug(f'用户 {request.user} 是超级管理员, 放行 is_superuser = {is_superuser}')
            return True

        is_admin = RbacPermission.check_is_admin(request)
        perms = self.get_permission_from_role(request)
        # 不是管理员 且 权限列表为空的情况下, 直接拒绝
        if not is_admin and not perms:
            logger.debug(f'用户 {request.user} 不是管理员 且 权限列表为空, 直接拒绝')
            return False

        perms_map = view.perms_map
        # 未配置权限映射的视图一律禁止访问
        if not hasattr(view, 'perms_map'):
            logger.debug(f'未配置权限映射的视图一律禁止访问 {view}')
            return False

        # _custom_method = None
        # default_funcs = ['create', 'list', 'retrieve', 'update', 'destroy']
        action = view.action
        _custom_method = f'{_method}_{action}'
        for i in perms_map:
            logger.debug(f'perms_map item ===  {i}')
            for method, alias in i.items():
                # 如果是管理员, 判断当前perms_map是否带有 {'*': ('admin', '管理员')} 标记,如果有, 则当前 ViewSet 所有方法全放行
                if is_admin and (method == '*' and alias[0] == 'admin'):
                    logger.debug('管理员判断通过, 放行')
                    return True
                # 如果带有某个模块的管理权限, 则当前模块所有方法都放行
                if method == '*' and alias[0] in perms:
                    logger.debug('模块管理权限 判断通过, 放行')
                    return True

                # 判断自定义action的情况
                # {'get_test_data': ('get_test_data', '获取测试数据')},
                # {'*_test_data': ('get_test_data', '获取测试数据')},
                if _custom_method and alias[0] in perms and (_custom_method == method or method == f'*_{action}'):
                    logger.debug('自定义action权限 判断通过, 放行')
                    return True

                # 判断是否拥有ViewSet 某个方法的权限, 有则放行
                # {'get': ('workflow_list', '查看工单')},
                if _method == method and alias[0] in perms:
                    logger.debug(f'{method}方法权限 判断通过, 放行')
                    return True
        logger.debug(f'{path_info} 没有符合条件的, 则默认禁止访问')
        return False

    def has_permission(self, request, view):
        res = self._has_permission(request, view)
        # 记录权限异常的操作
        if not res:
            pass
        return res


class AdminPermission(BasePermission):

    def has_permission(self, request, view):
        if RbacPermission.check_is_admin(request):
            return True
        return False


class ObjPermission(BasePermission):
    """
    密码管理对象级权限控制
    """

    def has_object_permission(self, request, view, obj):
        perms = RbacPermission.get_permission_from_role(request)
        if 'admin' in perms:
            return True
        elif request.user.id == obj.uid_id:
            return True

更新cmdb模块视图

我们之前已经完成的view_cmdb.py文件里,有几处需要更新:

1、将viewsets.ModelViewSet更改成公共视图里的AutoModelViewSet

2、将Response更改成ops_reponse,确保返回内容格式一致.

运行项目

(venv) ➜  ydevops-backend  cd /home/charles/ydevops-backend ; /usr/bin/env /home/charles/ydevops-backend/venv/bin/python /home/charles/.vscode-server/extensions/ms-python.python-2023.5.10791008/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher 41223 -- /home/charles/ydevops-backend/manage.py runserver 0.0.0.0:9000 
Watching for file changes with StatReloader
Performing system checks...

System check identified no issues (0 silenced).
March 26, 2023 - 18:50:30
Django version 4.1.7, using settings 'devops_backend.settings'
Starting development server at http://0.0.0.0:9000/
Quit the server with CONTROL-C.

访问接口文档 http://localhost:9000/apidoc/

在这个页面,我们可以做一些CRUD操作,如查看环境(由于数据量小,可以先把settings.py里的默认PAGE_SIZE改为1)

Okay,今天先到这吧...

展开阅读全文

页面更新:2024-03-23

标签:视图   接口   权限   文档   模块   对象   管理员   格式   方法   数据   平台

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top