Implementing a Real-Time Thread Pool Management and Monitoring Tool: Notes and Reflections


0. Preface

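Java thread pools are among the most frequently used concurrency tools, and most of us have used them. But are you sure you are using them correctly? The Alibaba Java Coding Guidelines tell us not to create thread pools through the Executors shortcuts, but does dropping Executors in favor of some other way of creating pools guarantee that nothing will go wrong? This article describes in detail how we developed a plugin for dynamically managing and monitoring Java thread pools in real time, in the hope of giving you a new perspective on thread pools. The main topics are as follows: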

1. The Problem

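Spring's @Async annotation makes it very easy to run methods asynchronously. By default, Spring Async runs tasks on a SimpleAsyncTaskExecutor. Strictly speaking, SimpleAsyncTaskExecutor is not a thread pool at all: it creates a new thread for every method invocation. It does provide a throttling mechanism controlled by the concurrencyLimit property (a value >= 0 enables throttling, -1 disables it; the default is -1). For these reasons we usually define our own thread pool configuration: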

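import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class SpringAsyncConfig {
    @Bean(name = "commonExecutor")
    public Executor commonExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(50);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("CommonExecutor-");
        taskExecutor.initialize();
        return taskExecutor;
    }
}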

@Async("commonExecutor")
public void doSth() {...}

In our project, we then used this thread pool for asynchronous processing like this:

@Async("commonExecutor")
public void serviceA() {...}

@Async("commonExecutor")
public void serviceB() {...}

@Async("commonExecutor")
public void serviceC() {...}

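In practice, this is a very dangerous way to use a thread pool. In production, serviceA's QPS was far higher than that of serviceB and serviceC, so serviceA tied up most of the pool, and requests for serviceB and serviceC were either rejected outright or timed out after queuing for too long.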
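The effect is easy to reproduce with a plain JDK ThreadPoolExecutor. The sketch below uses a deliberately tiny stand-in for the shared commonExecutor (1 thread, a queue of 1, the default AbortPolicy; these sizes and the class name SharedPoolDemo are illustrative assumptions): two slow serviceA tasks fill the only thread and the only queue slot, so the very next serviceB task is rejected even though serviceB itself is barely loaded.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one shared pool (1 thread, queue capacity 1, default AbortPolicy)
class SharedPoolDemo {
    /** Returns true if a serviceB task is rejected after serviceA saturates the shared pool. */
    static boolean serviceBRejected() {
        ThreadPoolExecutor shared = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable slowServiceA = () -> {
            try {
                release.await(); // simulate a slow serviceA call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            shared.execute(slowServiceA); // occupies the only thread
            shared.execute(slowServiceA); // occupies the only queue slot
            shared.execute(() -> System.out.println("serviceB ran"));
        } catch (RejectedExecutionException e) {
            rejected = true; // serviceB is starved by serviceA
        } finally {
            release.countDown();
            shared.shutdown();
        }
        return rejected;
    }
}
```

With separate pools per service, serviceA could only exhaust its own pool.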


2. Analysis

To address the problem above, our team considered the following questions:

2.1. How can different business interfaces use mutually isolated thread pools?

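This question naturally brings to mind Hystrix's resource isolation, which is also implemented with thread pools. Figure 2 below shows how Hystrix resource isolation works: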


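In the figure above, thread pool A handles requests to service1 and service2, thread pool B handles requests to service3, and thread pool C handles requests to service4. Hystrix uses @HystrixCommand(threadPoolKey = "xxx") to set the threadPoolKey of the current HystrixCommand instance, and methods with the same threadPoolKey share the same thread pool instance. To keep the focus on thread pools themselves, we will not explain the details of Hystrix thread isolation here. In Spring Async we achieve the same effect by configuring separate thread pool instances: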

@EnableAsync
@Configuration
public class SpringAsyncConfig {
    @Bean(name = "executorA")
    public Executor executorA() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(50);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorA-");
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean(name = "executorB")
    public Executor executorB() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(8);
        taskExecutor.setQueueCapacity(20);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorB-");
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean(name = "executorC")
    public Executor executorC() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(40);
        taskExecutor.setQueueCapacity(100);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorC-");
        taskExecutor.initialize();
        return taskExecutor;
    }
} 

In each business method we pass a different bean name to @Async, so that each method runs on its own thread pool instance:

@Async("executorA")
public void doSthA() {...}

@Async("executorB")
public void doSthB() {...}

@Async("executorC")
public void doSthC() {...}

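Of course, in real-world development it is not feasible to give every business interface its own thread pool, since every extra pool adds its own threads and overhead. Thread pool resources have to be planned according to the actual business workload.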
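The idea behind threadPoolKey and per-bean executors is a bulkhead: one pool per key, shared only by the methods that use that key. A minimal JDK-only sketch of such a registry follows; the class name IsolatedPools, the fixed pool sizes and the ConcurrentHashMap-based lookup are assumptions for illustration, not how Hystrix or Spring implement it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bulkhead registry: one lazily created pool per key
class IsolatedPools {
    private final ConcurrentMap<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    /** Callers passing the same key share a pool; different keys never share threads or queues. */
    ThreadPoolExecutor forKey(String key) {
        // computeIfAbsent creates at most one pool per key, even under concurrent calls
        return pools.computeIfAbsent(key, k -> {
            AtomicInteger seq = new AtomicInteger();
            // Illustrative fixed sizes; in practice each key would get its own configuration
            return new ThreadPoolExecutor(2, 8, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(20),
                    r -> new Thread(r, k + "-" + seq.incrementAndGet()));
        });
    }

    void shutdownAll() {
        pools.values().forEach(ThreadPoolExecutor::shutdown);
    }
}
```

Grouping several low-traffic methods under one key is how the "not one pool per interface" planning above would be expressed in this sketch.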

2.2. How should thread pool parameters be configured?

Formula 1:

$$N_{threads} = N_{cpu} \times U_{cpu} \times \left(1 + \frac{W}{C}\right)$$

where N_cpu = number of CPU cores, U_cpu = CPU utilization (between 0 and 1), W = thread wait time, C = thread compute time.

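This approach is rather theoretical. The actual CPU utilization (that is, how much CPU is allotted to the thread pool) and the threads' compute and wait times are very hard to estimate, and the result can easily diverge from the real application scenario.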

Formula 2:

$$coreSize = 2 \times N_{cpu}, \qquad maxSize = 25 \times N_{cpu}$$

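In practice, different businesses place different demands on their thread pools, so sizing every pool uniformly from the CPU core count is clearly unreasonable.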

Formula 3:

$$coreSize = tps \times time, \qquad maxSize = tps \times time \times (1.7 \sim 2)$$

where tps is the interface's transactions per second and time is the average task execution time in seconds.

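This approach takes the actual business load into account by factoring in TPS and execution time. It is reasonable if each interface's traffic is evenly distributed, but in reality traffic fluctuates: it is low during off-peak hours and high during peak hours. If the pool is sized for the average TPS, it may not withstand peak load; if it is sized for the peak, the pool sits idle most of the time, adding system overhead.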
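To make the three formulas concrete, here is a small calculator. The class name PoolSizing is illustrative, and the inputs in the test (8 cores, 80% CPU utilization, 90 ms wait vs. 10 ms compute per task, 200 TPS at 50 ms per task, factor 2.0 from the 1.7~2 range) are made-up numbers, not measurements.

```java
// Hypothetical helper implementing the three sizing formulas above
final class PoolSizing {
    private PoolSizing() {}

    /** Formula 1: Nthreads = Ncpu * Ucpu * (1 + W / C). */
    static double formula1(int nCpu, double uCpu, double waitMillis, double computeMillis) {
        return nCpu * uCpu * (1 + waitMillis / computeMillis);
    }

    /** Formula 2: {coreSize, maxSize} = {2 * Ncpu, 25 * Ncpu}. */
    static int[] formula2(int nCpu) {
        return new int[] {2 * nCpu, 25 * nCpu};
    }

    /** Formula 3: coreSize = tps * time, maxSize = tps * time * factor (factor in 1.7~2). */
    static int[] formula3(int tps, int avgMillis, double factor) {
        double core = tps * avgMillis / 1000.0; // tps * time, with time converted to seconds
        return new int[] {(int) Math.ceil(core), (int) Math.ceil(core * factor)};
    }
}
```

With these inputs, formula 1 gives 64 threads, formula 2 gives core 16 / max 200, and formula 3 gives core 10 / max 20, which shows how far apart the formulas can land.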

In short, none of these formulas can size a thread pool accurately; we need a more flexible way to configure it.

2.3. How can thread pool parameters be adjusted dynamically?

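Is there a way to adjust a thread pool's parameters dynamically? To answer this, we need a solid understanding of how Java thread pools work. Figure 3 below is a flowchart of how a Java thread pool creates Worker threads: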


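Figure 3 shows the basic flow of a thread pool executing a task. We will not repeat the underlying details here; instead, here are some of our thoughts on the Java thread pool model: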

So how does a Java thread pool manage its thread count? The relevant source code is as follows:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

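The source shows that ThreadPoolExecutor packs both the pool's run state and its worker count into a single 32-bit AtomicInteger, ctl. The high 3 bits hold the run state (the five states are -1, 0, 1, 2 and 3 shifted left by COUNT_BITS = 29 bits), and the low 29 bits hold the worker count. runStateOf and workerCountOf extract each part by masking with ~CAPACITY and CAPACITY respectively, and ctlOf combines them with a bitwise OR. This is a very clever design; the calculation is illustrated in the figure below: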
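The packing can be checked by copying the constants and helpers from the JDK 8 source above into a standalone class (CtlLayout is just a name for this sketch). It prints the high 3 bits of each run state and shows that a RUNNING pool with 5 workers unpacks back into its two parts.

```java
// Standalone copy of the JDK 8 ctl constants and helpers, for experimentation only
final class CtlLayout {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // low 29 bits set: 536870911
    static final int RUNNING    = -1 << COUNT_BITS;        // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;        // high bits 000
    static final int STOP       =  1 << COUNT_BITS;        // high bits 001
    static final int TIDYING    =  2 << COUNT_BITS;        // high bits 010
    static final int TERMINATED =  3 << COUNT_BITS;        // high bits 011

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    /** The 3 run-state bits of a value, e.g. "111" for RUNNING. */
    static String highBits(int value) {
        String bits = String.format("%32s", Integer.toBinaryString(value)).replace(' ', '0');
        return bits.substring(0, 3);
    }

    public static void main(String[] args) {
        System.out.println("RUNNING=" + highBits(RUNNING) + " SHUTDOWN=" + highBits(SHUTDOWN)
                + " STOP=" + highBits(STOP) + " TIDYING=" + highBits(TIDYING)
                + " TERMINATED=" + highBits(TERMINATED));
        int c = ctlOf(RUNNING, 5); // a running pool with 5 workers
        System.out.println("state is RUNNING: " + (runStateOf(c) == RUNNING)
                + ", workers: " + workerCountOf(c));
    }
}
```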

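When the worker count needs to change, a CAS (Compare And Swap) operation keeps the update safe under concurrency. AtomicInteger's compareAndSet is built on the JVM's low-level Unsafe API, which updates the variable atomically. This works much like optimistic locking: read the current value, try to swap in the new one, and retry if another thread changed it first.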

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
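The CAS retry pattern used by decrementWorkerCount can be exercised in isolation. In the sketch below (CtlCasDemo is a hypothetical name; the constants are copied from the JDK 8 source shown earlier), several threads increment the worker count concurrently with a compare-and-set loop. No increment is lost, and because the count lives in the low 29 bits, the run state in the high bits is untouched.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the ctl CAS loop; not the JDK's own code
class CtlCasDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;

    static int runStateOf(int c)    { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    private final AtomicInteger ctl = new AtomicInteger(RUNNING); // RUNNING with 0 workers

    /** Same shape as decrementWorkerCount(): retry until our CAS wins. */
    void incrementWorkerCount() {
        int c;
        do {
            c = ctl.get();                       // read the current packed value
        } while (!ctl.compareAndSet(c, c + 1));  // fails if another thread changed ctl first
    }

    /** Starts `threads` threads that each do `perThread` increments; returns the final ctl. */
    static int race(int threads, int perThread) {
        CtlCasDemo demo = new CtlCasDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    demo.incrementWorkerCount();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo.ctl.get();
    }
}
```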

The following describes several important thread pool parameters, quoting the thread pool source code where helpful.

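Core pool size: corePoolSize; maximum pool size: maxPoolSize (the names used by Spring's ThreadPoolTaskExecutor)

corePoolSize is the number of threads the pool keeps even when they are idle, unless allowCoreThreadTimeOut is enabled. While fewer than corePoolSize workers exist, every newly submitted task starts a new Worker thread, even if other workers are idle.

Maximum pool size: maximumPoolSize (the ThreadPoolExecutor name)

The thread pool guarantees that the number of live Worker threads never exceeds this maximum.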

Work queue: workQueue

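Once the number of worker threads has reached corePoolSize, newly submitted tasks (not Worker objects) are placed in the work queue; only when the queue is full does the pool create additional threads, up to maximumPoolSize. The queue implementation is up to you; common choices are SynchronousQueue, LinkedBlockingQueue and ArrayBlockingQueue. We will not go into their details here; please refer to the documents cited at the end of the article.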
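The order in which a pool uses core threads, the queue and extra threads can be observed with a small JDK-only experiment. Under the assumed sizes corePoolSize = 1, maximumPoolSize = 2 and an ArrayBlockingQueue of capacity 1 (the class name TaskFlowDemo is illustrative), three blocking tasks end up as follows: task 1 runs on the core thread, task 2 waits in the queue, and task 3 gets a newly created non-core thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical experiment: execute() fills core threads, then the queue, then non-core threads
class TaskFlowDemo {
    /** Returns {poolSize, queueSize} observed after each of three submissions, flattened. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep each worker busy so nothing is dequeued
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] snapshots = new int[6];
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocking);
                snapshots[2 * i] = pool.getPoolSize();
                snapshots[2 * i + 1] = pool.getQueue().size();
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return snapshots; // expected {1, 0, 1, 1, 2, 1}
    }
}
```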

Idle time allowed for non-core threads: keepAliveTime

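When the pool has more than corePoolSize threads and no new tasks arrive, the threads beyond the core are not destroyed immediately; each one waits and is terminated only after it has been idle for longer than keepAliveTime. If allowCoreThreadTimeOut is enabled, core threads time out the same way.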

Rejection policy: RejectedExecutionHandler

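This is the pool's saturation policy. If the work queue is full and the pool already has maximumPoolSize threads, any further task must be handled by some policy. The JDK provides four:

AbortPolicy (the default): throws a RejectedExecutionException.
CallerRunsPolicy: runs the task in the thread that called execute; if the pool has been shut down, the task is discarded.
DiscardPolicy: silently discards the task.
DiscardOldestPolicy: discards the oldest task in the queue and then retries execute; if the pool has been shut down, the task is discarded.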
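CallerRunsPolicy is the policy that behaves least like a rejection, so it is worth seeing in action. In this sketch (CallerRunsDemo and the 1-thread, 1-slot pool are assumptions for illustration), the pool is saturated and the third task runs synchronously on the submitting thread, which also slows the submitter down.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: with CallerRunsPolicy, a task that cannot be queued runs in the caller's thread
class CallerRunsDemo {
    /** Returns the name of the thread that ran the overflow task. */
    static String overflowThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocking); // the only worker thread is now busy
            pool.execute(blocking); // the only queue slot is now taken
            // No thread or queue slot left: CallerRunsPolicy runs this synchronously, right here
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }
}
```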

So can we change corePoolSize and maxPoolSize while the thread pool is running? Taking setCorePoolSize as an example, let's look at the ThreadPoolExecutor source:

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

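The source shows that when the new corePoolSize is larger than the current value, the pool prestarts new worker threads, at most the smaller of the increase and the number of tasks waiting in the queue, and stops early once the queue is empty. When the new corePoolSize is smaller than the current number of workers, it calls interruptIdleWorkers to interrupt idle worker threads. (The code above is from JDK 8; since JDK 9, setCorePoolSize also throws an IllegalArgumentException if the new value exceeds maximumPoolSize.) Let's continue with the source of interruptIdleWorkers: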

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

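interruptIdleWorkers (the no-argument version called above simply delegates to interruptIdleWorkers(false)) iterates over every worker in the workers set and interrupts each one whose lock it can acquire with tryLock. Because a Worker holds its own lock while running a task, tryLock succeeds only for idle workers, so threads that are busy executing tasks are never interrupted.
Why is mainLock needed here? Because workers is a plain HashSet, which is not thread-safe. Now let's look at the setters for the other thread pool parameters: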

    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
            interruptIdleWorkers();
    }

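All of these setters call interruptIdleWorkers() to adjust the pool's capacity, so our conclusion is that a thread pool's size can be adjusted safely and reliably in real time while it is running. Note that shrinking takes effect lazily: busy threads are not interrupted, and surplus workers exit only once they become idle (immediately if the pool is above maximumPoolSize, otherwise after keepAliveTime expires).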
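The following JDK-only experiment (ResizeDemo and all sizes are assumptions for illustration) grows a running pool from 1 to 4 core threads and then lowers corePoolSize back to 1. It raises maximumPoolSize before corePoolSize, because since JDK 9 setCorePoolSize rejects a value above the current maximum. Growing immediately prestarts workers for the queued tasks; shrinking leaves the busy workers alone, and the surplus threads exit only after their tasks finish and keepAliveTime (50 ms here) expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical experiment: resizing a live ThreadPoolExecutor
class ResizeDemo {
    /** Returns {poolSizeAfterGrow, poolSizeRightAfterShrink, poolSizeAfterIdle}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 4; i++) {
            pool.execute(blocking);     // 1 running, 3 queued
        }
        // Grow: raise the maximum first, then the core size
        pool.setMaximumPoolSize(4);
        pool.setCorePoolSize(4);        // prestarts workers for the 3 queued tasks
        int afterGrow = pool.getPoolSize();

        // Shrink: busy workers hold their lock, so interruptIdleWorkers() skips them
        pool.setCorePoolSize(1);
        int rightAfterShrink = pool.getPoolSize();

        release.countDown();            // let the tasks finish
        long deadline = System.currentTimeMillis() + 5_000;
        while (pool.getPoolSize() > 1 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);       // surplus workers exit after keepAliveTime
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int afterIdle = pool.getPoolSize();
        pool.shutdown();
        return new int[] {afterGrow, rightAfterShrink, afterIdle};
    }
}
```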

3. Implementation

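After thorough research, we decided to build a very lightweight Spring Boot plugin for creating thread pools dynamically. Its main features are shown in the figure below: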

3.1 Configuration management

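The basic thread pool parameters are stored in the Apollo configuration center or in MySQL, which makes the configuration easy to modify (the first version supports only Apollo; MySQL support will follow). A reference Apollo configuration: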

# DataMonitor: monitors business report data
naughty.threadpools.executors[0].corePoolSize = 10
naughty.threadpools.executors[0].maximumPoolSize = 30
naughty.threadpools.executors[0].keepAliveTime = 300
naughty.threadpools.waitRefreshConfigSeconds = 10
naughty.threadpools.executors[0].threadPoolName = DataMonitor
naughty.threadpools.executors[0].queueCapacity = 5
naughty.threadpools.executors[0].rejectedExecutionType = AbortPolicy

# ExposureExecutor: asynchronous calls to the user admission interface
naughty.threadpools.executors[1].threadPoolName = ExposureExecutor
naughty.threadpools.executors[1].queueCapacity = 1
naughty.threadpools.executors[1].rejectedExecutionType = CallerRunsPolicy
naughty.threadpools.executors[1].corePoolSize = 5
naughty.threadpools.executors[1].maximumPoolSize = 20
naughty.threadpools.executors[1].keepAliveTime = 300
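In Spring Boot, indexed keys such as naughty.threadpools.executors[0].corePoolSize are normally bound to a list of POJOs through @ConfigurationProperties, which the plugin's DynamicThreadPoolProperties presumably relies on. To show how the flat keys map to per-pool settings, here is a hypothetical, dependency-free parser over java.util.Properties (PoolConfigParser is an illustrative name, not part of the plugin). Each executors[i] index becomes one map of settings; keys that are not indexed, such as waitRefreshConfigSeconds, are ignored.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser: groups naughty.threadpools.executors[i].<field> keys by index
final class PoolConfigParser {
    private static final Pattern KEY =
            Pattern.compile("naughty\\.threadpools\\.executors\\[(\\d+)]\\.(\\w+)");

    private PoolConfigParser() {}

    /** Returns index -> (field -> value), ordered by index. */
    static Map<Integer, Map<String, String>> parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalArgumentException(e); // not expected for an in-memory reader
        }
        Map<Integer, Map<String, String>> executors = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            Matcher m = KEY.matcher(key);
            if (m.matches()) {
                executors.computeIfAbsent(Integer.parseInt(m.group(1)), i -> new TreeMap<>())
                         .put(m.group(2), props.getProperty(key).trim());
            }
        }
        return executors;
    }
}
```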

3.2 Listening for configuration changes

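We use Apollo's ChangeListener to listen for configuration changes (with MySQL, the client can instead be told to refresh its configuration through an HTTP endpoint after the configuration is modified). The code snippet is as follows: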

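import javax.annotation.PostConstruct;

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.StringUtils;

@Slf4j
public class ThreadPoolConfigUpdateListener {

    @Value("${apollo.bootstrap.namespaces:application}")
    private String namespace;

    @Autowired
    private DynamicThreadPoolFacade dynamicThreadPoolManager;

    @Autowired
    private DynamicThreadPoolProperties poolProperties;

    @PostConstruct
    public void init() {
        initConfigUpdateListener();
    }

    public void initConfigUpdateListener() {
        // A namespace set in the plugin's own properties overrides apollo.bootstrap.namespaces
        String apolloNamespace = StringUtils.hasText(poolProperties.getApolloNamespace())
                ? poolProperties.getApolloNamespace()
                : namespace;
        Config config = ConfigService.getConfig(apolloNamespace);
        config.addChangeListener(changeEvent -> {
            try {
                // Delay the refresh by waitRefreshConfigSeconds
                Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                log.error("Interrupted while waiting to refresh thread pool config", e);
            }
            dynamicThreadPoolManager.refreshThreadPoolExecutor();
            log.info("Thread pool configuration changed; refresh complete");
        });
    }

}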

The thread pool refresh logic, in simplified form:

    public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
        dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> {
            NaughtyThreadPoolTaskExecutor executor = getExecutor(poolProperties.getThreadPoolName());
            if (executor == null) {
                // First time this pool name is seen: create, configure and initialize it
                executor = new NaughtyThreadPoolTaskExecutor();
                managerExecutor(executor, poolProperties);
                executor.setBlockingQueue(getBlockingQueue(poolProperties.getQueueType(), poolProperties.getQueueCapacity()));
                executor.initialize();
                // Register the new object as a singleton in the Spring container
                defaultListableBeanFactory.registerSingleton(poolProperties.getThreadPoolName(), executor);
                // Autowire the executor's own dependencies
                autowireCapableBeanFactory.autowireBean(executor);
            } else {
                // Existing pool: update its parameters in place
                managerExecutor(executor, poolProperties);
                BlockingQueue<?> queue = executor.getThreadPoolExecutor().getQueue();
                // The queue capacity can only change if the queue supports resizing
                if (queue instanceof ResizableCapacityLinkedBlockIngQueue) {
                    ((ResizableCapacityLinkedBlockIngQueue) queue).setCapacity(poolProperties.getQueueCapacity());
                }
            }
        });
    }

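    private void managerExecutor(NaughtyThreadPoolTaskExecutor executor, ThreadPoolProperties poolProperties) {
        if (executor == null) {
            return;
        }
        try {
            executor.setBeanName(poolProperties.getThreadPoolName());
            int newCore = poolProperties.getCorePoolSize();
            int newMax = poolProperties.getMaximumPoolSize();
            // Since JDK 9, ThreadPoolExecutor rejects corePoolSize > maximumPoolSize,
            // so when the new core size exceeds the current maximum, raise the maximum first
            if (newCore > executor.getMaxPoolSize()) {
                executor.setMaxPoolSize(newMax);
                executor.setCorePoolSize(newCore);
            } else {
                executor.setCorePoolSize(newCore);
                executor.setMaxPoolSize(newMax);
            }
            executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime());
            // Spring's ThreadPoolTaskExecutor hands this handler to the pool in initialize();
            // a running pool may only pick up a change if NaughtyThreadPoolTaskExecutor forwards it
            executor.setRejectedExecutionHandler(this.getRejectedExecutionHandler(poolProperties.getRejectedExecutionType(), poolProperties.getThreadPoolName()));
            executor.setThreadPoolName(poolProperties.getThreadPoolName());
        } catch (Exception e) {
            log.error("Failed to apply executor parameters", e);
        }
    }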
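The refresh logic can change the queue capacity only when the queue is a ResizableCapacityLinkedBlockIngQueue, because the JDK's LinkedBlockingQueue fixes its capacity in a final field set by the constructor. The plugin's class is not shown, so the following is only a hypothetical sketch of the idea (ResizableLinkedBlockingQueue is an illustrative name): an unbounded LinkedBlockingQueue whose offer(E), the method ThreadPoolExecutor.execute uses to enqueue tasks, enforces a capacity that can be changed at runtime.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, NOT the plugin's ResizableCapacityLinkedBlockIngQueue.
// Bounds insertions made through offer(E) and add(E); put() and the timed offer() are not bounded here.
class ResizableLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
    private final Object offerLock = new Object();
    private volatile int capacity;

    ResizableLinkedBlockingQueue(int capacity) {
        super(); // the underlying queue is unbounded; the bound is enforced in offer()
        setCapacity(capacity);
    }

    /** Shrinking below the current size keeps existing elements but rejects new offers. */
    void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be > 0");
        }
        this.capacity = capacity;
    }

    int getCapacity() {
        return capacity;
    }

    @Override
    public boolean offer(E e) {
        // Serialize offers so the size check and the insert behave as one step
        synchronized (offerLock) {
            return size() < capacity && super.offer(e);
        }
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }
}
```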

3.3 Status monitoring

ThreadPoolExecutor provides hook methods such as beforeExecute and afterExecute, in which we report each task's execution time to CAT. The code snippet is as follows:

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // Runs on the worker thread that is about to execute r
        String threadName = Thread.currentThread().getName();
        Transaction transaction = Cat.newTransaction(threadPoolName, runnableNameMap.get(r.getClass().getSimpleName()));
        transactionMap.put(threadName, transaction); // transactionMap must be thread-safe, e.g. a ConcurrentHashMap
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        String threadName = Thread.currentThread().getName();
        Transaction transaction = transactionMap.remove(threadName);
        if (transaction == null) {
            return; // no transaction was opened for this task
        }
        // t is non-null only if r itself threw; tasks passed to submit() keep their exception in the Future
        if (t != null) {
            Cat.logError(t);
            transaction.setStatus(t);
        } else {
            transaction.setStatus(Message.SUCCESS);
        }
        transaction.complete();
    }
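With CAT set aside, the hook pattern itself can be sketched with the JDK alone. In the hypothetical TimedThreadPoolExecutor below, a ThreadLocal replaces the thread-name-keyed transactionMap (beforeExecute and afterExecute always run on the same worker thread for a given task), and AtomicLong counters stand in for CAT transactions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: per-task timing through the beforeExecute/afterExecute hooks.
// A ThreadLocal replaces the thread-name-keyed map; counters replace CAT.
class TimedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    final AtomicLong completed = new AtomicLong();
    final AtomicLong failed = new AtomicLong();
    final AtomicLong totalNanos = new AtomicLong();

    TimedThreadPoolExecutor(int core, int max) {
        super(core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // called on worker thread t, just before r runs
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            Long start = startNanos.get();
            if (start != null) {
                totalNanos.addAndGet(System.nanoTime() - start);
            }
            // t is non-null only if r itself threw; submit() wraps exceptions in a Future
            if (t != null) {
                failed.incrementAndGet();
            } else {
                completed.incrementAndGet();
            }
        } finally {
            startNanos.remove(); // worker threads are reused, so always clear
            super.afterExecute(r, t);
        }
    }
}
```

To observe exceptions from submit()-ed tasks, afterExecute would have to inspect the Future, as the example in the ThreadPoolExecutor.afterExecute Javadoc does.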

The corresponding CAT monitoring data is shown in the figure below:


With CAT's StatusExtension, the pool's runtime state can be sent to CAT periodically and rendered as bar charts. The implementation is as follows:

    public StatusExtension registerStatusExtension(ThreadPoolProperties prop, Object object) {
        NaughtyThreadPoolTaskExecutor executor = (NaughtyThreadPoolTaskExecutor) object;
        StatusExtension statusExtension = new StatusExtension() {
            @Override
            public String getId() {
                return "thread.pool.info." + prop.getThreadPoolName();
            }

            @Override
            public String getDescription() {
                return "Thread pool monitoring";
            }

            @Override
            public Map<String, String> getProperties() {
                AtomicLong rejectCount = getRejectCount(prop.getThreadPoolName());
                ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();
                int activeCount = executor.getActiveCount();
                int coreSize = executor.getCorePoolSize();
                int maxSize = executor.getMaxPoolSize();

                Map<String, String> pool = new HashMap<>();
                pool.put("activeCount", String.valueOf(activeCount));
                pool.put("keepAliveTime", String.valueOf(executor.getKeepAliveSeconds()));
                // Utilization ratios, skipped when the denominator is 0
                if (coreSize != 0) {
                    pool.put("active/core", String.valueOf((float) activeCount / coreSize));
                }
                if (maxSize != 0) {
                    pool.put("active/max", String.valueOf((float) activeCount / maxSize));
                }
                pool.put("coreSize", String.valueOf(coreSize));
                pool.put("maxSize", String.valueOf(maxSize));
                pool.put("completedTaskCount", String.valueOf(threadPoolExecutor.getCompletedTaskCount()));
                pool.put("largestPoolSize", String.valueOf(threadPoolExecutor.getLargestPoolSize()));
                pool.put("taskCount", String.valueOf(threadPoolExecutor.getTaskCount()));
                pool.put("rejectCount", String.valueOf(rejectCount == null ? 0 : rejectCount.get()));
                pool.put("queueSize", String.valueOf(threadPoolExecutor.getQueue().size()));
                return pool;
            }
        };
        StatusExtensionRegister.getInstance().register(statusExtension);
        return statusExtension;
    }
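getRejectCount is not shown in the article. One plausible way to obtain such a counter, sketched here with hypothetical names (CountingRejectedExecutionHandler, PoolMetrics), is to wrap the configured RejectedExecutionHandler and count each call before delegating. The snapshot method then collects the same ThreadPoolExecutor getters that the StatusExtension reports, without any CAT dependency.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical decorator: counts rejections, then applies the real policy
class CountingRejectedExecutionHandler implements RejectedExecutionHandler {
    private final RejectedExecutionHandler delegate;
    private final AtomicLong rejectCount = new AtomicLong();

    CountingRejectedExecutionHandler(RejectedExecutionHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejectCount.incrementAndGet(); // count first, since AbortPolicy throws from the delegate
        delegate.rejectedExecution(r, executor);
    }

    long getRejectCount() {
        return rejectCount.get();
    }
}

// Hypothetical snapshot of the metrics reported above, as plain strings
final class PoolMetrics {
    private PoolMetrics() {}

    static Map<String, String> snapshot(ThreadPoolExecutor pool, long rejectCount) {
        int active = pool.getActiveCount();
        int core = pool.getCorePoolSize();
        int max = pool.getMaximumPoolSize(); // always >= 1 for a ThreadPoolExecutor
        Map<String, String> m = new LinkedHashMap<>();
        m.put("activeCount", String.valueOf(active));
        m.put("coreSize", String.valueOf(core));
        m.put("maxSize", String.valueOf(max));
        if (core != 0) {
            m.put("active/core", String.valueOf((float) active / core));
        }
        m.put("active/max", String.valueOf((float) active / max));
        m.put("queueSize", String.valueOf(pool.getQueue().size()));
        m.put("largestPoolSize", String.valueOf(pool.getLargestPoolSize()));
        m.put("taskCount", String.valueOf(pool.getTaskCount()));
        m.put("completedTaskCount", String.valueOf(pool.getCompletedTaskCount()));
        m.put("rejectCount", String.valueOf(rejectCount));
        return m;
    }
}
```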

The metrics are explained below (some of the following interpretations may need further verification and are for reference only):

Thread pool status monitoring in our actual production environment is shown in the figure below:


Outlook

After adopting the thread pool monitoring plugin, the project gained the following benefits:

In future versions, we plan to try adding the following features:

Page updated: 2024-04-26
