java多线程- 工具类

准备继续巩固下知识,先更新一些关于java多线程的学习笔记

但是总觉得掌握得不扎实,所以重新来过,好事多磨嘛。


本篇准备学习的内容:

tip:本片偏向的应用,底层源码后在后面的博客更新

Fork/Join

Fork/Join 是分而治之。

规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解。内部可以理解为递归调用。

客官,先给您上菜

/**
 * @author mark
 *产生整形数组
 */
public class MakeArray {
    //数组长度
    public static final int LENGTH  = 1000000;

    public static int[] makeArray() {

        //new一个随机数发生器
        Random r = new Random();
        int[] result = new int[LENGTH];
        for(int i=0; i {
        // 阈值
        private final static int THRESHOLD = MakeArray.LENGTH/10;
        //表示我们要实际统计的数组
        private int[] data;
        //开始统计的下标
        private int fromIndex;
        //统计到哪里结束的下标
        private int toIndex;

        public SumTask(int[] data, int fromIndex, int toIndex) {
            this.data = data;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        /**
         * 计算方法
         * @return
         *   1、规模没有超过阈值则直接计算
         *   2、规模超过阈值则拆分为子任务
         *          invokeAll
         *          join
         */
        @Override
        protected Integer compute() {
            if(toIndex-fromIndex < THRESHOLD) {
                int count = 0;
                for(int i=fromIndex; i<=toIndex; i++) {
                    count = count + data[i];
                }
                return count;
            }else {
                // 拆分逻辑 fromIndex....mid....toIndex
                int mid = (fromIndex+toIndex)/2;
                SumTask left = new SumTask(data, fromIndex, mid);
                SumTask right = new SumTask(data,mid+1, toIndex);
                invokeAll(left, right);
                return left.join() + right.join();
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {

        // 1、创建 forkJoinPool实例
        ForkJoinPool pool = new ForkJoinPool();
        int[] data = MakeArray.makeArray();
        // 2、创建任务
        SumTask task = new SumTask(data,0,data.length-1);
        // 3、执行任务
        pool.invoke(task);
        // 4、获取结果
        Integer result = task.join();
        System.out.println(result);
    }
}

总结下fork/join

首先,创建一个自己的任务执行类(继承RecursiveTask),重写处理方法,这里面要注意阈值的处理及子任务的划分,当然这个要看具体的需求场景。这里两个重要的方法invokeAll RecursiveTask.join()。用来将任务执行和获取任务执行结果

然后我们可以进行fork/join的范式调用

1、创建 forkJoinPool实例

2、创建任务

3、执行任务

4、获取结果

并发工具类

CountDownLatch

CyclicBarrier

Semaphore

Exchange

…… …… …… …… …… …… …… …… …… ……∞’ …… …… …… …… …… …… …… …… …… ……∞’ …… …… …… …… …… ……

这里在业务开发中相对比较常用的可能是CountDownLatch 和 CyclicBarrier。

现在我们想想有一场短跑比赛。一共10个选手参赛。(这里有11个线程,10个选手线程和一个开始比赛线程)

只有当所有选手都到场的时候才能开始比赛。这个时候就可以使用CountDownLatch 。开始比赛线程必须等10个选手线程都完成才能开始。

现在换一个场景,玩moba类游戏,dota2、LOL啥的,都需要一个匹配,匹配成功后开始游戏。假设有10位玩家。匹配过程中所有玩家都必须加载完成才能继续进行后面的比赛。那么这里就可以使用CyclicBarrier。注意这里只有10个匹配线程,每个线程在加载完成的时候阻塞,当10个匹配线程都加载完成后,一起进入后面的比赛。

废话一堆,show me code

package concurrentClass.countdown;

import java.util.concurrent.CountDownLatch;

public class CutDownLatchTest {

    public static void main(String[] args) {

        final CountDownLatch latch = new CountDownLatch(2); // 构造方法,放入2个线程

        new Thread(){
            public void run() {
                try {
                    System.out.println("选手子线程"+Thread.currentThread().getName()+"选手准备入场");
                    Thread.sleep(3000);
                    System.out.println("选手子线程"+Thread.currentThread().getName()+"选手到场");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread(){
            public void run() {
                try {
                    System.out.println("选手子线程"+Thread.currentThread().getName()+"选手准备入场");
                    Thread.sleep(3000);
                    System.out.println("选手子线程"+Thread.currentThread().getName()+"选手到场");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        // 使用await()方法让主线程等待子线程执行完后在执行
        try {
            latch.await();
            System.out.println("选手到齐,开始比赛");
        } catch (InterruptedException e) {

        }
    }
}

// 输出
选手子线程Thread-0选手准备入场
选手子线程Thread-1选手准备入场
选手子线程Thread-0选手到场
选手子线程Thread-1选手到场
选手到齐,开始比赛    
package concurrentClass.cyclicBarrier;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("玩家" + Thread.currentThread().getName() + "加载完成");
                    Thread.sleep(1000);
                    c.await();
                    System.out.println("玩家" + Thread.currentThread().getName() + "时间:"
                            + System.currentTimeMillis() +"准备进入比赛");
                } catch (Exception e) {

                }
                System.out.println("阿杜跟");
            }
        }).start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("玩家" + Thread.currentThread().getName() + "加载完成");
                    Thread.sleep(3000);
                    c.await();
                    System.out.println("玩家" + Thread.currentThread().getName() + "时间:"
                            + System.currentTimeMillis() + "准备进入比赛");
                } catch (Exception e) {

                }
                System.out.println("哈撒剋");
            }
        }).start();
    }
}

// 输出
    
玩家Thread-1加载完成
玩家Thread-0加载完成
玩家Thread-1时间:1618666778562准备进入比赛
玩家Thread-0时间:1618666778562准备进入比赛
哈撒剋
阿杜跟
package concurrentClass.semaphore;

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {

    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i

两个线程间的数据交换(这个我接触很少)

package concurrentClass.exchange;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;

public class ExchangeDemo {
    private static final Exchanger> exchange 
        = new Exchanger>();

    public static void main(String[] args) {

        //第一个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set setA = new HashSet();//存放数据的容器
                try {
                    /*添加数据
                     * set.add(.....)
                     * */
                    setA = exchange.exchange(setA);//交换set
                    /*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

      //第二个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set setB = new HashSet();//存放数据的容器
                try {
                    /*添加数据
                     * set.add(.....)
                     * set.add(.....)
                     * */
                    setB = exchange.exchange(setB);//交换set
                    /*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
}

尾巴 phaser类

Callable、Future和FutureTask

这套工具可以获取子线程的结果信息。

package future;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureDemo {
    
    /*实现Callable接口,允许有返回值*/
    private static class UseCallable implements Callable{

        private int sum;
        @Override
        public Integer call() throws Exception {
            System.out.println("Callable子线程开始计算");
            Thread.sleep(2000);
            for(int i=0; i<300; i++) {
                sum = sum+i;
            }
            System.out.println("Callable子线程计算完成,结果="+sum);
            return sum;
        }

    }
    
    public static void main(String[] args) 
            throws InterruptedException, ExecutionException {
        
        UseCallable useCallable = new UseCallable();
        FutureTask futureTask = new FutureTask(useCallable);
        new Thread(futureTask).start();
        Random r = new Random();
        Thread.sleep(1000);
        if(r.nextBoolean()) {//随机决定是获得结果还是终止任务
            System.out.println("Get UseCallable result = "+futureTask.get());
        }else {
            System.out.println("中断计算");
            futureTask.cancel(true);
        }
        
    }
}


// 输出
Callable子线程开始计算
Callable子线程计算完成,结果=44850
Get UseCallable result = 44850



页面更新:2024-04-28

标签:递归   分而治之   阈值   工具   随机数   客官   好事多磨   小规模   数组   发生器   线程   对立   容器   接口   数据   科技

1 2 3 4 5

上滑加载更多 ↓
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top