Spring Boot启动出现死锁

Spring Boot启动出现死锁

最近公司的一个项目在启动后,在调用往mongoDB写数据时,一直写不进去,运行一段时间后,程序出现内存溢出,jstack导出了线程信息,如下:

CC6B8F5EF4392AAA4F0B393FFDA32AA6 DFB379909A737E20E4A894BFF683739A

mongoDB在插入数据时发生了堵塞,在等待一个监控对象,主线程在调用afterPropertiesSet方法,一直在等待中,表示spring的初始化工作一直没有完成,所以我们结合代码分析,查看afterPropertiesSet方法

    @Override
    public void afterPropertiesSet() throws Exception {
        scheduleRulePriorityCacheService.start();

        if (isTesting == false) {
            kafkaConsumerTask.start();
            executeService.execute();
        }

    }

 

在spring属性设置完成后,执行了一个kafka消费任务,这个kafka消费任务大致的流程就是从kafka中取出数据,然后放入一个队列中,之后执行了executeService.execute()从队列里面拿出数据然后进行业务处理,我们看看execute()方法

	public void execute() {
		while (true) {
			try {
				if (receiveQueue.isEmpty()) {
					Thread.sleep(200);
				} else {
					String msg = receiveQueue.poll();
					if (StringUtils.isBlank(msg)) {
						continue;
					}
					poolTaskExecutor.execute(() -> {
						scheduleProcessCenter.process(msg);
					});
				}
			} catch (Exception e) {
				logger.error("ExecuteService error,e:{}", e);
			}
		}
	}

这里就是一个无线循环操作,即如果队列有数据就消费,没有就等待200ms

我们知道,afterPropertiesSet方法是spring Bean的生命周期的一部分,这里发生了死锁,必定是锁住了一个资源,没有释放,而MongoDB又需要这个资源,我看到错误信息,在调用DefaultSingletonBeanRegistry.getSingleton方法时锁住了一个资源,我们查看spring的源代码,

7D5A37364ED4463F83CA98076DBEB6AD

可以看到锁住了singletonObjects对象,这个对象就是spring的单例容器,同样我们可以确定,在MongoDB调用注册事件的时候也需要这个对象,我们同样查看源代码

CECB8DBCEB5F05A176976D877FCEC3A1

可以看到MongoDB在注册事件的时候同时需要锁住retrievalMutex对象,那么retrievalMutex和singletonObjects有什么关系?我们接着看

ADE46D9008DA9E92021028F3E5582578 1F3CF43276456CBF7BF938AE5A1CC168

点进去查看getSingletonMutex方法,返回的就是singletonObjects对象,所有retrievalMutex实质和singletonObjects是同一个对象

那么就很容易得出结论了,在调用afterPropertiesSet方法时,singletonObjects对象一直没释放,而MongoDB又需要这个对象,所以产生了死锁。 解决方法,让afterPropertiesSet方法尽快释放singletonObjects对象,我们可以开启一个新线程来执行从队列中读取数据做业务处理的逻辑

	public void execute() {
		poolTaskExecutor.execute(this);
	}

	@Override
	public void run() {
		while (true) {
			try {
				if (receiveQueue.isEmpty()) {
					Thread.sleep(200);
				} else {
					String msg = receiveQueue.poll();
					if (StringUtils.isBlank(msg)) {
						continue;
					}
					poolTaskExecutor.execute(() -> {
						scheduleProcessCenter.process(msg);
					});
				}
			} catch (Exception e) {
				logger.error("ExecuteService error,e:{}", e);
			}
		}
	}

 

Spring Boot整合Mybatis配置多数据源

Spring Boot整合Mybatis配置多数据源

mybatis是目前比较流行的的orm框架了,在spring中,我们知道只需要通过构造器的方式在dao层注入slaveSqlSession就可以指定对应的数据库了,那么Spring Boot是怎么实现的呢?

我们现在有两个库,一个Dynamic,一个Moment,那么我们分别配置两个数据源

