我们通常都是开启一个新的子线程去执行比较耗时的代码,这使用起来非常简单,只需要将耗时的代码封装在Runnable中的run()方法里面,然后调用thread.start()就行。
但是我相信很多人有时候都有这样的需求,就是获取子线程运行的结果,
比如客户端远程调用服务(耗时服务),我们有需要得到该调用服务返回的结果,这该怎么办呢?很显然子线程运行的run()方法是没有返回值。这个时候Future的作用就发挥出来了。

Future如何使用能够获取子线程运行的结果呢?在这里顺便提一下Callable接口,Callable产生结果,Future获取结果。如何使用他们两个来获取子线程的运行结果呢?我们先来看个简单的例子。
package test; import java.util.concurrent.Callable; import
java.util.concurrent.ExecutionException; import
java.util.concurrent.FutureTask; public class CallableFutureTest { public
static void main(String[] args) { long startTime = System.currentTimeMillis();
Callable<Integer> calculateCallable = new Callable<Integer>() { @Override
public Integer call() throws Exception { // TODO Auto-generated method stub
Thread.sleep(2000);//模拟耗时时间 int result = 1+2; return result; } };
FutureTask<Integer> calculateFutureTask = new FutureTask<>(calculateCallable);
Thread t1 = new Thread(calculateFutureTask); t1.start();
//现在加入Thread运行的是一个模拟远程调用耗时的服务,并且依赖他的计算结果(比如网络计算器) try {
//模拟耗时任务,主线程做自己的事情,体现多线程的优势 Thread.sleep(3000); int a = 3+5; Integer result =
calculateFutureTask.get(); System.out.println("result =
"+(a+result));//模拟主线程依赖子线程的运行结果 long endTime = System.currentTimeMillis();
System.out.println("time = "+(endTime-startTime)+"ms"); } catch
(InterruptedException | ExecutionException e) { // TODO Auto-generated catch
block e.printStackTrace(); } } }
 运行结果如下:

 

 

从上面可以看到上面耗时大概是3s,其实主要就是主线程sleep(3000)所耗费的时间,如果不使用Future,并且依赖线程的结果,我们可能需要的时间可能是需要5s(子线程2s+主线程3s)。接下来我们从源码的角度理解上述代码工作的原理。

1,先看看类的FureTask类的关系图

 

 

FutureTask实现了RunnableTask接口,RunnableTask继承了Runnable和Future接口(接口是支持多继承的)

 

2,t1.start()执行的是FutureTask类的run()方法,看下FutureTask.run()方法源码
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this,
runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c =
callable;//构造方法传入的calculateCallable if (c != null && state == NEW) { V result;
boolean ran; try { result = c.call();//最终执行的是callablede.call()方法,有返回值的 ran =
true; } catch (Throwable ex) { result = null; ran = false;
setException(ex);//保存抛出的异常 } if (ran) set(result);//保存执行的结果 } } finally { //
runner must be non-null until state is settled to // prevent concurrent calls
to run() runner = null; // state must be re-read after nulling runner to
prevent // leaked interrupts int s = state; if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s); } }
 

3,执行的Callable.call(),即执行了calculateCallable .call(),得到call()返回的结果

接下来看看,如何将执行的结果保存起来,然后方便Future获取到,那就是调用set(result)方法

4,看看set(result)方法
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
COMPLETING)) { outcome = v;//将result赋值给outcome UNSAFE.putOrderedInt(this,
stateOffset, NORMAL); // final state finishCompletion(); } } /** * Removes and
signals all waiting threads, invokes done(), and * nulls out callable. */
//这个方法,就是当执行完成完成的时候,唤醒get()方法挂起的线程,从而使得get()方法在阻塞 //的for循环中能够正确的获取执行完的结果
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode
q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this,
waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) {
q.thread = null; LockSupport.unpark(t);//唤醒线程 } WaitNode next = q.next; if
(next == null) break; q.next = null; // unlink to help gc q = next; } break; }
} done(); callable = null; // to reduce footprint }
 

最终FutureTask中的outcome变量为执行的结果

 

5,接下来看FutureTask.get()方法如何获取执行完的结果
//get方法表示如果执行过程完成,就获取执行的结果,否则就将当前线程挂起 public V get() throws
InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING)
s = awaitDone(false, 0L);//如果状态没完成 return report(s);//返回执行完的结果 } //这里是执行的状态
private volatile int state; private static final int NEW = 0; private static
final int COMPLETING = 1; private static final int NORMAL = 2; private static
final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private
static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q =
null; boolean queued = false; for (;;) {//堵塞的方法 if (Thread.interrupted()) {
removeWaiter(q); throw new InterruptedException(); } int s = state; if (s >
COMPLETING) { if (q != null) q.thread = null; return s; } else if (s ==
COMPLETING) // cannot time out yet Thread.yield();//将线程挂起,让出cpu资源 else if (q ==
null) q = new WaitNode(); else if (!queued) queued =
UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if
(timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) {
removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else
LockSupport.park(this); } } private V report(int s) throws ExecutionException {
Object x = outcome; if (s == NORMAL) return (V)x;//返回结果 if (s >= CANCELLED)
throw new CancellationException(); throw new ExecutionException((Throwable)x); }
通过上面的几个方法看出,FutureTask.get()是一个阻塞的方法,直到运行完成之后,返回线程执行的结果。

 上述中出现很多状态的常量(NEW、COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING
、INTERRUPTED)这些状态表示执行的过程,一般执行的过程转变如下:

1)一个FutureTask新建出来,state就是NEW状态;COMPETING和INTERRUPTING用的进行时,表示瞬时状态,存在时间极短;NORMAL代表顺利完成;EXCEPTIONAL代表执行过程出现异常;CANCELED代表执行过程被取消;INTERRUPTED被中断

2)执行过程顺利完成:NEW -> COMPLETING -> NORMAL

3)执行过程出现异常:NEW -> COMPLETING -> EXCEPTIONAL

4)执行过程被取消:NEW -> CANCELLED

5)执行过程中,线程中断:NEW -> INTERRUPTING -> INTERRUPTED

另外FutureTask中还有其他的方法,如cancel()-取消,isDone()-判断是否执行完,isCancelled()-判断执行是否取消等,感兴趣的可以自己去看相应的源码

 

 

 

技术
©2019-2020 Toolsou All rights reserved,
13.解决 git 合并冲突Linux页面置换算法的C语言实现LinkedHashMap基本用法&使用实现简单缓存1190 反转每对括号间的子串 leetcode一个猜数字的小游戏,用JavaScript实现 Chrome OS,对程序员和Windows意味着什么?,互联网营销 网站安全有哪些防护措施?Java集合------LinkedHashMap底层原理 全球第一免费开源ERP Odoo Ubuntu最佳开发环境独家首发分享 如何建设数据安全体系?