在做SOA服务化时,有时候一个服务依赖于其他很多服务,如下图:
最常规的做法是串行调用接口,最后将结果合并,如果为了提高效率,我们想并行调用每个接口,最后将结果合并,如何做呢?
首先我们想到的是使用多线程去执行,JUC中CountDownLatch可以实现这个效果,最先初始化n个任务传给countDownLatch,然后利用线程池去执行每个任务,执行完后使用countDown()方法将任务递减,CountDownLatch.awai()等待指导所有的任务执行完成。RxJava提供了比较优雅的方法,我们来看看它是怎么实现的。
rxjava的实现思路也是一样,创建多个异步处理任务,最后将结果合并,拿调用getPlane接口来说:
private Observable<PlaneBean> getPlane() throws Exception { return Observable.create(new Observable.OnSubscribe<PlaneBean>() { @Override public void call(Subscriber<? super PlaneBean> subscriber) { PlaneBean plane = new PlaneBean(); try { /* 调用服务业务处理*/ } catch (Exception e) { logger.error(FuncStatic.errorTrace(e)); } subscriber.onNext(plane); subscriber.onCompleted(); logger.info(requestId + " get plane info end"); } }).subscribeOn(Schedulers.from(workPool)); }
使用Observable.create创建一个异步任务,在call方法中写需要需要处理的业务逻辑,执行完成后将数据plane传入到subscriber对象中,并调用onCompleted()方法表示结束执行,核心为subscribeOn方法,这个任务会交给workPool来调度,所以最初我们还要创建一个线程池
private static ExecutorService workPool = Executors.newFixedThreadPool(50);
其他API方法调用同上,再来说下合并,RxJava提供了merge和zip方法来合并任务,merge方法要求每个任务返回的结果都相同,zip则不限制,根据需求这里我们使用zip方法
Observable.zip(getDynamic(), getShare(), getPre(), getPlane(), getFiducial() new Func5<DynamicBean, ShareBean, PreBean, PlaneBean, FiducialBean, GetDetailResponse>() { @Override public GetDetailResponse call(DynamicBean t1, ShareBean t2, PreBean t3, PlaneBean t4, FiducialBean t5) { if (t1 != null) response.setDynamic(t1); if (t2 != null) response.setShare(t2); if (t3 != null) response.setPre(t3); if (t4 != null) response.setPlane(t4); if (5 != null) response.setFiducial(t4); return response; } }).subscribeOn(Schedulers.from(workPool)).toBlocking().subscribe();
因为这里我调用的5个API,所有使用方法Func5,如果是3个则使用Func3,同样交给workPool线程池来处理合并的结果,注意这里要使用toBlocking来阻塞阻塞合并操作,等待所有任务都执行完成后再进行合并,最后将结果赋予GetDetailResponse对象。
以上就完成了并行调度的执行,在API的依赖逐渐增多,这样可以大大提高执行效率,但也有一个问题,如果某个API执行时间很长,将对拖慢整个接口的执行时间,导致接口发送雪崩,下次讲讲如果避免这种情况。