OM | Q-Learning在运筹学中的应用


本文转载自公众号 阿泽的学习笔记(ID:aze_learning)

作者:阿泽crz


1.背景

假设有一个客服排班的任务,我们需要为 100 个人安排一个星期的排班问题,并且有以下约束条件:

一天被划分为 24 个时间段,即每个时间段为 1 个小时;每个客服一个星期需要上七天班,每次上班八小时(万恶的资本主义);每个客服两次上班时间需要间隔 12 小时。

为了简化模型(虽然已经很简化了),我们再一个约束:

客服值班时,一个星期最早是 0,最晚 24*7 - 1。

评判标准:

现在有每个时间段所需客服人数,我们希望每个时段排班后的人数与实际人数尽量相近。

最优化问题可以使用启发式算法来做,现在想尝试一下强化学习。

2.Q-Learning

Q-Learning 是强化学习中 value-based 的算法,也是几年前 AlphaGO 的亲妈 DeepMind 的成名算法 DQN 的前身(注意断句)。之所以不用 DQN 是因为我现在还不会。

简单介绍下 Q-Learning,需要看详细介绍可以去看之前的推文《深度强化学习:从入门到放弃》。

Q-Learning 中的 Q 为值函数 Q(s,a),表示为某一时刻状态 s 下,采取动作 a 所能获得的收益的期望,环境会根据 Agent 的动作反馈相应的回报,所以算法的主要思想就是将状态与行为构建成一张 Q-table 来存储 Q 值,然后根据 Q 值来选取能够获得最大的收益的动作。

Q-Tablea1a2s1Q(s1,a1)Q(s1,a2)s2Q(s2,a1)Q(s2,a2)s3Q(s3,a1)Q(s3,a2)

也就是说 Q-Learning 可以通过维护 Q-Table 来记录每个状态怎么走收益最大,那现在的问题便变成为如何维护 Q-Table。

Q-Learning 的伪代码如下:

OM | Q-Learning在运筹学中的应用

Q 值采用时间差分方法进行更新,当前 Q 值不仅与回报函数有关,还与下一个状态最大回报值有关。

【岔个题外话,俞军老师给出的产品价值公式:产品价值=(新体验-旧体验)-换用成本。可能有助于理解】

3.代码实战

以 10 个客服为例,介绍下思路:

初始化客服状态:每个客服都排在每天的 0 点 [0, 24, 48, 72, 96, 120, 144](注意上班八小时);Agent 可以移动某个客服某天的排班,左移或者右移,当然也可以不动。所以共有 141 个动作【10 人 * 7 天 * 2 动作 + 1(不动)】;将客服的排班结果视为一个状态;利用加权求和方法计算匹配程度并作为奖励,取符号保证递增性。

我们来看下伪代码:

import random
import numpy as np
from copy import deepcopy
from collections import defaultdict

person_n = 10  # 先排 10 个人的班
class QLearningAgent:
    """
    创建一个智能体
    """
    def __init__(self, actions):
        self.actions = actions
        self.learning_rate = 0.01  # 学习率
        self.discount_factor = 0.9  # 下一状态最好的值的折扣系数
        self.epsilon = 0.1  # 贪婪策略的系数
        # 10 个人,每个人有 2*7 个动作(7 个 bar 左右移动), 也可以不动 -1
        self.q_table = defaultdict(lambda: [0] * (person_n * 2 * 7 + 1))  
    
    def learn(self, state, action, reward, next_state):
        current_q = self.q_table[state][action]
        # 贝尔曼方程更新
        new_q = reward + self.discount_factor * max(self.q_table[next_state])
        self.q_table[state][action] += self.learning_rate * (new_q - current_q)
    
    # 从 Q-table 中选取动作 
    def get_action(self, state):
        if np.random.rand() < self.epsilon:
            # 贪婪策略随机探索动作
            action = np.random.choice(self.actions)
        else:
            # 从q  表中选择
            state_action = self.q_table[state]
            action = self.arg_max(state_action)
        return action
    
    def arg_max(self, state_action):
        max_index_list = []
        max_value = state_action[0]
        for index, value in enumerate(state_action):
            if value > max_value:
                max_index_list.clear()
                max_value = value
                max_index_list.append(index)
            elif value == max_value:
                max_index_list.append(index)
        return random.choice(max_index_list)
