Monthly Archives: 八月 2017

Hystrix服务隔离及熔断

上次说了使用RxJava并行提升API效率,但是如何避免项目中一个服务拖慢整体的性能?hystrix优雅的解决了这个问题。

先贴个图,网上找的,我觉得这个图就可以说明问题了

hystrix-1

当依赖服务A挂了,直接就会影响到整个服务的性能,如果连接一直被A暂用,最终将导致服务发生雪崩

所以我们要保证核心服务的稳定,其他非核心的服务可以不那么重要,即使出现了错误数据可以丢弃或不用保证实时准确,再以SOA项目中的getDynamicInfo为例,如下:

87194035-3D85-4860-AB44-35EA71697A44

getFlight为核心的业务,必须要获取到数据,获取不到那就整个服务不可用了

getEntryCard、getPlane、getRefund、getWeather、getPortaitUser等方法为非核心业务,我们允许其数据丢失的情况,要保证整个服务的可用

首先我们需要把这些服务使用hystrix分离出来,先对这几个服务做先分组:

pydyn:getEntryCard、getPlane、getRefund、getWeather

portrait:getPortaitUser

Maven配置

	<dependency>
	  <groupId>com.netflix.hystrix</groupId>
	  <artifactId>hystrix-core</artifactId>
	  <version>1.5.8</version>
	</dependency>

以getEntryCard服务为例,继承HystrixCommand<T>,重写run方法

public class GetEntryCardCommand extends HystrixCommand<String>{

	private String arrCode;
        private GetEntryCardService service;

	public GetEntryCardCommand(String arrCode) {
		super(setter());
		this.arrCode = arrCode;
	}

	private static Setter setter() {
		return PySetter.setter().andCommandKey(HystrixCommandKey.Factory.asKey("getEntryCard"));
	}

	@Override
	protected String run() throws Exception {
		// TODO Auto-generated method stub
		return service.getEntryCardCity(arrCode);
	}

	@Override
	protected String getFallback() {
		return service.getFromCache(arrCode);
	}

}

run方法要执行的业务逻辑,如果出现异常则会调用getFallback方法。

public class PySetter {

	public static Setter setter() {
		// 服务分组
		HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("pydyn");
		// 服务标识
		//HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("getEntryCard");
		// 线程池名称
		HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("pydyn-pool");
		// 线程配置
		HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter()
				.withCoreSize(10)
				.withKeepAliveTimeMinutes(5)
				.withMaxQueueSize(Integer.MAX_VALUE)
				.withQueueSizeRejectionThreshold(10000);

		//命令属性配置
		HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
				.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                                .withExecutionIsolationThreadInterruptOnTimeout(true).withExecutionTimeoutInMilliseconds(3000) //设置超时3秒自动熔断
				.withCircuitBreakerErrorThresholdPercentage(30);//失败率达到30%自动熔断

		return HystrixCommand.Setter
				.withGroupKey(groupKey)
				//.andCommandKey(commandKey)
				.andThreadPoolKey(threadPoolKey)
				.andThreadPoolPropertiesDefaults(threadPoolProperties)
				.andCommandPropertiesDefaults(commandProperties);
	}

}

以上是对服务的线程池熔断配置

HystrixCommandGroupKey:服务分组,以上pydyn分组就包括4个服务,必填选项

HystrixCommandKey:服务的名称,唯一标识,如果不配置,则默认是类名

HystrixThreadPoolKey:线程池的名称,相同线程池名称的线程池是同一个,如果不配置,默认为分组名

HystrixThreadPoolProperties:线程池的配置,coreSize配置核心线程池的大小,maxQueueSize线程池队列的最大大小,queueSizeRejectionThreshold,限制当前队列的大小,实际队列大小由这个参数决定,即到达队列里面条数到达10000,则都会被拒绝。

HystrixCommandProperties:配置命令的一些参数,如executionIsolationStrategy,配置执行隔离策略,默认是使用线程隔离,THREAD即为线程池隔离,ExecutionIsolationThreadInterruptOnTimeout和ExecutionTimeoutInMilliseconds配置了启用超时和最大执行时间,这里为3s,circuitBreakerErrorThresholdPercentage失败率配置,默认为50%,这里配置的为30%,即失败率到达30%触发熔断