@Configuration
@MapperScan(basePackages = "com.huoli.trip.dao.dynamic", sqlSessionTemplateRef = "dynamicSqlSessionTemplate")
public class DynamicDataSourceConfig {
    //配置数据源
    @Bean(name = "dynamicDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mysql.dynamic")
    public DataSource dynamicDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "dynamicSqlSessionFactory")
    public SqlSessionFactory dynamicSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setTypeAliasesPackage("com.huoli.trip.entity.dynamic");
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mybatis/dynamic/*.xml"));
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

     //配置事务
    @Bean(name = "dynamicTransactionManger")
    public DataSourceTransactionManager dynamicTransactionManger(@Qualifier("dynamicDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "dynamicSqlSessionTemplate")
    public SqlSessionTemplate dynamicSqlSessionTemplate(@Qualifier("dynamicSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
@Configuration
@MapperScan(basePackages = "com.huoli.trip.dao.moment", sqlSessionTemplateRef = "momentSqlSessionTemplate")
public class MomentDataSourceConfig {
    private static final Logger logger = LoggerFactory.getLogger(MomentDataSourceConfig.class);

    //配置数据源
    @Bean(name = "momentDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mysql.moment")
    @Primary
    public DataSource momentDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "momentSqlSessionFactory")
    @Primary
    public SqlSessionFactory momentSqlSessionFactory(@Qualifier("momentDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setTypeAliasesPackage("com.huoli.trip.entity.moment");
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mybatis/moment/*.xml"));
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

     //配置事务
    @Bean(name = "momentTransactionManger")
    @Primary
    public DataSourceTransactionManager momentTransactionManger(@Qualifier("momentDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "momentSqlSessionTemplate")
    @Primary
    public SqlSessionTemplate momentSqlSessionTemplate(@Qualifier("momentSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}

这里我们看到配置模板、事务、数据源都相同,处理配置数据库工厂的时候指定的setMapperLocations不同,另外通过@MapperScan注解扫描不同包下面的Mapper文件并指定sqlSessionTemplateRef对应的数据库模板。 Dao及xml对应的文件

@Mapper
public interface AirportInfoMapper {

    List<Airport> selectAll();

    String findName(String airportCode);
}
public interface MomentMapper {
    int deleteByPrimaryKey(Integer id);

    int insert(Moment record);

    MomentVo selectByPrimaryKey(@Param("id") Integer id);
}

xml就不贴了,和spring里面的配置一样,写对应的方法及SQL语句,另外在application.yml中配置需要的数据源

spring:
  datasource:
    mysql:
      moment:
        driverClassName: com.mysql.jdbc.Driver
        jdbcUrl: jdbc:mysql://host:port/moment?useUnicode=true&characterEncoding=utf-8
        username: xxx
        password: xxxx
        type: com.zaxxer.hikari.HikariDataSource
      dynamic:
        driverClassName: com.mysql.jdbc.Driver
        jdbcUrl: jdbc:mysql://host:port/dynamic?useUnicode=true&characterEncoding=utf-8
        username: xxx
        password: xxxx
        type: com.zaxxer.hikari.HikariDataSource

在service层就可以通过AirportInfoMapper和MomentMapper访问不同的数据库了

 

Spring Boot自动配置原理

Spring Boot自动化配置原理

Spring Boot提供了很多”开箱即用“的依赖模块,这些模块已经帮我们做了自动配置,这些依赖模块都是以spring-boot-starter-xx作为命名的,比如spring-boot-starter-redis、spring-boot-starter-data-mongodb ,Spring Boot关于自动配置的源码在spring-boot-autoconfigure.jar内 在说spring的自动配置,我们先看看Spring Boot的运作原理,我们都知道,在Spring Boot的启动类上都会添加@SpringBootApplication注解,我们点击进去看看里面的源码,自动配置的核心注解是@EnableAutoConfiguration

 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = {
        @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
        @Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
public @interface SpringBootApplication 

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import(AutoConfigurationImportSelector.class)
public @interface EnableAutoConfiguration

这里的关键功能时@Import注解到的配置功能,AutoConfigurationImportSelector使用SpringFactoriesLoader.loadFacotryNames方法来扫描spring-boot-autoconfigure.jar包里面spring.factories文件,此文件声明了哪些类可以自动注入

# Initializers
org.springframework.context.ApplicationContextInitializer=\
org.springframework.boot.autoconfigure.SharedMetadataReaderFactoryContextInitializer,\
org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLoggingListener

# Application Listeners
org.springframework.context.ApplicationListener=\
org.springframework.boot.autoconfigure.BackgroundPreinitializer

# Auto Configuration Import Listeners
org.springframework.boot.autoconfigure.AutoConfigurationImportListener=\
org.springframework.boot.autoconfigure.condition.ConditionEvaluationReportAutoConfigurationImportListener

# Auto Configuration Import Filters
org.springframework.boot.autoconfigure.AutoConfigurationImportFilter=\
org.springframework.boot.autoconfigure.condition.OnClassCondition

# Auto Configure
org.springframework.boot.autoconfigure.web.servlet.HttpEncodingAutoConfiguration,\
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration,\

平时我们知道在创建springWeb的时候,我们都需要在web.xml里面配置一个filter,设置Encode编码为UTF-8,但在spring boot中我们却没有,另外如果本地启动了Redis,我们也可以不需要配置Redis,我们点进去HttpEncodingAutoConfiguration、RedisAutoConfiguration里面看看里面的配置

@Configuration
@EnableConfigurationProperties(HttpEncodingProperties.class)
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
@ConditionalOnClass(CharacterEncodingFilter.class)
@ConditionalOnProperty(prefix = "spring.http.encoding", value = "enabled", matchIfMissing = true)
public class HttpEncodingAutoConfiguration

@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration

可以看到注解上面都包含了ConditionalOnClass和EnableConfigurationProperties两个注解,我们深入ConditionalOnClass看它是如何实现的

    @Override
    public ConditionOutcome getMatchOutcome(ConditionContext context,
            AnnotatedTypeMetadata metadata) {
        ClassLoader classLoader = context.getClassLoader();
        ConditionMessage matchMessage = ConditionMessage.empty();
        List<String> onClasses = getCandidates(metadata, ConditionalOnClass.class);
        if (onClasses != null) {
            List<String> missing = getMatches(onClasses, MatchType.MISSING, classLoader);
            if (!missing.isEmpty()) {
                return ConditionOutcome
                        .noMatch(ConditionMessage.forCondition(ConditionalOnClass.class)
                                .didNotFind("required class", "required classes")
                                .items(Style.QUOTE, missing));
            }
            matchMessage = matchMessage.andCondition(ConditionalOnClass.class)
                    .found("required class", "required classes").items(Style.QUOTE,
                            getMatches(onClasses, MatchType.PRESENT, classLoader));
        }
        List<String> onMissingClasses = getCandidates(metadata,
                ConditionalOnMissingClass.class);
        if (onMissingClasses != null) {
            List<String> present = getMatches(onMissingClasses, MatchType.PRESENT,
                    classLoader);
            if (!present.isEmpty()) {
                return ConditionOutcome.noMatch(
                        ConditionMessage.forCondition(ConditionalOnMissingClass.class)
                                .found("unwanted class", "unwanted classes")
                                .items(Style.QUOTE, present));
            }
            matchMessage = matchMessage.andCondition(ConditionalOnMissingClass.class)
                    .didNotFind("unwanted class", "unwanted classes").items(Style.QUOTE,
                            getMatches(onMissingClasses, MatchType.MISSING, classLoader));
        }
        return ConditionOutcome.match(matchMessage);
    }

源码中会通过ConditionalOnClass中设置的参数,在classPath下查找是否存在,现在如果我们引入了web依赖,那么CharacterEncodingFilter这个类肯定是存在的,此时它就会通过EnableConfigurationProperties注解配置的参数使用里面的默认配置,我们点进去HttpEncodingProperties看看

@ConfigurationProperties(prefix = "spring.http.encoding")
public class HttpEncodingProperties {

    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

    /**
     * Charset of HTTP requests and responses. Added to the "Content-Type" header if not
     * set explicitly.
     */
    private Charset charset = DEFAULT_CHARSET;

