RxJava在SOA中的运用

在做SOA服务化时,有时候一个服务依赖于其他很多服务,如下图:

8D5F1A87-787D-460D-BA2C-292DB4E6BCB0

最常规的做法是串行调用接口,最后将结果合并,如果为了提高效率,我们想并行调用每个接口,最后将结果合并,如何做呢?

首先我们想到的是使用多线程去执行,JUC中CountDownLatch可以实现这个效果,最先初始化n个任务传给countDownLatch,然后利用线程池去执行每个任务,执行完后使用countDown()方法将任务递减,CountDownLatch.awai()等待指导所有的任务执行完成。RxJava提供了比较优雅的方法,我们来看看它是怎么实现的。

rxjava的实现思路也是一样,创建多个异步处理任务,最后将结果合并,拿调用getPlane接口来说:

private Observable<PlaneBean> getPlane()
			throws Exception {
		return Observable.create(new Observable.OnSubscribe<PlaneBean>() {
			@Override
			public void call(Subscriber<? super PlaneBean> subscriber) {
				PlaneBean plane = new PlaneBean();
				try {
					/* 调用服务业务处理*/
				} catch (Exception e) {
					logger.error(FuncStatic.errorTrace(e));
				}
				subscriber.onNext(plane);
				subscriber.onCompleted();
				logger.info(requestId + " get plane info end");
			}
		}).subscribeOn(Schedulers.from(workPool));
	}

使用Observable.create创建一个异步任务,在call方法中写需要需要处理的业务逻辑,执行完成后将数据plane传入到subscriber对象中,并调用onCompleted()方法表示结束执行,核心为subscribeOn方法,这个任务会交给workPool来调度,所以最初我们还要创建一个线程池

private static ExecutorService workPool = Executors.newFixedThreadPool(50);

其他API方法调用同上,再来说下合并,RxJava提供了merge和zip方法来合并任务,merge方法要求每个任务返回的结果都相同,zip则不限制,根据需求这里我们使用zip方法

Observable.zip(getDynamic(), getShare(), getPre(), getPlane(), getFiducial()
		new Func5<DynamicBean, ShareBean, PreBean, PlaneBean, FiducialBean, GetDetailResponse>() {
			@Override
			public GetDetailResponse call(DynamicBean t1, ShareBean t2, PreBean t3,
					PlaneBean t4, FiducialBean t5) {
				if (t1 != null)
					response.setDynamic(t1);
				if (t2 != null)
					response.setShare(t2);
				if (t3 != null)
					response.setPre(t3);
				if (t4 != null)
					response.setPlane(t4);
				if (5 != null)
					response.setFiducial(t4);
				return response;
			}

		}).subscribeOn(Schedulers.from(workPool)).toBlocking().subscribe();

因为这里我调用的5个API,所有使用方法Func5,如果是3个则使用Func3,同样交给workPool线程池来处理合并的结果,注意这里要使用toBlocking来阻塞阻塞合并操作,等待所有任务都执行完成后再进行合并,最后将结果赋予GetDetailResponse对象。

以上就完成了并行调度的执行,在API的依赖逐渐增多,这样可以大大提高执行效率,但也有一个问题,如果某个API执行时间很长,将对拖慢整个接口的执行时间,导致接口发送雪崩,下次讲讲如果避免这种情况。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注