• Future :获取异步返回的结果需要使用轮询的方式,消耗cup

            ExecutorService executorService = Executors.newFixedThreadPool(10);
    
            Future<String> future = executorService.submit(()->{
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "future";
            });
            while(true){
                if(future.isDone()){
                    System.out.println(future.get());
                    break;
                }
            }
    
  • CompletableFuture:采用观察者模式,阻塞获取异步返回的结果,性能得到优化

    		 System.out.println("=============CompletableFuture===================");
    
    
            CompletableFuture testFuture1 = CompletableFuture.supplyAsync(()->{
                return "丽丽1";
            }).thenApply((element)->{
                System.out.println("testFuture1后续操作:"+element);
                return "丽丽2";
            });
    
            System.out.println(testFuture1.get());
    
    
            System.out.println("=============CompletableFuture===================");
    
    
            CompletableFuture testFuture2 = CompletableFuture.supplyAsync(()->{
                return "丽丽1";
            }).thenAccept((element)->{
                System.out.println("testFuture2后续操作:"+element);
            });
    
            System.out.println(testFuture2.get());
    
  • CompletableFuture的使用明细

    * runAsync 无返回值
    * supplyAsync 有返回值
    *
    * thenAccept 无返回值
    * thenApply 有返回值
    * thenRun 不关心上一步执行结果,执行下一个操作
    * get() 为阻塞获取 可设置超时时间 避免长时间阻塞
    
    实现接口 AsyncFunction 用于请求分发
    
    定义一个callback回调函数,该函数用于取出异步请求的返回结果,并将返回的结果传递给ResultFuture 
    
    对DataStream的数据使用Async操作
    
    • 例子

      /**
       * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
        *  通过向数据库发送异步请求并设置回调方法
       */
      class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
      
          /** The database specific client that can issue concurrent requests with callbacks 
           可以异步请求的特定数据库的客户端 */
          private transient DatabaseClient client;
      
          @Override
          public void open(Configuration parameters) throws Exception {
              client = new DatabaseClient(host, post, credentials);
          }
      
          @Override
          public void close() throws Exception {
              client.close();
          }
      
          @Override
          public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
      
              // issue the asynchronous request, receive a future for result
              // 发起一个异步请求,返回结果的 future
              final Future<String> result = client.query(key);
      
              // set the callback to be executed once the request by the client is complete
              // the callback simply forwards the result to the result future
              // 设置请求完成时的回调.将结果传递给 result future
              CompletableFuture.supplyAsync(new Supplier<String>() {
              
                  @Override
                  public String get() {
                      try {
                          return result.get();
                      } catch (InterruptedException | ExecutionException e) {
                          // Normally handled explicitly.
                          return null;
                      }
                  }
              }).thenAccept( (String dbResult) -> {
                  resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
              });
          }
      }
      
      // create the original stream
      // 创建一个原始的流
      DataStream<String> stream = ...;
      
      // apply the async I/O transformation
      // 添加一个 async I/O ,指定超时时间,和进行中的异步请求的最大数量
      DataStream<Tuple2<String, String>> resultStream =
          AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
      
      
    • 注意事项

      • Timeout:定义请求超时时间,异步请求多久没完成会被认为是超时了
      • Capacity:定义了同时进行的异步请求的数量,可以限制并发请求数量,不会积压过多的请求
      • 超时处理:默认当一个异步 I/O 请求超时时,会引发异常并重新启动作业。 如果要处理超时,可以覆盖该AsyncFunction的timeout方法来自定义超时之后的处理方式
      • 响应结果的顺序:AsyncDataStream包含两种输出模式,
        • unorderedWait无序:响应结果的顺序与异步请求的顺序不同
        • orderedWait有序:响应结果的顺序与异步请求的顺序相同
内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/xietingwei/p/17647249.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!