接下来就是调用GetEntryCardCommand获取数据

String result = new GetEntryCardCommand(request.getArrCode()).execute();

我使用的是阻塞的方式获取,当然也可以使用异步的方法,得到Future对象,使用

Future<String> = new GetEntryCardCommand(request.getArrCode()).queue();

其他getPlane、getRefund、getWeather方法同GetEntryCardCommand类,

getPortaitUser也一样,只需要更改下自己的分组为portait,线程池名称,并配置自己的线程池参数

好了,这样就实现了多个服务之间的隔离和熔断,不同的服务分组使用不同的线程池,即使getDynamicInfo并发量突然剧增,也不会对getPlane、getRefund、getWeather等产生影响

下面来说说对服务的监控,hystrix已经帮我们实现了数据的记录,只需要安装他们的管理后台就可以查看数据,他们家有2个监控后台:hystrix-dashboard和Turbine,第一个是监控单个项目的日志,第二个可以把多个项目的日志聚合在一起

hystrix-dashboard监控:

maven配置:

	 <dependency>  
	     <groupId>com.netflix.hystrix</groupId>  
	     <artifactId>hystrix-metrics-event-stream</artifactId>  
	     <version>1.1.2</version>  
 	</dependency>

web.xml配置

    <servlet>  
	    <display-name>HystrixMetricsStreamServlet</display-name>  
	    <servlet-name>HystrixMetricsStreamServlet</servlet-name>  
	    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>  
	</servlet>  
	<servlet-mapping>  
	    <servlet-name>HystrixMetricsStreamServlet</servlet-name>  
	    <url-pattern>/hystrix.stream</url-pattern>  
	</servlet-mapping>

访问http://ip:port/projectName/hystrix.stream查看是否有监控数据

hystrix-dashboard:先安装hystrix-dashboard.war,下载后放入web容器(Tomcat或Jetty)中,启动容器,访问http://ip:port/hystrix-dashboard,如下的界面:

371D6055-27B8-41ED-ADF3-544E92ECC934

最后点击Monitor Stream就可以看到数据了

EFD10FC9-9BAB-43DE-B5EB-09082222744B

可以看到每个服务及他们的线程池使用情况

CirCuit主要监控成功的请求数,请求超时数,失败率,平均耗时、90%,99%,99.5%的耗时(将鼠标移动到对于数字位置可以看到描述)

Thread Pools主要监控线程池的配置数,线程队列的配置数,最大活跃线程

 

Turbine监控(将多个实例的数据聚合起来)

1)下载turbine-web-1.0.0.war,并将war放入web容器中。

2)在容器下路径为turbine-web-1.0.0/WEB-INF/classes下新建config.properties文件。

InstanceDiscovery.impl=com.netflix.turbine.discovery.ConfigPropertyBasedDiscovery
#cluster
turbine.aggregator.clusterConfig=dynamicAPI
turbine.instanceUrlSuffix=:8080/projectName/hystrix.stream
turbine.ConfigPropertyBasedDiscovery.dynamicAPI.instances=127.0.0.1,127.0.0.2

这里我配置了2个实例127.0.0.1和127.0.0.2,他会同时调用http://127.0.0.1:8080/projectName/hystrix.stream和http://127.0.0.2:8080/projectName/hystrix.stream,然后根据分组名,服务名,线程池名将两个实例数据合并

3)调用http://ip:port/turbine-web-1.0.0/turbine.stream?cluster=dynamicAPI,即可查看到聚合的数据

4)打开hystrix-dashboard配置turbine

766892D0-ADE9-4BC4-A991-7844965A1557

点击Monitor Stream即可查看到两个实例监控的数据

5EC0BFFE-3C41-4F14-980D-39614A231A16

 

参考资料:

http://tech.lede.com/2017/06/15/rd/server/hystrix/

http://www.cnblogs.com/java-zhao/archive/2016/09/01/5831002.html

 

RxJava使用介绍

在说RxJava之前先说说ReactiveX。

ReactiveX 简称 Rx,全称 Reactive Extensions,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Java等几乎所有的编程语言,RxJava则是java语言的实现。

Rx介绍:

1)扩展的观察者模式:通过订阅可观测对象的序列流然后做出反应。

2)迭代器模式:对对象序列进行迭代输出从而使订阅者可以依次对其处理。

