在说RxJava之前先说说ReactiveX。
ReactiveX 简称 Rx,全称 Reactive Extensions,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Java等几乎所有的编程语言,RxJava则是java语言的实现。
Rx介绍:
1)扩展的观察者模式:通过订阅可观测对象的序列流然后做出反应。
2)迭代器模式:对对象序列进行迭代输出从而使订阅者可以依次对其处理。
3)函数式编程思想:简化问题的解决的步骤,让你的代码更优雅和简洁
为什么说是扩展的观察者模式?
观察者模式:被观察者发出事件,然后观察者(事件源)订阅然后进行处理。
扩展:如果没有观察者,被观察者是不会发出任何事件的。另外知道事件何时结束,还有错误通知处理
迭代器模式
提供一种方法顺序访问一个聚合对象中的各种元素,而又不暴露该对象的内部表示
《RxJava Essentials》一书做的的对比:迭代器模式在事件处理上采用的是“同步/拉式”的方式,而被观察者采用的是“异步/推式”的方式,而对观察者而言,显然后者更灵活。
函数式编程
//线程操作模式 new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start();
//函数模式 (Lambda)A->B ->C->D Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
RxJava核心
Observable(被观察者,也就是事件源)和Subscriber(观察者)
//被观察者 Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>(){ @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello world"); subscriber.onCompleted(); } } ); //观察者 Subscriber<String> mySubscriber = new Subscriber<String>() { @Override public void onCompleted() {} @Override public void onError(Throwable e) {} @Override public void onNext(String s) { log.info("基础写法:"+s); } }; myObservable.subscribe(mySubscriber);
Observable<String> myObservable = Observable.just("Hello World!"); Action1<String> onNextAction = new Action1<String>() { @Override public void call(String s) { logger.info("Action1简化后:"+s); } }; myObservable.subscribe(onNextAction); /* 写成匿名函数*/ Observable.just("Hello World!").subscribe(new Action1<String>() { @Override public void call(String s) { logger.info("匿名函数写法:"+s); } }); /*用Java 8 lambdas(Retrolambda)表达式*/ Observable.just("Hello World!").subscribe(s -> logger.info("lambdas表达式写法:"+s));
RxJava操作符
创建操作符:Create, Defer, From, Interval, Just, Range, Repeat, Timer等。
String[] strings = {"张三","李四","王五","赵六"}; Observable.from(strings) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i("name",s); } });
变换操作:Map、FlatMap、ConcatMap等
public void showUserName(String userName){ textView.setText(userName); } public void showUserName(String userName){ Observable.just(userName).subscribe( new Action1<String>(){ @Override public void call(String s){ textView.setText(s); } }); }
如果需要在显示前对这个字符串做处理,然后再展示,比如加“张三,你好”
方法1:我们可以对字符串本身操作
方法2:我们可以放到Action1.call()方法里做处理
方法3:使用操作符做变换:map
public void showUserName(String userName){ Observable.just(userName).map(new Func1<String,String>(){ public String call(String text){ return handleUserName(text); } }).subscribe( new Action1<String>(){ public void call(String s){ } }); }
打印出中国的所有省份名称:flatMap()
List<Province> provinceList = … Observable.from(provinceList) .flatMap(new Func1<Province,String>(){ @Override public String call(Province province){ return province.getName(); } }).subscribe(new Action1<String>(){ @Override public void call(String s){ Log.i(“省份名称”,s) } });
List<Province> provinceList = … Observable.from(provinceList) .subscribe(new Action1<Province>(){ @Override public void call(Province province){ List<City> cities = province.getCities(); for (int i = 0; i < cities.size(); i++) { City city = cities.get(i); Log.i(“城市”, city.getName()); } } });
显然第一种比第二种看着更简洁,清晰
异步
调度器Scheduler:
操作符:
subscribeOn():指定回调发生的线程,事件消费的线程,可以执行多次!
observeOn():订阅事件发生的线程,事件产生的线程,只允许执行一次。
其他操作符
Debounce:“去抖”,只有在空闲了一段时间后才发射数据,过滤掉发射速率过快的数据项
Sample:“采样”,定期发射Observable最近发射的数据项
String[] numbers = {"11", "2", "2", "13", "4", "5", "7"} Observable .from(numbers) .map(s -> Integer.parseInt(s)) .filter(s -> s < 10) .distinct() .takeLast(3) .reduce((number1, number2) -> number1 + number2) ) .subscribe(i -> System.out.println(i));//16
流式处理的优势
如需要从多个数据源获取数据内存、磁盘、网络依次获取。
Observable<String> memory= bservable.just("memory"); Observable<String> disk= Observable.just("disk"); Observable<String> network=Observable.just("network"); //依次检查memory、disk、network Observable.concat(memory, disk, network) .first() .subscribeOn(Schedulers.newThread()) .subscribe(s -> { memoryCache = "memory"; System.out.println("--------------subscribe: " + s); });
参考资料:
https://gank.io/post/560e15be2dca930e00da1083
http://www.jianshu.com/p/e0891032ee4d