果然这里面有一个参数DEFAULT_CHARSET已经默认设置为UTF-8了,这里我们也可以通过spring.http.encoding来修改它的默认编码
同样,我们看看Redis的配置源码。

@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean(name = "redisTemplate")
	public RedisTemplate<Object, Object> redisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		RedisTemplate<Object, Object> template = new RedisTemplate<>();
		template.setConnectionFactory(redisConnectionFactory);
		return template;
	}

	@Bean
	@ConditionalOnMissingBean
	public StringRedisTemplate stringRedisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		StringRedisTemplate template = new StringRedisTemplate();
		template.setConnectionFactory(redisConnectionFactory);
		return template;
	}

}

这里redis为我们自动注入了一个Bean,名字是redisTemplate,所以我们在项目中使用redisTemplate直接就可以用了,通过spring.redis可以修改默认的配置

所以可以总结spring boot自动配置的原理是:先在claspath下查找是否存在的依赖类,如果存在则触发自动配置,我们只要通过Maven添加依赖,这些依赖就会下载很多jar包到classpath中。

实现一个自动配置模块

通过上面的分析,我们发现spring的自动配置还是比较简单的,那我们也可以自己实现一个,创建一个Maven项目,添加autoconfigure依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
</dependencies>

定义一个配置文件,我们以chuanz开头,后面我们在application.properties中就可以直接以chuanz开头来配置了

