RxJava使用介绍

在说RxJava之前先说说ReactiveX。

ReactiveX 简称 Rx,全称 Reactive Extensions,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Java等几乎所有的编程语言,RxJava则是java语言的实现。

Rx介绍:

1)扩展的观察者模式:通过订阅可观测对象的序列流然后做出反应。

2)迭代器模式:对对象序列进行迭代输出从而使订阅者可以依次对其处理。

3)函数式编程思想:简化问题的解决的步骤,让你的代码更优雅和简洁

为什么说是扩展的观察者模式?

观察者模式:被观察者发出事件,然后观察者(事件源)订阅然后进行处理。

图片 1

扩展:如果没有观察者,被观察者是不会发出任何事件的。另外知道事件何时结束,还有错误通知处理

迭代器模式

提供一种方法顺序访问一个聚合对象中的各种元素,而又不暴露该对象的内部表示

《RxJava Essentials》一书做的的对比:迭代器模式在事件处理上采用的是“同步/拉式”的方式,而被观察者采用的是“异步/推式”的方式,而对观察者而言,显然后者更灵活。

图片 1

函数式编程

//线程操作模式
new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
//函数模式 (Lambda)A->B ->C->D
Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

 

RxJava核心

Observable(被观察者,也就是事件源)和Subscriber(观察者)

//被观察者
Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>(){

                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("hello world");
                        subscriber.onCompleted();
                    }
                }
        );

//观察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
          @Override
          public void onCompleted() {}

          @Override
          public void onError(Throwable e) {}

          @Override
          public void onNext(String s) {
                log.info("基础写法:"+s);
            }
      };

 myObservable.subscribe(mySubscriber);
Observable<String> myObservable = Observable.just("Hello World!");

		Action1<String> onNextAction = new Action1<String>() {

			@Override
			public void call(String s) {
				logger.info("Action1简化后:"+s);
			}
		};

		myObservable.subscribe(onNextAction);

		/* 写成匿名函数*/
		Observable.just("Hello World!").subscribe(new Action1<String>() {
			@Override
			public void call(String s) {
				logger.info("匿名函数写法:"+s);
			}
		});

		/*用Java 8 lambdas(Retrolambda)表达式*/
		Observable.just("Hello World!").subscribe(s -> logger.info("lambdas表达式写法:"+s));

 

RxJava操作符

创建操作符:Create, Defer, From, Interval, Just, Range, Repeat, Timer等。

String[] strings = {"张三","李四","王五","赵六"};
Observable.from(strings)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("name",s);
            }
        });

变换操作:Map、FlatMap、ConcatMap等

public void showUserName(String userName){
	textView.setText(userName);
}

public void showUserName(String userName){
Observable.just(userName).subscribe( new  Action1<String>(){
         @Override
         public void call(String s){
            textView.setText(s);
        }
});  
}

如果需要在显示前对这个字符串做处理,然后再展示,比如加“张三,你好”

方法1:我们可以对字符串本身操作

方法2:我们可以放到Action1.call()方法里做处理

方法3:使用操作符做变换:map

public void showUserName(String userName){
Observable.just(userName).map(new Func1<String,String>(){
          public String call(String text){
             return handleUserName(text);   
           }
   }).subscribe( new Action1<String>(){
        public void call(String s){
        }
        });
}

打印出中国的所有省份名称:flatMap()

List<Province>  provinceList = …
Observable.from(provinceList)
.flatMap(new Func1<Province,String>(){
@Override
 public String call(Province province){
            return province.getName();
      }
}).subscribe(new Action1<String>(){
         @Override
         public void call(String s){
            Log.i(“省份名称”,s)
        }
});
List<Province>  provinceList = …
Observable.from(provinceList)
.subscribe(new Action1<Province>(){
         @Override
         public void call(Province province){
            List<City> cities = province.getCities();
            for (int i = 0; i < cities.size(); i++) {
                   City city = cities.get(i);
                   Log.i(“城市”, city.getName());
            }        
          }
});

显然第一种比第二种看着更简洁,清晰

异步

调度器Scheduler:

0b350c8350142aff

操作符:

subscribeOn():指定回调发生的线程,事件消费的线程,可以执行多次!

observeOn():订阅事件发生的线程,事件产生的线程,只允许执行一次。

其他操作符

01E17C8B-D4F3-4D92-859D-D36D72D5A07C

Debounce:“去抖”,只有在空闲了一段时间后才发射数据,过滤掉发射速率过快的数据项

Sample:“采样”,定期发射Observable最近发射的数据项

F77989EF-B666-47B5-9FCB-901E68BE64C3

String[] numbers = {"11", "2", "2", "13", "4", "5", "7"}
Observable
  .from(numbers)
  .map(s -> Integer.parseInt(s))
  .filter(s -> s < 10)
  .distinct()
  .takeLast(3)
  .reduce((number1, number2) -> 
     number1 + number2)
  )
  .subscribe(i -> System.out.println(i));//16

 

流式处理的优势

如需要从多个数据源获取数据内存、磁盘、网络依次获取。

Observable<String> memory= bservable.just("memory"); 
Observable<String> disk= Observable.just("disk");  
Observable<String> network=Observable.just("network");  

//依次检查memory、disk、network  
Observable.concat(memory, disk, network)  
.first()  
.subscribeOn(Schedulers.newThread())  
.subscribe(s -> {  
    memoryCache = "memory";  
    System.out.println("--------------subscribe: " + s);  
});

参考资料:

https://gank.io/post/560e15be2dca930e00da1083

http://www.jianshu.com/p/e0891032ee4d

 

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注