class Env():
    """
    环境,用来评价当前状态
    """
    def __init__(self):
        # 10 个人, 7 天,每个 bar 都可以向左向右移动,也可以不移动 '-1'
        # action 是三个字符,例如:'01R',分别表示第 0 个客服,第 2 天的班右移一个时间段
        self.actions_space = ['{}{}L'.format(i, j) for i in range(person_n) for j in range(7)] + 
                                    ['{}{}R'.format(i, j) for i in range(person_n) for j in range(7)] + ['-1']
        self.n_actions = len(self.actions_space)
        # 实际需要的排班数 24*7 个格子
        self.act_list = [random.randint(int(person_n)/2, int(person_n)) for i in range(24*7)]
  # 计算每个格子的权重,用来计算加权匹配率
        self.w_list = [i / sum(self.act_list) for i in self.act_list]
        # 记录每个上班的时间,一个二维数组
        self.state = [[i*24 for i in range(7)] for i in range(person_n)]
        # 惩罚系数,用来增加一些约束。
        self.punish = -1000
        print(self.act_list)
    
    def list_2_str(self, l):
        return ''.join(str(j) for i in l for j in i)
    
    def reset(self):
        # 初始化
        self.state = [[i*24 for i in range(7)] for i in range(person_n)]
        return self.list_2_str(self.state)
    
    # 给当前排班打分,考虑权重
    def reward(self):
        # 判断每个人的排班要间隔 8+12 小时,否则 socre = -1000
        for i in range(person_n):
            # 星期天和星期一的排班间隔 8+12 小时
            if (self.state[i][0] + (24*7-1) - self.state[i][6]) < 20:
                return self.punish
            for j in range(6):
                if (self.state[i][j+1] - self.state[i][j]) < 20:
                    return self.punish
        # 拼接完整的 list,之前只是记录了每个人的上班时间,现在需要统计排版后每个时间断安排的人数
        state_list = [[0] * 24 * 7] * person_n
        for person in range(person_n):
            for i in self.state[person]:
                for j in range(8):
                    state_list[person][i+j] = 1
        plan_list = np.sum(state_list, axis=0).tolist()
        s_list = [abs(plan_list[i] - self.act_list[i])/self.act_list[i] for i in range(len(plan_list))]
        # 奖励越大越好,所以加个负号
        score = -np.sum([s_list[i]*self.w_list[i] for i in range(len(s_list))])
        return score
    
    def step(self, action):
        # 执行动作
        actions_str = self.actions_space[action]
        if actions_str == '-1':
            # -1 就不执行了
            return self.list_2_str(self.state), self.reward()
        else:
            num = int(actions_str[0])
            day = int(actions_str[1])
            move = actions_str[2]
            if move == 'R':
                # 如果已经在最右边了就不要右移了
                if self.state[num][day] == (24*7-8-1):
                    bad_state = deepcopy(self.state)
                    bad_state[num][day] = bad_state[num][day] + 1
                    return self.list_2_str(bad_state), self.punish
                self.state[num][day] = self.state[num][day] + 1
            if move == 'L':
                # 在最左边的也不要左移了
                if self.state[num][day] == 0:
                    bad_state = deepcopy(self.state)
                    bad_state[num][day] = bad_state[num][day] - 1
                    return self.list_2_str(bad_state), self.punish
                self.state[num][day] = self.state[num][day] - 1
            # 返回状态和奖励值
            return self.list_2_str(self.state), self.reward()
# 分别创建智能体和环境
env = Env() 
agent = QLearningAgent(actions=list(range(env.n_actions)))

state = env.reset()
bst_reward = -500  # 记录最好的奖励值和状态
bst_state = state
for i in range(1000000): # 迭代吧
    action = agent.get_action(str(state))  # agent 产生动作
    next_state, reward = env.step(action)  # 计算当前状态产生该动作后的下一个状态及其奖励值
    agent.learn(state, action, reward, next_state)  # 让贝尔曼方程学习一下
    if reward < -500:  # 奖励值太小就不要用这个状态了
        continue
    state = next_state
    if bst_reward < reward:
        bst_reward = reward
        bst_state = deepcopy(env.state)
        print(i, bst_reward)
实际排班需求 [10, 10, 7, 7, 6, 5, 9, 10, 7, 6, 8, 6, 10, 5, 10, 8, 6, 6, 10, 5, 9, 9, 7, 10, 6, 5, 6, 7, 8, 8, 10, 8, 5, 10, 7, 6, 6, 10, 8, 9, 7, 6, 5, 9, 9, 8, 8, 7, 5, 6, 9, 6, 7, 6, 9, 7, 9, 7, 6, 7, 6, 8, 6, 6, 9, 9, 8, 6, 5, 5, 6, 7, 5, 8, 7, 5, 8, 9, 10, 10, 6, 8, 10, 9, 5, 10, 7, 10, 8, 8, 7, 9, 8, 9, 7, 5, 9, 7, 7, 6, 7, 7, 10, 9, 9, 10, 7, 10, 5, 7, 6, 5, 10, 10, 10, 6, 8, 9, 7, 9, 5, 6, 6, 10, 8, 10, 5, 8, 8, 8, 9, 8, 9, 9, 6, 6, 9, 9, 9, 10, 6, 6, 5, 9, 8, 7, 6, 10, 10, 9, 7, 8, 10, 8, 8, 9, 6, 6, 7, 8, 7, 7, 6, 6, 10, 8, 9, 8]
0 -0.7716966379984362
3 -0.7670054730258014
4 -0.7638780297107115
7 -0.7607505863956215
... ...
519487 -0.34480062548866297
520907 -0.34323690383111805
562333 -0.33229085222830335
print(bst_reward)  # 打印出来看一下
bst_state
-0.33229085222830335
[[8, 28, 49, 80, 103, 123, 143],
 [1, 23, 44, 65, 85, 108, 141],
 [15, 38, 58, 80, 108, 128, 153],
 [11, 31, 52, 73, 97, 133, 158],
 [5, 27, 48, 72, 94, 115, 135],
 [9, 31, 53, 79, 101, 131, 152],
 [4, 31, 51, 72, 93, 116, 138],
 [8, 28, 48, 72, 99, 120, 152],
 [9, 31, 52, 73, 96, 124, 148],
 [5, 26, 52, 72, 92, 124, 146]]

至此,该简化版的排版调度问题便得到了解决。

当然,实际问题远没有得到解决,毕竟实际问题还要考虑客服休息、客服技能等一大堆约束条件。

关于 DQN 的解决方案将在未来不久进行介绍。

展开阅读全文

页面更新:2024-06-14

标签:贝尔   客服   权重   时间段   系数   间隔   算法   收益   人数   状态   动作   小时   代码   环境   时间   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top