博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java并发编程学习15--CompletableFuture(二)
阅读量:7256 次
发布时间:2019-06-29

本文共 5460 字,大约阅读时间需要 18 分钟。

【模拟情景

上一篇说到每一个shop都会提供一个价格查询的服务,但是现在我们进行假设:

1. 所有的价格查询是同步方式提供的2. shop在返回价格的同时会返回一个折扣码3. 我们需要解析返回的字符串,并且根据折扣码区获取折扣后的价格4. 折扣后的价格计算依然是同步执行的5. 查询价格返回的字符串格式为shopName:price:discountCode("沃尔玛:200:15")

定义商店对象:Shop.java

public class Shop {    private String name;    public Shop(String name){        this.name = name;    }    public String getName(){        return name;    }       public String getPriceFormat(String product){        double price = calculatePrice(product);        //随机返回一个折扣码        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];        return String.format("%s:%.2f:%s",name,price,code);    }    private double calculatePrice(String product){        delay();        return random.nextDouble() * product.charAt(0) + product.charAt(1);    }    private Random random = new Random();    /**     * 模拟耗时操作:延迟一秒     */    private static void delay(){        try {            Thread.sleep(1000L);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }    }}

定义折扣对象:Discount.java

public class Discount {    public enum Code{        NONE(0),SILVER(5),GOLD(10),PLATINUM(15),DIAMOND(20);        private final int percantage;        Code(int percentage){            this.percantage = percentage;        }    }    public static String applyDiscount(Quote quote){        return quote.getShopName() + "prices is " + Discount.apply(quote.getPrice(),quote.getDiscountCode());    }    //计算折扣价格    private static Double apply(double price ,Code code){        //模拟远程操作的延迟        delay();        return (price * (100 - code.percantage)) / 100;    }    private static void delay(){        try {            Thread.sleep(1000L);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

用于封装解析getPriceFormat的字符串对象:Quote.java

public class Quote {    private final String shopName;    private final double price;    private final Discount.Code discountCode;    public Quote(String shopName,double price,Discount.Code code){        this.shopName = shopName;        this.price = price;        this.discountCode = code;    }    public static Quote parse(String s){        String[] split = s.split(":");        String shopName = split[0];        Double price = Double.valueOf(split[1]);        Discount.Code code = Discount.Code.valueOf(split[2]);        return new Quote(shopName,price,code);    }    public double getPrice() {        return price;    }    public String getShopName() {        return shopName;    }    public Discount.Code getDiscountCode() {        return discountCode;    }}

于是现在的任务就是:

1. 远程查询商品价格2. 将获得的字符串解析成为Quote对象3. 根据Quote对象远程获取折扣后的价格

现在先看看同步的方式来执行这个操作:

public List
findPrices2(String product){ return shops.stream() .map(shop -> shop.getPriceFormat(product)) .map(Quote::parse) .map(Discount::applyDiscount) .collect(Collectors.toList()); }

因为有两个耗时操作,每个1秒,耗时毫无疑问20秒以上:

图片描述

【对多个异步任务进行流水线操作

1. 获取价格:使用CompletableFuture.supplyAsync()工厂方法即可,一旦运行结束每个CompletableFuture对象会包含一个shop返回的字符串,这里记住使用我们自定义的执行器。

2. 解析报价:一般情况下解析操作并不涉及到IO处理,所可以采用同步处理,所以这里我们直接使用CompletableFuture对象的thenApply()方法,表明在的带运算结果后立刻同步处理。

3. 计算折扣价格:这是一个远程操作,肯定是需要异步执行的,于是我们现在就有了两次异步处理(1.获取价格,2.计算折扣)。现在使用级联的方式将它们串联起来工作。CompletableFuture提供了thenCompose方法,表明将两个异步操作进行流水线处理。第一个异步操作的结果会成为第二个异步操作的入参。使用这样的方式,即使Future在向不同的shop手机报价,主线程依然可以执行其他操作,比如响应UI事件。

于是我们有了如下代码:

/**     * 异步查询     * 相比并行流的话CompletableFuture更有优势:可以对执行器配置,设置线程池大小     */    @SuppressWarnings("all")    private final Executor myExecutor =       Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {        @Override        public Thread newThread(Runnable r) {            Thread t = new Thread(r);            t.setDaemon(true);            return t;        }    });    public List
findPrices2Async(String product){ List
> futurePrices = shops.stream() //首先异步获取价格 .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceFormat(product),myExecutor)) //将获取的字符串解析成对象 .map(future -> future.thenApply(Quote::parse)) //使用另一个异步任务有获取折扣价格 .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),myExecutor))) .collect(Collectors.toList()); //等待所有异步任务完成 return futurePrices.stream().map(CompletableFuture::join).collect(Collectors.toList());

运算结果不到3秒:

图片描述

【整合两个CompletableFuture对象

我们刚才使用thenCompose()将两个CompletableFuture结合了起来,并且一个CompletableFuture的运算结果将作为第二个CompletableFuture的入参。但是更多的情况是两个不相干的CompletableFuture对象相互结合,并且我们也不希望第一个任务结束之后才开始第二个任务。这时可以使用thenCombine()。

比如我们获取价格的同时也获取汇率:

远程获取汇率方法:

/**     * 获取汇率     */    public double getRate(String type){        delay();        if("$".equals(type)){            return 0.3;        }        if("¥".equals(type)){            return 0.7;        }        return 1;    }

结合俩个异步操作:

@Test    public void combine(){        Shop shop = new Shop("沃尔玛");        Future
futurePrice = CompletableFuture.supplyAsync(() -> shop.getPrice("iphoneX")) .thenCombine(CompletableFuture.supplyAsync(() -> shop.getRate("$")), (price,rate) -> price * rate); }

thenCombine()接受两个参数:

1. CompletableFuture对象:表明第二个异步操作2. BiFunction
接口:两个异步操作的结果合并处理

转载地址:http://ndvdm.baihongyu.com/

你可能感兴趣的文章