@ConfigurationProperties(prefix = "chuanz")
public class ChuanzProperties {
    public static final String DEFAULT_NAME = "baichuan";
    public String name = DEFAULT_NAME;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

实现一个简单的服务类

public class ChuanzService {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

下面我们来实现最主要的自动配置类,注意配置完成后,需要将这个类添加到spring.factories,我们在src/main/resources/ META-INF/创建一个spring.factories文件,然后配置它

@Configuration
@ConditionalOnClass({ ChuanzService.class })
@EnableConfigurationProperties(ChuanzProperties.class)
public class ChuanzAutoConfiguration {

    @Autowired
    private ChuanzProperties chuanzProperties;

    @Bean
    @ConditionalOnMissingBean(ChuanzService.class)
    public ChuanzService chuanzService() {
        ChuanzService chuanzService = new ChuanzService();
        chuanzService.setName(chuanzProperties.getName());
        return chuanzService;
    }
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.chuanz.springboot.autoconfig.ChuanzAutoConfiguration

然后我们将项目打包成一个jar文件,这里我们可以把文件上传到本地私服上,然后再通过maven来引用

<dependency>
    <groupId>cn.chuanz</groupId>
    <artifactId>springboot-chuanz-autoconfig</artifactId>
    <version>0.1</version>
</dependency>

然后我们就可以在项目中通过@Autowired来注入ChuanzService使用了,通过chuanz前缀在application.properties中也可以配置name属性的值

 

网络释义
Autowired: 自动装配

Spring Boot读取配置的几种方式

Spring Boot读取配置几种方式

spring读取配置有三种方式,使用@Value,@ConfigurationProperties,@Environment,@PropertySource,除了@PropertySource不支持yml的文件读取,下面我们对这几种方式分别进行介绍

使用@Value

比如我们在application.properties或application.yml中定义了一些配置,如:dynamic.subscribeUrl,在项目中我们只需要使用@Value就可以获取到。

@Component
public class DynamicConfig {

    @Value("dynamic.subscribeUrl")
    private String subscribeUrl;

}

使用@ConfigurationProperties

上面的例子,我们也可以通过@ConfigurationProperties来实现,只需要指定一个前缀

@Component
@ConfigurationProperties(prefix = "dynamic")
public class DynamicConfig {

    private String subscribeUrl;

}

使用@Environment

@Environment可以获取所有加载的配置文件,我们只需要根据getProperty方法就可以获取

@Autowired
private Environment environment;

environment.getProperty(String key);

使用@PropertySource

我们获取config/flight.properties下所有的配置,前缀以dynamic开头的

@Component
@ConfigurationProperties(prefix = "dynamic")
@PropertySource("config/flight.properties")
public class DynamicConfig {

    private String subscribeUrl;

}

RxJava使用介绍

在说RxJava之前先说说ReactiveX。

ReactiveX 简称 Rx,全称 Reactive Extensions,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Java等几乎所有的编程语言,RxJava则是java语言的实现。

Rx介绍:

1)扩展的观察者模式:通过订阅可观测对象的序列流然后做出反应。

2)迭代器模式:对对象序列进行迭代输出从而使订阅者可以依次对其处理。

3)函数式编程思想:简化问题的解决的步骤,让你的代码更优雅和简洁

为什么说是扩展的观察者模式?

观察者模式:被观察者发出事件,然后观察者(事件源)订阅然后进行处理。

图片 1

扩展:如果没有观察者,被观察者是不会发出任何事件的。另外知道事件何时结束,还有错误通知处理

迭代器模式

提供一种方法顺序访问一个聚合对象中的各种元素,而又不暴露该对象的内部表示

《RxJava Essentials》一书做的的对比:迭代器模式在事件处理上采用的是“同步/拉式”的方式,而被观察者采用的是“异步/推式”的方式,而对观察者而言,显然后者更灵活。

图片 1

函数式编程

//线程操作模式
new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
//函数模式 (Lambda)A->B ->C->D
Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

 