3)函数式编程思想:简化问题的解决的步骤,让你的代码更优雅和简洁

为什么说是扩展的观察者模式?

观察者模式:被观察者发出事件,然后观察者(事件源)订阅然后进行处理。

图片 1

扩展:如果没有观察者,被观察者是不会发出任何事件的。另外知道事件何时结束,还有错误通知处理

迭代器模式

提供一种方法顺序访问一个聚合对象中的各种元素,而又不暴露该对象的内部表示

《RxJava Essentials》一书做的的对比:迭代器模式在事件处理上采用的是“同步/拉式”的方式,而被观察者采用的是“异步/推式”的方式,而对观察者而言,显然后者更灵活。

图片 1

函数式编程

//线程操作模式
new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
//函数模式 (Lambda)A->B ->C->D
Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

 

RxJava核心

Observable(被观察者,也就是事件源)和Subscriber(观察者)

//被观察者
Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>(){

                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("hello world");
                        subscriber.onCompleted();
                    }
                }
        );

//观察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
          @Override
          public void onCompleted() {}

          @Override
          public void onError(Throwable e) {}

          @Override
          public void onNext(String s) {
                log.info("基础写法:"+s);
            }
      };

 myObservable.subscribe(mySubscriber);
Observable<String> myObservable = Observable.just("Hello World!");

		Action1<String> onNextAction = new Action1<String>() {

			@Override
			public void call(String s) {
				logger.info("Action1简化后:"+s);
			}
		};

		myObservable.subscribe(onNextAction);

		/* 写成匿名函数*/
		Observable.just("Hello World!").subscribe(new Action1<String>() {
			@Override
			public void call(String s) {
				logger.info("匿名函数写法:"+s);
			}
		});

		/*用Java 8 lambdas(Retrolambda)表达式*/
		Observable.just("Hello World!").subscribe(s -> logger.info("lambdas表达式写法:"+s));

 

RxJava操作符

创建操作符:Create, Defer, From, Interval, Just, Range, Repeat, Timer等。

String[] strings = {"张三","李四","王五","赵六"};
Observable.from(strings)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("name",s);
            }
        });

变换操作:Map、FlatMap、ConcatMap等

public void showUserName(String userName){
	textView.setText(userName);
}

public void showUserName(String userName){
Observable.just(userName).subscribe( new  Action1<String>(){
         @Override
         public void call(String s){
            textView.setText(s);
        }
});  
}

如果需要在显示前对这个字符串做处理,然后再展示,比如加“张三,你好”

方法1:我们可以对字符串本身操作

方法2:我们可以放到Action1.call()方法里做处理

方法3:使用操作符做变换:map

public void showUserName(String userName){
Observable.just(userName).map(new Func1<String,String>(){
          public String call(String text){
             return handleUserName(text);   
           }
   }).subscribe( new Action1<String>(){
        public void call(String s){
        }
        });
}

打印出中国的所有省份名称:flatMap()

List<Province>  provinceList = …
Observable.from(provinceList)
.flatMap(new Func1<Province,String>(){
@Override
 public String call(Province province){
            return province.getName();
      }
}).subscribe(new Action1<String>(){
         @Override
         public void call(String s){
            Log.i(“省份名称”,s)
        }
});
List<Province>  provinceList = …
Observable.from(provinceList)
.subscribe(new Action1<Province>(){
         @Override
         public void call(Province province){
            List<City> cities = province.getCities();
            for (int i = 0; i < cities.size(); i++) {
                   City city = cities.get(i);
                   Log.i(“城市”, city.getName());
            }        
          }
});

显然第一种比第二种看着更简洁,清晰

异步

调度器Scheduler:

0b350c8350142aff

操作符:

subscribeOn():指定回调发生的线程,事件消费的线程,可以执行多次!

observeOn():订阅事件发生的线程,事件产生的线程,只允许执行一次。

其他操作符

01E17C8B-D4F3-4D92-859D-D36D72D5A07C

Debounce:“去抖”,只有在空闲了一段时间后才发射数据,过滤掉发射速率过快的数据项

Sample:“采样”,定期发射Observable最近发射的数据项

F77989EF-B666-47B5-9FCB-901E68BE64C3

