Future模式

Future模式是多线程开发中常见的一种设计模式,核心思想是异步调用。

Posted by 南方 on September 12, 2017
  • Future模式的核心在于去掉了主函数中的等待时间,并使得原本需要等待的时间段可以用于处理其他的业务逻辑,从而充分利用计算机资源。

传统串行程序调用流程 VS Future模式流程图


Future模式的主要角色

参与者 作用
Main 系统启动,调用Client发出请求
Client 返回Data对象,立即返回Future对象,并开启ClientThread线程装配RealData
Data 返回数据的接口
FutureData Future数据,构造很快,但是是一个虚拟数据,需要装配RealData
RealData 真实数据,其构造比较慢

Future模式结构图


Future模式简单实现

public interface Data {
    public String getResult();
}

public class RealData implements Data {

    protected final String result;

    public RealData(String param) {
        // RealData的构造可能很慢,需要用户等待很久,使用sleep模拟
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 10; i++) {
            sb.append(param);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("RealData完成时间: "+System.currentTimeMillis()/1000);
        result = sb.toString();
    }

    public String getResult() {
        return result;
    }
}

public class FutureData implements Data {  // FutureData实际上是真实数据RealData的代理,封装了获取RealData的等待过程
    protected RealData realData = null; // FutureData是Data的包装
    protected boolean isReady = false;
    public synchronized void setRealData(RealData realData) {
        if (isReady) {
            return;
        }
        this.realData = realData;
        isReady = true;
        notifyAll(); // RealData已被注入,通知getResult()
    }
    public synchronized String getResult() { // 会等待RealData构造完成
        while (!isReady) {
            try {
                wait(); // 一直等待,直到RealData注入完成
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realData.result; // 由RealData实现
    }
}

public class Client {
    public Data request(final String queryStr) {
        final FutureData future = new FutureData();
        new Thread() {
            public void run() {
                RealData realData = new RealData(queryStr);
                future.setRealData(realData);
            }
        }.start();
        return future;
    }
}

public class FutureDemo {
    public static void main(String[] args) {
        Client client = new Client();
        // 这里会立即返回,因为得到的是FutureData而不是RealData
        Data data = client.request("name");
        System.out.println("请求完毕");
        try {
            // 用sleep代替其他业务逻辑的处理
            // 在处理其他业务逻辑中,RealData被创建,从而充分利用了等待时间
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 使用真实的数据
        System.out.println("真实数据: " + data.getResult());
        System.out.println("main完成时间: "+System.currentTimeMillis()/1000);
    }
}

运行结果:

请求完毕
RealData完成时间: 1505157288
真实数据: namenamenamenamenamenamenamenamenamename
main完成时间: 1505157288

JDK中的Future模式

RunnaleFuture继承了Future和Runnable接口,其中run()方法用于构造真实的数据,它有一个具体的实现类FutureTask类。

FutureTask有一个内部类Sync,实现一些实质性的工作。而Sync最终会调Callable接口,完成实际数据的组装工作。

执行的逻辑有:

  1. 可以取消这个执行逻辑,如果这个逻辑已经正在执行,提供可选的参数来控制是否取消已经正在执行的逻辑。
  2. 可以判断执行逻辑是否已经被取消。
  3. 可以判断执行逻辑是否已经执行完成。
  4. 可以获取执行逻辑的执行结果。
  5. 可以允许在一定时间内去等待获取执行结果,如果超过这个时间,抛TimeoutException。

线程池中的FutureTask

在JCU中,FutureTask是Future的具体实现,额外实现了Runnable接口,既然实现Runnable接口,那么就满足了Task的行为,于是我们得到了一个可以被用来执行的Future。值得一提的是FutureTask的身份,她是JCU提供的线程池实现用到的任务基本单元,线程池接收两种对象,一个是Runnable任务,一种是Callable任务,两者区别在于前者返回执行结果给外部,后者不需要。按照默认线程池是实现ExecutorService接口的,按照ExecutorService接口定义的行为,我们可以将Runnable或Callable任务提交到线程池让其去被执行,而被提交的Runnable或Callable任务都会被包装成FutureTask,丢到任务队列,由线程池的工作线程去执行。

FutureTask任务状态流转

在FutureTask中定义了七种任务状态:

  • NEW:当FutureTask被初始创建的时候的状态。
  • COMPLETING:当任务被执行完毕,FutureTask会将执行结果设置给FutureTask的outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING。
  • NORMAL:当任务被执行完毕,FutureTask会将执行结果设置给FutureTask的outcome属性,在设置之后会将FutureTask的状态修改为NORMAL。
  • EXCEPTIONAL:当任务在被执行的过程中抛了异常,FutureTask会将异常信息设置给FutureTask的outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING,在设置之后会将FutureTask的状态修改为EXCEPTIONAL。
  • CANCELLED:当外部想要取消任务,而又不允许当任务正在执行的时候被取消的时候会将FutureTask的状态修改为CANCELLED。
  • INTERRUPTING:当外部想要取消任务,同时允许当任务正在执行的时候被取消的时候,会先将FutureTask的状态设置为INTERRUPTING,然后设置执行任务的线程的中断标记位。
  • INTERRUPTED:当外部想要取消任务,同时允许当任务正在执行的时候被取消的时候,会先将FutureTask的状态设置为INTERRUPTING,然后设置执行任务的线程的中断标记位,最后将Future的状态设置为INTERRUPTED。

综上,我们也可以总结下FutureTask的状态流转可能流程:

  1. NEW—>COMPLETING—>NORMAL(任务执行正常)
  2. NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)
  3. NEW—>CANCELLED(不允许执行中的取消)
  4. NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)

JDK内置的Future模式实现

public class RealData implements Callable<String> { // 实现Callable,call()方法会构造我们需要的真实数据并返回
    private String para;
    public RealData(String para) {
        this.para = para;
    }

    public String call() throws Exception {
        StringBuffer sb = new StringBuffer();
        System.out.println("RealData 正在构造真实数据。。。");
        for (int i = 0; i < 10; i++) {
            sb.append(para);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("RealData 真实数据构造完成。。。");
        return sb.toString();
    }
}

public class FutureMain {
    public static void main(String[] args) throws Exception {
        FutureTask<String> future = new FutureTask<String>(new RealData("name"));
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // 执行FutureTask
        // 在此开启线程进行RealData的call()执行
        executor.submit(future);
        System.out.println("请求完毕");
        try {
            System.out.println("Main 正在调用其他业务逻辑。。。");
            // 在此可以做额外的数据操作
            Thread.sleep(2000);
            System.out.println("Main 其他业务处理完成。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 在此取得call()方法的返回值
        // 如果此时call()方法没有执行完成,则依然会等待
        // 阻塞,一直等到RealData的call方法执行完毕,并返回泛型结果。
        System.out.println("数据: " + future.get());
        executor.shutdown();
    }
}

运行结果:

请求完毕
Main 正在调用其他业务逻辑。。。
RealData 正在构造真实数据。。。
RealData 真实数据构造完成。。。
Main 其他业务处理完成。。。
数据: namenamenamenamenamenamenamenamenamenamea