RxJava核心

Observable(被观察者,也就是事件源)和Subscriber(观察者)

//被观察者
Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>(){

                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("hello world");
                        subscriber.onCompleted();
                    }
                }
        );

//观察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
          @Override
          public void onCompleted() {}

          @Override
          public void onError(Throwable e) {}

          @Override
          public void onNext(String s) {
                log.info("基础写法:"+s);
            }
      };

 myObservable.subscribe(mySubscriber);
Observable<String> myObservable = Observable.just("Hello World!");

		Action1<String> onNextAction = new Action1<String>() {

			@Override
			public void call(String s) {
				logger.info("Action1简化后:"+s);
			}
		};

		myObservable.subscribe(onNextAction);

		/* 写成匿名函数*/
		Observable.just("Hello World!").subscribe(new Action1<String>() {
			@Override
			public void call(String s) {
				logger.info("匿名函数写法:"+s);
			}
		});

		/*用Java 8 lambdas(Retrolambda)表达式*/
		Observable.just("Hello World!").subscribe(s -> logger.info("lambdas表达式写法:"+s));

 

RxJava操作符

创建操作符:Create, Defer, From, Interval, Just, Range, Repeat, Timer等。

String[] strings = {"张三","李四","王五","赵六"};
Observable.from(strings)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("name",s);
            }
        });

变换操作:Map、FlatMap、ConcatMap等

public void showUserName(String userName){
	textView.setText(userName);
}

public void showUserName(String userName){
Observable.just(userName).subscribe( new  Action1<String>(){
         @Override
         public void call(String s){
            textView.setText(s);
        }
});  
}

如果需要在显示前对这个字符串做处理,然后再展示,比如加“张三,你好”

方法1:我们可以对字符串本身操作

方法2:我们可以放到Action1.call()方法里做处理

方法3:使用操作符做变换:map

public void showUserName(String userName){
Observable.just(userName).map(new Func1<String,String>(){
          public String call(String text){
             return handleUserName(text);   
           }
   }).subscribe( new Action1<String>(){
        public void call(String s){
        }
        });
}

打印出中国的所有省份名称:flatMap()

List<Province>  provinceList = …
Observable.from(provinceList)
.flatMap(new Func1<Province,String>(){
@Override
 public String call(Province province){
            return province.getName();
      }
}).subscribe(new Action1<String>(){
         @Override
         public void call(String s){
            Log.i(“省份名称”,s)
        }
});
List<Province>  provinceList = …
Observable.from(provinceList)
.subscribe(new Action1<Province>(){
         @Override
         public void call(Province province){
            List<City> cities = province.getCities();
            for (int i = 0; i < cities.size(); i++) {
                   City city = cities.get(i);
                   Log.i(“城市”, city.getName());
            }        
          }
});

显然第一种比第二种看着更简洁,清晰

异步

调度器Scheduler:

0b350c8350142aff

操作符:

subscribeOn():指定回调发生的线程,事件消费的线程,可以执行多次!

observeOn():订阅事件发生的线程,事件产生的线程,只允许执行一次。

其他操作符

01E17C8B-D4F3-4D92-859D-D36D72D5A07C

Debounce:“去抖”,只有在空闲了一段时间后才发射数据,过滤掉发射速率过快的数据项

Sample:“采样”,定期发射Observable最近发射的数据项

F77989EF-B666-47B5-9FCB-901E68BE64C3

String[] numbers = {"11", "2", "2", "13", "4", "5", "7"}
Observable
  .from(numbers)
  .map(s -> Integer.parseInt(s))
  .filter(s -> s < 10)
  .distinct()
  .takeLast(3)
  .reduce((number1, number2) -> 
     number1 + number2)
  )
  .subscribe(i -> System.out.println(i));//16

 

流式处理的优势

如需要从多个数据源获取数据内存、磁盘、网络依次获取。

Observable<String> memory= bservable.just("memory"); 
Observable<String> disk= Observable.just("disk");  
Observable<String> network=Observable.just("network");  

//依次检查memory、disk、network  
Observable.concat(memory, disk, network)  
.first()  
.subscribeOn(Schedulers.newThread())  
.subscribe(s -> {  
    memoryCache = "memory";  
    System.out.println("--------------subscribe: " + s);  
});

参考资料:

https://gank.io/post/560e15be2dca930e00da1083

http://www.jianshu.com/p/e0891032ee4d