Future异步编程应用

ForkJoin框架

ForkJoinPool的功能说明

`作者是并发编程的大佬:Doung Lea

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 执行ForkJoinTask的线程池
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* A {@code ForkJoinPool} provides the entry point for submissions
* from non-{@code ForkJoinTask} clients, as well as management and
* monitoring operations.
* ...
* @since 1.7
* @author Doug Lea
*/
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
/*
* Implementation Overview
* 这个类的简介
*
* This class and its nested classes provide the main
* functionality and control for a set of worker threads:
* 这个类和它的内部类实现并管理调度工作线程池的功能:
* Submissions from non-FJ threads enter into submission queues.
* 它把非ForkJoin线程中的任务放进一系列的任务提交队列中
* Workers take these tasks and typically split them into subtasks
* that may be stolen by other workers.
* 任务执行者拿到要执行的任务后,通常会把任务再分解成更细小的子任务,其他
* 执行者也可能会偷取任务执行。(工作窃取算法)
* Preference rules give
* first priority to processing tasks from their own queues (LIFO
* or FIFO, depending on mode), then to randomized FIFO steals of
* tasks in other queues.
* 优先从自己的工作队列(LIFO或者FIFO,默认是LIFO,)中获取任务执行,然后按照FIFO
* 的方式随机地从其他工作队列中窃取任务执行。
* This framework began as vehicle for
* supporting tree-structured parallelism using work-stealing.
* 这个框架是一种利用工作窃取算法实现树型结构数据的并行计算实现方式。
*/
}

ForkJoinPool的继承关系

ForkJoinPool实现了Executor和ExecutorService接口,说明ForkJoinPool是一个线程池管理类。

image-20181128153647554

ForJoinTask的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 在ForkJoinPool中执行的任务抽象基类,ForkJoinTask是一种比线程还要轻量级的类似线程
* 实体对象。在一定的限制条件下,ForkJoinPool可以使用一小部分线程执行大量的任务和子任务。
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
* A {@code ForkJoinTask} is a thread-like entity that is much
* lighter weight than a normal thread. Huge numbers of tasks and
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
* @since 1.7
* @author Doug Lea
*/
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

}

ForkJoinTask的继承关系

ForkJoinTask实现了Future接口,实现异步计算任务。Future接口能够把结果异步返回给调用者,把调用者线程从耗时的操作中解放出来,不需要等待异步接口返回结果就可以继续执行。

image-20181128185615070

ForkJoin调用逻辑

ForkJoin框架采用一种高效的双端队列deque(dubbo-ended queue),既有栈LIFO特性,又具有队列FIFO的特性。ForkJoin框架默认把查分的子任务按照LIFO的方式放到deque中,按照FIFO的方式从队列末尾随机从其他工作队列中窃取任务执行。

image-20181124090826646

ForkJoin怎么用

使用ForkJoinPool.execute(F t)ForkJoinPool.invoke(F t)ForkJoinPool.submit(C t)三种方式提交并行任务。

ForkJoinPool执行任务的三种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** 异步执行,无返回结果 **/
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
/** 同步,返回计算结果 (会阻塞)**/
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
/** 异步,有Future返回结果 **/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
externalPush(job);
return job;
}

ForkJoinPool构造方法

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//MAX_CAP = 32767
//创建一个跟计算机内核数相等的并行度,默认为LIFO的队列
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
//parallelism 并行度
//asyncMode ? FIFO_QUEUE : LIFO_QUEUE,设置任务队列的模式是FIFO还是LIFO
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}

ForkJoinTask定义

可以集成抽象类ForkJoinTask重写exec()抽象方法实现计算逻辑,也可以实现ForkJoinTask的子类例如RecursiveTask或者RecursiveAction抽象类实现异步任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class LongArrayCounterRecursiveTask extends RecursiveTask<Long>{

/**
* 要计算的Long数组
*/
private long[] nums;

/**
* 数组开始计算的开始下标
*/
private Integer start;

/**
* 数组开始计算的结束下标
*/
private Integer end;

/**
* 要计算数组区间内的元素加和,最小区间长度设置为3,大于3则拆分出子任务执行。
*/
private int min = 3;

public LongArrayCounterRecursiveTask(long[] nums, Integer start, Integer end) {
this.nums = nums;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
if ((end-start)>min) {
LongArrayCounterRecursiveTask lCnt = new LongArrayCounterRecursiveTask(nums, start, (end+start) / 2);
/**
* 利用另一个ForkJoinPool异步执行lCnt子任务,而不是直接调用compute()进行计算
* fork调用后不需要等待结算结果返回,知道调用join方法,计算结果才会返回
*/
lCnt.fork();
LongArrayCounterRecursiveTask rCnt = new LongArrayCounterRecursiveTask(nums, (end + start) / 2 + 1, end);
/**
* 利用当前ForkJoinPool执行rCnt子任务
*/
Long rResult = rCnt.compute();
/**
* 等待lCnt子任务执行完,得到结果;
* 注意这一句的位置,.join()操作会使主线程阻塞等待子线程返回结果
*/
Long lResult = lCnt.join();
/**
* 返回两个结果集的和
*/
return rResult + lResult;
}
long sum = 0L;
for (int i = start; i < end+1; i++) {
sum +=nums[i];
}
return sum;
}
}

CompletableFuture异步操作

Completable提交执行任务

这样提交异步任务:Future futureResult = CompletableFutre.supplyAsync(Callable r,Executor e);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//asyncPool指定了默认ForkJoinPool作为Executor执行异步并行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}

//指定一个Executor,自定义线程池大小。
//调整线程池的大小的原则
//线程池中线程的数量过多,会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。
//反之,如果线程的数目过少,处理器的一些核可能就无法充分利用。
// 最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

在不提供执行异步任务的线程池参数Executor时,使用CompletableFuture通过下面这段代码创建线程池:ForkJoinPool.commonPool()创建一个计算机内核数Processors-1个线程大小的ForkJoinPool线程池,具有并行计算能力;或者是只有一个线程的普通线程池。

1
2
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

Strem流操作

Stream的sort、foreach、reduce、find操作底层实现也是依赖ForkJoin框架,Stream接口中提供了parallel()方法,可以把Stream转换成并行流,并行流使用ForkJoin框架进行计算。Stream没有提供修改执行任务的线程数大小的方法和参数,建议使用默认值(系统内核数),充分利用计算机CPU资源执行计算密集型操作。

Stream并行计算需谨慎:

Stream在进行并行计算时应避免元素之间的相互影响,处理当前元素时如果需要等待另一个元素的处理结果,那么并行处理反而会因为线程切换降低性能。通过大量的数据测试时保证Stream性能的重要手段。

parallel并行计算对性能的影响

执行并行计算不一定能提高性能,举例:

Stream.iterate产生流元素时上一个元素存在后才能产生下一个元素,如果使用并行流多线程产生元素时会出现线程等待正在产生元素的线程结束操作后才能继续执行,造成了不必要的线程切换和资源竞争,导致性能下降。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ForkJoinTest {

public static void main(String[] args) throws ParseException {

System.out.println("sequentialSum:"+new ForkJoinTest().measureSumPerf(ForkJoinTest::sequentialSum, 10000000)+"ms");
System.out.println("parallelSum:"+new ForkJoinTest().measureSumPerf(ForkJoinTest::parallelSum, 10000000)+"ms");

}

//sequential计算
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}

//parallel计算
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() //转换成并行流
.reduce(0L, Long::sum);
}


、、
public long measureSumPerf(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
long start = System.currentTimeMillis();
long sum = adder.apply(n);
long duration = (System.currentTimeMillis() - start);
if (duration < fastest)
{
fastest = duration;
}
return fastest;
}

测试结果:

1
2
sequentialSum:297ms
parallelSum:3596msStream

Stream和CompletableFuture如何选择

引用JAVA8实战:

如果你进行的是计算密集型的操作,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的,使用Stream默认的处理器核数作为线程池大小,充分利用CPU资源。

如果并行工作单元是I/O密集型操作(包括文件、网络IO等),那么使用 CompletableFuture灵活性更好,可以根据最佳线程数计算公式设置线程数,充分利用内存资源。

Loop、ForkJoin、Stream计算性能测试

在计算量很小时三者区别不大,随着计算量的增大Stream的性能优势越明显,自己编码实现ForkJoin并行计算受制于对资源的Fork算法、Join算法和计算逻辑的最优化实现,执行性能会比Stream要差一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Test {


public static void main(String[] args) throws ParseException {
/**
* Loop Sequential Parrallel ForkJoin并行计算测试
* 数据源
*/
long[] longs = LongStream.rangeClosed(1L, 150000000L).toArray();

/**
* loop
*/
long startTime = System.currentTimeMillis();
long result = loopCompute(longs);
long end = System.currentTimeMillis();
System.out.println("loop计算结果:result:"+result+",耗时:"+(end-startTime)+"ms");

/**
* sequential
*/
LongStream longStream = LongStream.rangeClosed(1L, 150000000L);
startTime = System.currentTimeMillis();
result = longStream.reduce(0, Long::sum);
end = System.currentTimeMillis();
System.out.println("sequential stream计算结果:result:"+result+",耗时:"+(end-startTime)+"ms");

/**
* parallel
*/
longStream = LongStream.rangeClosed(1L, 150000000L);
startTime = System.currentTimeMillis();
result = longStream.parallel().reduce(0, Long::sum);
end = System.currentTimeMillis();
System.out.println(" parallel stream计算结果:result:"+result+",耗时:"+(end-startTime)+"ms");

/**
* forkjoin
*/
startTime = System.currentTimeMillis();
result = new ForkJoinPool().invoke(new LongArrayCounterRecursiveTask(longs, 0, longs.length-1));
System.out.println("forkjoin计算结果:result:"+result+",耗时:"+(System.currentTimeMillis()-startTime)+"ms");

}
}

执行结果:

1
2
3
4
loop计算结果:result:11250000075000000,耗时:132ms
sequential stream计算结果:result:11250000075000000,耗时:325ms
parallel stream计算结果:result:11250000075000000,耗时:69ms
forkjoin计算结果:result:11250000075000000,耗时:127ms

SpringFramework的@Async异步操作

@Async注解要发挥作用,首先要使用@EnableAsync启用异步计算。通过引入AsyncConfigurationSelector的AOP拦截实现方法异步执行。

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

}

@Async注解原理

@Async通过AOP实现代理对象的方法拦截实现异步执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 方法的参数可以是任何类型,但是返回值要限定在void或者Future类型,Future类型的返回值可以声明为更具体的
* ListenableFuture 或者 CompletableFuture,这样能够从异步任务得到更多交互状态和处理过程的细节。
* In terms of target method signatures, any parameter types are supported. However, the return * type is constrained to either void or Future. In the latter case, you may declare the more
* specific ListenableFuture or CompletableFuture types which allow for richer interaction with the
* asynchronous task and for immediate composition with further processing steps.
*
* AOP切面定义、MethodInterceptor实现代理对象的注解方法的异步执行
* See Also:
* AnnotationAsyncExecutionInterceptor, AsyncAnnotationAdvisor
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
//value指定要执行异步任务的Executor BeanName
String value() default "";
}

@Async AOP实现

AsyncConfigurationSelector实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}

ProxyAsyncConfiguration实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor()//这句是重点
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}

}

AsyncAnnotationBeanPostProcessor实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
....
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);

AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);//这句是重点
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}

}

AsyncAnnotationAdvisor实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
...
//构造切面
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);//这句是重点
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);//这句是重点
this.pointcut = buildPointcut(asyncAnnotationTypes);//这句是重点
}
//返回advice增强逻辑
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);//这句是重点
interceptor.configure(executor, exceptionHandler);
return interceptor;
}

//返回切点
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}

}

AnnotationAsyncExecutionInterceptor实现

image-20181128234357038

AsyncExecutionInterceptor逻辑增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
} else {
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future)result).get();
}
} catch (ExecutionException var4) {
this.handleError(var4.getCause(), userDeclaredMethod, invocation.getArguments());
} catch (Throwable var5) {
this.handleError(var5, userDeclaredMethod, invocation.getArguments());
}

return null;
};
return this.doSubmit(task, executor, invocation.getMethod().getReturnType());//这句是重点
}
}

AsyncExecutionAspectSupport异步任务执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
//这里使用CompletableFuture实现异步任务执行的逻辑
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Throwable var2) {
throw new CompletionException(var2);
}
}, executor);
} else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor)executor).submitListenable(task);
} else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
} else {
executor.submit(task);
return null;
}
}
谢谢你请我吃糖果