String[] numbers = {"11", "2", "2", "13", "4", "5", "7"}
Observable
  .from(numbers)
  .map(s -> Integer.parseInt(s))
  .filter(s -> s < 10)
  .distinct()
  .takeLast(3)
  .reduce((number1, number2) -> 
     number1 + number2)
  )
  .subscribe(i -> System.out.println(i));//16

 

流式处理的优势

如需要从多个数据源获取数据内存、磁盘、网络依次获取。

Observable<String> memory= bservable.just("memory"); 
Observable<String> disk= Observable.just("disk");  
Observable<String> network=Observable.just("network");  

//依次检查memory、disk、network  
Observable.concat(memory, disk, network)  
.first()  
.subscribeOn(Schedulers.newThread())  
.subscribe(s -> {  
    memoryCache = "memory";  
    System.out.println("--------------subscribe: " + s);  
});

参考资料:

https://gank.io/post/560e15be2dca930e00da1083

http://www.jianshu.com/p/e0891032ee4d

 

RxJava在SOA中的运用

在做SOA服务化时,有时候一个服务依赖于其他很多服务,如下图:

8D5F1A87-787D-460D-BA2C-292DB4E6BCB0

最常规的做法是串行调用接口,最后将结果合并,如果为了提高效率,我们想并行调用每个接口,最后将结果合并,如何做呢?

首先我们想到的是使用多线程去执行,JUC中CountDownLatch可以实现这个效果,最先初始化n个任务传给countDownLatch,然后利用线程池去执行每个任务,执行完后使用countDown()方法将任务递减,CountDownLatch.awai()等待指导所有的任务执行完成。RxJava提供了比较优雅的方法,我们来看看它是怎么实现的。

rxjava的实现思路也是一样,创建多个异步处理任务,最后将结果合并,拿调用getPlane接口来说:

private Observable<PlaneBean> getPlane()
			throws Exception {
		return Observable.create(new Observable.OnSubscribe<PlaneBean>() {
			@Override
			public void call(Subscriber<? super PlaneBean> subscriber) {
				PlaneBean plane = new PlaneBean();
				try {
					/* 调用服务业务处理*/
				} catch (Exception e) {
					logger.error(FuncStatic.errorTrace(e));
				}
				subscriber.onNext(plane);
				subscriber.onCompleted();
				logger.info(requestId + " get plane info end");
			}
		}).subscribeOn(Schedulers.from(workPool));
	}

使用Observable.create创建一个异步任务,在call方法中写需要需要处理的业务逻辑,执行完成后将数据plane传入到subscriber对象中,并调用onCompleted()方法表示结束执行,核心为subscribeOn方法,这个任务会交给workPool来调度,所以最初我们还要创建一个线程池

private static ExecutorService workPool = Executors.newFixedThreadPool(50);

其他API方法调用同上,再来说下合并,RxJava提供了merge和zip方法来合并任务,merge方法要求每个任务返回的结果都相同,zip则不限制,根据需求这里我们使用zip方法

Observable.zip(getDynamic(), getShare(), getPre(), getPlane(), getFiducial()
		new Func5<DynamicBean, ShareBean, PreBean, PlaneBean, FiducialBean, GetDetailResponse>() {
			@Override
			public GetDetailResponse call(DynamicBean t1, ShareBean t2, PreBean t3,
					PlaneBean t4, FiducialBean t5) {
				if (t1 != null)
					response.setDynamic(t1);
				if (t2 != null)
					response.setShare(t2);
				if (t3 != null)
					response.setPre(t3);
				if (t4 != null)
					response.setPlane(t4);
				if (5 != null)
					response.setFiducial(t4);
				return response;
			}

		}).subscribeOn(Schedulers.from(workPool)).toBlocking().subscribe();

因为这里我调用的5个API,所有使用方法Func5,如果是3个则使用Func3,同样交给workPool线程池来处理合并的结果,注意这里要使用toBlocking来阻塞阻塞合并操作,等待所有任务都执行完成后再进行合并,最后将结果赋予GetDetailResponse对象。

以上就完成了并行调度的执行,在API的依赖逐渐增多,这样可以大大提高执行效率,但也有一个问题,如果某个API执行时间很长,将对拖慢整个接口的执行时间,导致接口发送雪崩,下次讲讲如果避免这种情况。