准备继续巩固下知识,先更新一些关于java多线程的学习笔记
但是总觉得掌握得不扎实,所以重新来过,好事多磨嘛。
本篇准备学习的内容:
tip:本片偏向的应用,底层源码后在后面的博客更新
Fork/Join 是分而治之。
规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解。内部可以理解为递归调用。
客官,先给您上菜
/**
* @author mark
*产生整形数组
*/
public class MakeArray {
//数组长度
public static final int LENGTH = 1000000;
public static int[] makeArray() {
//new一个随机数发生器
Random r = new Random();
int[] result = new int[LENGTH];
for(int i=0; i {
// 阈值
private final static int THRESHOLD = MakeArray.LENGTH/10;
//表示我们要实际统计的数组
private int[] data;
//开始统计的下标
private int fromIndex;
//统计到哪里结束的下标
private int toIndex;
public SumTask(int[] data, int fromIndex, int toIndex) {
this.data = data;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
/**
* 计算方法
* @return
* 1、规模没有超过阈值则直接计算
* 2、规模超过阈值则拆分为子任务
* invokeAll
* join
*/
@Override
protected Integer compute() {
if(toIndex-fromIndex < THRESHOLD) {
int count = 0;
for(int i=fromIndex; i<=toIndex; i++) {
count = count + data[i];
}
return count;
}else {
// 拆分逻辑 fromIndex....mid....toIndex
int mid = (fromIndex+toIndex)/2;
SumTask left = new SumTask(data, fromIndex, mid);
SumTask right = new SumTask(data,mid+1, toIndex);
invokeAll(left, right);
return left.join() + right.join();
}
}
}
/**
* @param args
*/
public static void main(String[] args) {
// 1、创建 forkJoinPool实例
ForkJoinPool pool = new ForkJoinPool();
int[] data = MakeArray.makeArray();
// 2、创建任务
SumTask task = new SumTask(data,0,data.length-1);
// 3、执行任务
pool.invoke(task);
// 4、获取结果
Integer result = task.join();
System.out.println(result);
}
}
总结下fork/join
首先,创建一个自己的任务执行类(继承RecursiveTask
),重写处理方法,这里面要注意阈值的处理及子任务的划分,当然这个要看具体的需求场景。这里两个重要的方法invokeAll RecursiveTask.join()。用来将任务执行和获取任务执行结果 然后我们可以进行fork/join的范式调用
1、创建 forkJoinPool实例
2、创建任务
3、执行任务
4、获取结果
CountDownLatch
CyclicBarrier
Semaphore
Exchange
…… …… …… …… …… …… …… …… …… ……∞’ …… …… …… …… …… …… …… …… …… ……∞’ …… …… …… …… …… ……
这里在业务开发中相对比较常用的可能是CountDownLatch 和 CyclicBarrier。
现在我们想想有一场短跑比赛。一共10个选手参赛。(这里有11个线程,10个选手线程和一个开始比赛线程)
只有当所有选手都到场的时候才能开始比赛。这个时候就可以使用CountDownLatch 。开始比赛线程必须等10个选手线程都完成才能开始。
现在换一个场景,玩moba类游戏,dota2、LOL啥的,都需要一个匹配,匹配成功后开始游戏。假设有10位玩家。匹配过程中所有玩家都必须加载完成才能继续进行后面的比赛。那么这里就可以使用CyclicBarrier。注意这里只有10个匹配线程,每个线程在加载完成的时候阻塞,当10个匹配线程都加载完成后,一起进入后面的比赛。
废话一堆,show me code
package concurrentClass.countdown;
import java.util.concurrent.CountDownLatch;
public class CutDownLatchTest {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2); // 构造方法,放入2个线程
new Thread(){
public void run() {
try {
System.out.println("选手子线程"+Thread.currentThread().getName()+"选手准备入场");
Thread.sleep(3000);
System.out.println("选手子线程"+Thread.currentThread().getName()+"选手到场");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("选手子线程"+Thread.currentThread().getName()+"选手准备入场");
Thread.sleep(3000);
System.out.println("选手子线程"+Thread.currentThread().getName()+"选手到场");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
// 使用await()方法让主线程等待子线程执行完后在执行
try {
latch.await();
System.out.println("选手到齐,开始比赛");
} catch (InterruptedException e) {
}
}
}
// 输出
选手子线程Thread-0选手准备入场
选手子线程Thread-1选手准备入场
选手子线程Thread-0选手到场
选手子线程Thread-1选手到场
选手到齐,开始比赛
package concurrentClass.cyclicBarrier;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("玩家" + Thread.currentThread().getName() + "加载完成");
Thread.sleep(1000);
c.await();
System.out.println("玩家" + Thread.currentThread().getName() + "时间:"
+ System.currentTimeMillis() +"准备进入比赛");
} catch (Exception e) {
}
System.out.println("阿杜跟");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("玩家" + Thread.currentThread().getName() + "加载完成");
Thread.sleep(3000);
c.await();
System.out.println("玩家" + Thread.currentThread().getName() + "时间:"
+ System.currentTimeMillis() + "准备进入比赛");
} catch (Exception e) {
}
System.out.println("哈撒剋");
}
}).start();
}
}
// 输出
玩家Thread-1加载完成
玩家Thread-0加载完成
玩家Thread-1时间:1618666778562准备进入比赛
玩家Thread-0时间:1618666778562准备进入比赛
哈撒剋
阿杜跟
package concurrentClass.semaphore;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i
两个线程间的数据交换(这个我接触很少)
package concurrentClass.exchange;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
public class ExchangeDemo {
private static final Exchanger> exchange
= new Exchanger>();
public static void main(String[] args) {
//第一个线程
new Thread(new Runnable() {
@Override
public void run() {
Set setA = new HashSet();//存放数据的容器
try {
/*添加数据
* set.add(.....)
* */
setA = exchange.exchange(setA);//交换set
/*处理交换后的数据*/
} catch (InterruptedException e) {
}
}
}).start();
//第二个线程
new Thread(new Runnable() {
@Override
public void run() {
Set setB = new HashSet();//存放数据的容器
try {
/*添加数据
* set.add(.....)
* set.add(.....)
* */
setB = exchange.exchange(setB);//交换set
/*处理交换后的数据*/
} catch (InterruptedException e) {
}
}
}).start();
}
}
尾巴 phaser类
这套工具可以获取子线程的结果信息。
package future;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureDemo {
/*实现Callable接口,允许有返回值*/
private static class UseCallable implements Callable{
private int sum;
@Override
public Integer call() throws Exception {
System.out.println("Callable子线程开始计算");
Thread.sleep(2000);
for(int i=0; i<300; i++) {
sum = sum+i;
}
System.out.println("Callable子线程计算完成,结果="+sum);
return sum;
}
}
public static void main(String[] args)
throws InterruptedException, ExecutionException {
UseCallable useCallable = new UseCallable();
FutureTask futureTask = new FutureTask(useCallable);
new Thread(futureTask).start();
Random r = new Random();
Thread.sleep(1000);
if(r.nextBoolean()) {//随机决定是获得结果还是终止任务
System.out.println("Get UseCallable result = "+futureTask.get());
}else {
System.out.println("中断计算");
futureTask.cancel(true);
}
}
}
// 输出
Callable子线程开始计算
Callable子线程计算完成,结果=44850
Get UseCallable result = 44850
页面更新:2024-04-28
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号