分布式一致性协议

分布式一致性协议

在分布式系统中为了解决数据的一致性,主要有二阶段提交协议、三阶段提交协议、Poxos算法、TTC协议、Rraft协议、ZAB协议(zookeeper的协议),下面就分别介绍这几种协议

二阶段提交协议

二阶段提交协议主要思想是:有一个协调者,多个参与者,协调者负责发送命令给参与者,确保数据能同时更新到所有节点上,主要步骤如下:

Two-phase_commit

1)协调者将事务的请求发送给所有参与者,询问是否可以提交事务,参与者锁住自己的资源,并写undo/redo日志,如果参与者都准备成功,则向协调者回应“可以提交”

2)协调者所有参与者都会有“可以提交”,此时向参与者发送“正式提交”命令,所有参与者开始提交自己的事务

如上述1)2)有任何不成功,所有参与者都将回滚自己的事务

二阶段提交原理比较简单,实现起来也比较方便,但问题也比较明显,如:

1,同步堵塞:所有参与者在没有收到协调者发送过来的“正式提交”命令,都将锁住资源,其他线程如果要获取资源只能等待

2,单点问题:所有参与者都依靠一个协调者,如果协调者在2)操作突然失去联系,这个时候所有的参与者都不知道如何处理,是否提交事务

3,数据不一致性:在2)操作中,可能由于网络原因,有些节点收到提交命令,有些没有,从而照成数据不一致的问题

三阶段提交协议

Three-phase_commit_diagram

三阶段相比二阶段的思想是在操作1)中多了一步,首先询问:是否可以锁资源,如果所有人同意了才开始锁资源,后面的步骤就童二阶段提交了,它相比二阶段协议,解决了如果协调者突然失去联系,参与者仍可以提交他们的事务

但三阶段实现起来还是比较复杂,而且由于参与者最后还是会提交自己的事务,也会造成数据不一致行

TTC协议

为了减少资源被堵塞的时间,产生了TTC协议,可以这样理解TTC是针对SOA服务的锁,2PC是针对数据库的锁

比如我们需要从A账户转100到B账户,必然会涉及到如下几个步骤 1,读取A账户金额,-100 2,读取B账户金额,+100

2PC就会先锁住A账户和B账号的数据,那么任何其它线程想要读取这个账号数据,都将堵塞

TCC则先会从A账号-100元,这时线程也可以读取A账号的数据了,如果转入B账号发生失败,那么就会调用回滚接口,再将A账号+100元,这实质是一种补偿机制

Poxos算法

理解Poxos算法,先弄清楚几个角色

1,proposers 投票人,可以理解为收集人民意见的人

2,acceptors 接受投票者,可以理解为人大代表

3,learners 学习者,可以理解为记录员,将处理的结果记录下来

608A9B29BB2FE2E9787DDB3A3876268E

你可以这样理解,首先投票人接收到人民的意见,就开始提出一个法案,并把这个法案告诉人大代表,说现在人民提了一个法案,我们来开始讨论这个法案吧,人大代表如果同意了(这里的同意指的是有半数以上的人同意了),那么就开始讨论这个法案,如果人大代表都同意(半数以上人同意)通过这个法案,那么大家就会确定下来,交给记录人员将这个法案记录下来,并告诉人民,现在已经确定了一个新法案

poxos算法也有一个问题,即如果有两个投票人接到了人民的法案,我们现在假设为投票人P1和投票人P2,他们同时接受到两个法案A1,A2,A2的版本要大于A1的版本,这里说明下人大代表会接受法案版本大的那个,如果有下面这种情况 1)投票人P1发送了法案A1,并且人大代表都接受到了这个法案,所以决定开始讨论这个法案,并告诉所有人

2)投票人P2发送了法案A2,人大代表又接收到了法案A2,发现A2的版本大于A1的,所以放弃了A1,开始讨论A2的

3)投票者P1发现自己的法案被放弃了,就又会提出一个新的法案A3,A3的版本大于A2,并会发给人大代表,人大代表又会放弃A2,开始讨论A3

4) 投票者P2发现自己的法案被放弃了,就又会提出一个新的法案A4,A4的版本大于A3,并会发给人大代表,人大代表又会放弃A3,开始讨论A4

这样依次反复,就会出现不断更换法案的问题,所以我们就需要只有一个投票人来发送法案,如果投票人退休了,就再新选一个投票人,下面我们来讲讲ZAB协议

ZAB协议

角色定义:

1,leader,主节点,对应上面的投票人,只能有一个

2,follower,从节点,对应上面的人大代表,必须为多个

685547DB334887C0A05B4A68827CBBC1

这里有一个leader,所以最开始必须有一个选举的过程,确定谁来当leader, 所有节点都向各个节点发送一条消息,表示我要当leader,这里由于节点发送的时间不同,每个命令发给其他节点的时间也不同,每个节点最开始收到请求后,都会同意这个命令,如果某个节点收到半数以上的节点回应“同意”,那么它就会将自己设置为leader,并告诉其它节点,现在我是leader,你们需要听从我的,其它节点就会把自己设置为follower

在zookeeper中,最开始的选举是会将myid最大的节点设为leader节点,之后会根据节点zxid,来确定谁来当下一个leader节点,当确定leader后,具体处理流程如下:

1)收到客户端的一个事务请求,可能是leader或follower,follower收到请求后也会转发给leader

2)leader收到follower的事务请求,开始把事务处理请求发给所有follower,follower收到事务请求开始锁资源,处理,将redo/undo写入日志,如果执行成功告诉leader

3)leader收到半数以上的follower回应成功,则再次发送一个命令给follower,说可以提交事务了

4)follower接受到命令,提交自己的事务,并将结果返给leader

5)leader将结果发给客户端

Raft协议

Raft协议同ZAB协议,只不过ZAB是follower节点向主节点发送心跳,确保主节点是否存活,而Raft是Leader节点向follower节点发送心跳,以下这个动画很生动的讲解了分布式系统下一致性的保证:

http://thesecretlivesofdata.com/raft/

你也可以模拟分布式系统不同场景各节点的情况:

https://raft.github.io/

参考文章:

https://coolshell.cn/articles/10910.html 参考书籍

《从POXOS到Zookeeper分布式原理一致性与实践》

SpringCloud链路追踪

Spring Cloud链路追踪

接着上一篇的文章,今天讲讲spring cloud在分布式系统中的链路跟踪,主要使用的是zipkin框架实现的,上篇文章写道了有一个注册中心Eureka,和两个服务方,一个消费方,我们的消费方也可以做了一个服务,注册到Eureka中,所以我们对消费方也添加EurekaClient和zipkin的maven依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

启动类添加@EurekaClient注解,同样服务方也要添加zipkin的maven依赖

zipkin介绍

Zipkin 是一个开放源代码分布式的跟踪系统,由Twitter公司开源,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现,架构如下:

5ec1521a-26b4-11e7-9679-8c429afdbe0c

每个服务向zipkin报告计时数据,zipkin会根据调用关系通过Zipkin UI生成依赖关系图,显示了多少跟踪请求通过每个服务,该系统让开发者可通过一个 Web 前端轻松的收集和分析数据,例如用户每次请求服务的处理时间等,可方便的监测系统中存在的瓶颈。

Zipkin提供了可插拔数据存储方式:In-Memory、MySql、Cassandra以及Elasticsearch。Zipkin默认是使用http+内存传输和收集,在并发量比较大会影响效率,下面我们我们通过Kafka+ElasticSearch实现服务的传输与收集

创建ZipKin服务

新建一个模块,我们称为zipkinserver,添加下面的依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
   <groupId>io.zipkin.java</groupId>
   <artifactId>zipkin-server</artifactId>
</dependency>
<dependency>
   <groupId>io.zipkin.java</groupId>
   <artifactId>zipkin-autoconfigure-ui</artifactId>
</dependency>

在启动类,添加如下注解:

@SpringBootApplication
@EnableEurekaClient
@EnableZipkinServer
public class ZipkinServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ZipkinServerApplication, args);
    }

}

修改application.yml配置文件,添加kafka收集和ElasticSearch存储,

zipkin:
  storage:
    type: elasticsearch
    elasticsearch:
      hosts: localhost:9300
      index: zipkin

  collector:
    kafka:
      zookeeper: localhost:2181
      topic: zipkin
      groupId: zipkin

然后启动服务,zipkin的默认端口是9494,访问地址:http://localhost:9494

修改服务方和消费方的application.yml,添加zipkin的地址,kafka收集地址

spring: 
  zipkin:
    base-url: http://localhost:9411
    kafka:
      topic: zipkin
  kafka:
    bootstrap-servers: localhost:9092

  sleuth:
    sampler:
      percentage: 1.0

zipkin只有在接口调用后,才会产生数据的调用情况,所以我们先访问消费方的接口,然后再打开zipkin的界面,可以看到dynamic-service和feign的调用关系及耗时情况

31DADE5B71CF7F9EE33D80AE6B097E57 64043E75E8489933DFB3E2FA03A5AF9A

SpringCloud服务注册与发现

Spring Cloud服务注册与发现

Spring Cloud集成了搭建分布式服务一系列框架,如服务注册与发现Eureka,熔断器Hystrix,路由网关Zuul,链路追踪zipkin,今天主要讲解Eureka的使用。

Eureka是什么?

Eureka是Netflix开源的一款提供服务注册和发现的产品,它提供了完整的Service Registry和Service Discovery实现。也是springcloud体系中最重要最核心的组件之一,我们通过下面这样图就可以了解

48EB8D2E311BF36563200EF5B0015EB6

1)服务提供方向Eureka注册自己的服务,

2)消费者向Eureka获取自己需要的服务,和提供方建立连接

3) 如果服务方出现故障,Eureka会自动将服务方从注册列表中删除

搭建项目

创建Eureka服务

首先创建一个Maven项目,指定spring boot,spring cloud 版本

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Finchley.SR2</spring-cloud.version>
</properties>

创建一个模块,我们称为EurekaServer,使用Eureka只需要引入maven包,然后启动项目就可以了,很方面,如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>

配置application.yml文件

server:
  port: 8081

eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

spring:
  application:
    name: eurka-server

添加注解@EnableEurekaServer,并启动EurekaServer

@SpringBootApplication
@EnableEurekaServer
public class EurakaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurakaServerApplication.class, args);
    }
}

启动EurekaServer,地址为:http://localhost:8081/eureka

创建提供方服务

添加maven依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

创建服务接口

@RestController
public class AirportController {

    @Autowired
    private AirportService airportService;

    @RequestMapping("/getAirport")
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode) {
        return airportService.getAirport(threeCode);
    }

}

@Service
public class AirportService {

    @Value("${server.port}")
    private int port;

    public AirportBean getAirport(String threeCode) {
        AirportBean bean = new AirportBean();
        bean.setName("北京首都国际机场");
        bean.setThreeCode(threeCode);
        bean.setPort(port);
        return bean;
    }

}

public class AirportBean {

    private String threeCode;
    private String name;
    private int port;
}

修改application.yml文件

<code>server:
  port: 8082

spring:
  application:
    name: dynamic-service

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8081/eureka/</code>

添加@EnableEurekaClient注解,这里我们为了方便演示负载均衡,同时也启动了两个实例,端口分别为8082,8083

@SpringBootApplication
@EnableEurekaClient
public class DynamicServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(DynamicServiceApplication.class, args);
    }
}

创建服务消费方

我们再项目下再新建一个模块,称为springcloudclient,添加maven依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

这里我们使用了feign的服务调用方式,Spring cloud有两种服务调用方式,一种是ribbon+restTemplate,另一种是feign,ribbon类似一种rest风格的API调用方式,而feign整合了ribbon,具有负载均衡的能力,通过注解的方式,使代码看起来更加简洁,另外feign整合了Hystrix,具有熔断的能力

调用服务方的接口

@RestController
public class AirportFeignController {

    @Autowired
    private AirportFeignService airportFeignService;

    @RequestMapping(value = "/getAirport",method = RequestMethod.GET)
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode) {
        return airportFeignService.getAirport(threeCode);
    }

}

@FeignClient(value = "dynamic-service", fallback = AirportFeignFallbackService.class)
public interface AirportFeignService {

    @RequestMapping(value = "/getAirport",method = RequestMethod.GET)
    public AirportBean getAirport(@RequestParam("threeCode") String threeCode);

}

// 服务失败后熔断,调用的方法
public class AirportFeignFallbackService implements AirportFeignService {
    @Override
    public AirportBean getAirport(String threeCode) {
        return null;
    }
}

public class AirportBean {
    private String threeCode;
    private String name;
    private int port;
}

配置application.yml文件

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8084
spring:
  application:
    name: service-feign

添加@ EnableEurekaClient,@EnableDiscoveryClient, @EnableFeignClients注解,端口为8084,

@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients
public class SpringCloudServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudServerApplication.class, args);
    }
}

好了下面可以演示springcloud的服务注册与发现了,通过上面的例子,我们启动了Eureka服务,分别为:8081,同时启动了两个服务提供方,注册到Eureka中,端口分别为8082和8083,接着我们启动了一个服务消费方,端口为8084,我们分别启动他们
打开Eureka的服务页面:http://localhost:8081

55AD7F4965A098E135257B0B04BBF3B6

可以发现有两个服务方已经注册上了,我们调用消费方的接口,发现消费方会使用负载均衡的方式分别访问服务方

 

有道词典

org.springframe …

详细X

  org.springframework.boot   spring-boot-starter-parent   2.1.1.RELEASE      utf – 8   utf – 8   1.8   Finchley.SR2

Spring Boot整合Mybatis配置多数据源

Spring Boot整合Mybatis配置多数据源

mybatis是目前比较流行的的orm框架了,在spring中,我们知道只需要通过构造器的方式在dao层注入slaveSqlSession就可以指定对应的数据库了,那么Spring Boot是怎么实现的呢?

我们现在有两个库,一个Dynamic,一个Moment,那么我们分别配置两个数据源

@Configuration
@MapperScan(basePackages = "com.huoli.trip.dao.dynamic", sqlSessionTemplateRef = "dynamicSqlSessionTemplate")
public class DynamicDataSourceConfig {
    //配置数据源
    @Bean(name = "dynamicDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mysql.dynamic")
    public DataSource dynamicDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "dynamicSqlSessionFactory")
    public SqlSessionFactory dynamicSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setTypeAliasesPackage("com.huoli.trip.entity.dynamic");
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mybatis/dynamic/*.xml"));
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

     //配置事务
    @Bean(name = "dynamicTransactionManger")
    public DataSourceTransactionManager dynamicTransactionManger(@Qualifier("dynamicDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "dynamicSqlSessionTemplate")
    public SqlSessionTemplate dynamicSqlSessionTemplate(@Qualifier("dynamicSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
@Configuration
@MapperScan(basePackages = "com.huoli.trip.dao.moment", sqlSessionTemplateRef = "momentSqlSessionTemplate")
public class MomentDataSourceConfig {
    private static final Logger logger = LoggerFactory.getLogger(MomentDataSourceConfig.class);

    //配置数据源
    @Bean(name = "momentDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mysql.moment")
    @Primary
    public DataSource momentDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "momentSqlSessionFactory")
    @Primary
    public SqlSessionFactory momentSqlSessionFactory(@Qualifier("momentDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setTypeAliasesPackage("com.huoli.trip.entity.moment");
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mybatis/moment/*.xml"));
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

     //配置事务
    @Bean(name = "momentTransactionManger")
    @Primary
    public DataSourceTransactionManager momentTransactionManger(@Qualifier("momentDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "momentSqlSessionTemplate")
    @Primary
    public SqlSessionTemplate momentSqlSessionTemplate(@Qualifier("momentSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}

这里我们看到配置模板、事务、数据源都相同,处理配置数据库工厂的时候指定的setMapperLocations不同,另外通过@MapperScan注解扫描不同包下面的Mapper文件并指定sqlSessionTemplateRef对应的数据库模板。 Dao及xml对应的文件

@Mapper
public interface AirportInfoMapper {

    List<Airport> selectAll();

    String findName(String airportCode);
}
public interface MomentMapper {
    int deleteByPrimaryKey(Integer id);

    int insert(Moment record);

    MomentVo selectByPrimaryKey(@Param("id") Integer id);
}

xml就不贴了,和spring里面的配置一样,写对应的方法及SQL语句,另外在application.yml中配置需要的数据源

spring:
  datasource:
    mysql:
      moment:
        driverClassName: com.mysql.jdbc.Driver
        jdbcUrl: jdbc:mysql://host:port/moment?useUnicode=true&characterEncoding=utf-8
        username: xxx
        password: xxxx
        type: com.zaxxer.hikari.HikariDataSource
      dynamic:
        driverClassName: com.mysql.jdbc.Driver
        jdbcUrl: jdbc:mysql://host:port/dynamic?useUnicode=true&characterEncoding=utf-8
        username: xxx
        password: xxxx
        type: com.zaxxer.hikari.HikariDataSource

在service层就可以通过AirportInfoMapper和MomentMapper访问不同的数据库了

 

Spring Boot自动配置原理

Spring Boot自动化配置原理

Spring Boot提供了很多”开箱即用“的依赖模块,这些模块已经帮我们做了自动配置,这些依赖模块都是以spring-boot-starter-xx作为命名的,比如spring-boot-starter-redis、spring-boot-starter-data-mongodb ,Spring Boot关于自动配置的源码在spring-boot-autoconfigure.jar内 在说spring的自动配置,我们先看看Spring Boot的运作原理,我们都知道,在Spring Boot的启动类上都会添加@SpringBootApplication注解,我们点击进去看看里面的源码,自动配置的核心注解是@EnableAutoConfiguration

 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = {
        @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
        @Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
public @interface SpringBootApplication 

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import(AutoConfigurationImportSelector.class)
public @interface EnableAutoConfiguration

这里的关键功能时@Import注解到的配置功能,AutoConfigurationImportSelector使用SpringFactoriesLoader.loadFacotryNames方法来扫描spring-boot-autoconfigure.jar包里面spring.factories文件,此文件声明了哪些类可以自动注入

# Initializers
org.springframework.context.ApplicationContextInitializer=\
org.springframework.boot.autoconfigure.SharedMetadataReaderFactoryContextInitializer,\
org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLoggingListener

# Application Listeners
org.springframework.context.ApplicationListener=\
org.springframework.boot.autoconfigure.BackgroundPreinitializer

# Auto Configuration Import Listeners
org.springframework.boot.autoconfigure.AutoConfigurationImportListener=\
org.springframework.boot.autoconfigure.condition.ConditionEvaluationReportAutoConfigurationImportListener

# Auto Configuration Import Filters
org.springframework.boot.autoconfigure.AutoConfigurationImportFilter=\
org.springframework.boot.autoconfigure.condition.OnClassCondition

# Auto Configure
org.springframework.boot.autoconfigure.web.servlet.HttpEncodingAutoConfiguration,\
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration,\

平时我们知道在创建springWeb的时候,我们都需要在web.xml里面配置一个filter,设置Encode编码为UTF-8,但在spring boot中我们却没有,另外如果本地启动了Redis,我们也可以不需要配置Redis,我们点进去HttpEncodingAutoConfiguration、RedisAutoConfiguration里面看看里面的配置

@Configuration
@EnableConfigurationProperties(HttpEncodingProperties.class)
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
@ConditionalOnClass(CharacterEncodingFilter.class)
@ConditionalOnProperty(prefix = "spring.http.encoding", value = "enabled", matchIfMissing = true)
public class HttpEncodingAutoConfiguration

@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration

可以看到注解上面都包含了ConditionalOnClass和EnableConfigurationProperties两个注解,我们深入ConditionalOnClass看它是如何实现的

    @Override
    public ConditionOutcome getMatchOutcome(ConditionContext context,
            AnnotatedTypeMetadata metadata) {
        ClassLoader classLoader = context.getClassLoader();
        ConditionMessage matchMessage = ConditionMessage.empty();
        List<String> onClasses = getCandidates(metadata, ConditionalOnClass.class);
        if (onClasses != null) {
            List<String> missing = getMatches(onClasses, MatchType.MISSING, classLoader);
            if (!missing.isEmpty()) {
                return ConditionOutcome
                        .noMatch(ConditionMessage.forCondition(ConditionalOnClass.class)
                                .didNotFind("required class", "required classes")
                                .items(Style.QUOTE, missing));
            }
            matchMessage = matchMessage.andCondition(ConditionalOnClass.class)
                    .found("required class", "required classes").items(Style.QUOTE,
                            getMatches(onClasses, MatchType.PRESENT, classLoader));
        }
        List<String> onMissingClasses = getCandidates(metadata,
                ConditionalOnMissingClass.class);
        if (onMissingClasses != null) {
            List<String> present = getMatches(onMissingClasses, MatchType.PRESENT,
                    classLoader);
            if (!present.isEmpty()) {
                return ConditionOutcome.noMatch(
                        ConditionMessage.forCondition(ConditionalOnMissingClass.class)
                                .found("unwanted class", "unwanted classes")
                                .items(Style.QUOTE, present));
            }
            matchMessage = matchMessage.andCondition(ConditionalOnMissingClass.class)
                    .didNotFind("unwanted class", "unwanted classes").items(Style.QUOTE,
                            getMatches(onMissingClasses, MatchType.MISSING, classLoader));
        }
        return ConditionOutcome.match(matchMessage);
    }

源码中会通过ConditionalOnClass中设置的参数,在classPath下查找是否存在,现在如果我们引入了web依赖,那么CharacterEncodingFilter这个类肯定是存在的,此时它就会通过EnableConfigurationProperties注解配置的参数使用里面的默认配置,我们点进去HttpEncodingProperties看看

@ConfigurationProperties(prefix = "spring.http.encoding")
public class HttpEncodingProperties {

    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

    /**
     * Charset of HTTP requests and responses. Added to the "Content-Type" header if not
     * set explicitly.
     */
    private Charset charset = DEFAULT_CHARSET;

果然这里面有一个参数DEFAULT_CHARSET已经默认设置为UTF-8了,这里我们也可以通过spring.http.encoding来修改它的默认编码
同样,我们看看Redis的配置源码。

@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean(name = "redisTemplate")
	public RedisTemplate<Object, Object> redisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		RedisTemplate<Object, Object> template = new RedisTemplate<>();
		template.setConnectionFactory(redisConnectionFactory);
		return template;
	}

	@Bean
	@ConditionalOnMissingBean
	public StringRedisTemplate stringRedisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		StringRedisTemplate template = new StringRedisTemplate();
		template.setConnectionFactory(redisConnectionFactory);
		return template;
	}

}

这里redis为我们自动注入了一个Bean,名字是redisTemplate,所以我们在项目中使用redisTemplate直接就可以用了,通过spring.redis可以修改默认的配置

所以可以总结spring boot自动配置的原理是:先在claspath下查找是否存在的依赖类,如果存在则触发自动配置,我们只要通过Maven添加依赖,这些依赖就会下载很多jar包到classpath中。

实现一个自动配置模块

通过上面的分析,我们发现spring的自动配置还是比较简单的,那我们也可以自己实现一个,创建一个Maven项目,添加autoconfigure依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
</dependencies>

定义一个配置文件,我们以chuanz开头,后面我们在application.properties中就可以直接以chuanz开头来配置了

@ConfigurationProperties(prefix = "chuanz")
public class ChuanzProperties {
    public static final String DEFAULT_NAME = "baichuan";
    public String name = DEFAULT_NAME;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

实现一个简单的服务类

public class ChuanzService {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

下面我们来实现最主要的自动配置类,注意配置完成后,需要将这个类添加到spring.factories,我们在src/main/resources/ META-INF/创建一个spring.factories文件,然后配置它

@Configuration
@ConditionalOnClass({ ChuanzService.class })
@EnableConfigurationProperties(ChuanzProperties.class)
public class ChuanzAutoConfiguration {

    @Autowired
    private ChuanzProperties chuanzProperties;

    @Bean
    @ConditionalOnMissingBean(ChuanzService.class)
    public ChuanzService chuanzService() {
        ChuanzService chuanzService = new ChuanzService();
        chuanzService.setName(chuanzProperties.getName());
        return chuanzService;
    }
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.chuanz.springboot.autoconfig.ChuanzAutoConfiguration

然后我们将项目打包成一个jar文件,这里我们可以把文件上传到本地私服上,然后再通过maven来引用

<dependency>
    <groupId>cn.chuanz</groupId>
    <artifactId>springboot-chuanz-autoconfig</artifactId>
    <version>0.1</version>
</dependency>

然后我们就可以在项目中通过@Autowired来注入ChuanzService使用了,通过chuanz前缀在application.properties中也可以配置name属性的值

 

网络释义
Autowired: 自动装配

Spring Boot读取配置的几种方式

Spring Boot读取配置几种方式

spring读取配置有三种方式,使用@Value,@ConfigurationProperties,@Environment,@PropertySource,除了@PropertySource不支持yml的文件读取,下面我们对这几种方式分别进行介绍

使用@Value

比如我们在application.properties或application.yml中定义了一些配置,如:dynamic.subscribeUrl,在项目中我们只需要使用@Value就可以获取到。

@Component
public class DynamicConfig {

    @Value("dynamic.subscribeUrl")
    private String subscribeUrl;

}

使用@ConfigurationProperties

上面的例子,我们也可以通过@ConfigurationProperties来实现,只需要指定一个前缀

@Component
@ConfigurationProperties(prefix = "dynamic")
public class DynamicConfig {

    private String subscribeUrl;

}

使用@Environment

@Environment可以获取所有加载的配置文件,我们只需要根据getProperty方法就可以获取

@Autowired
private Environment environment;

environment.getProperty(String key);

使用@PropertySource

我们获取config/flight.properties下所有的配置,前缀以dynamic开头的

@Component
@ConfigurationProperties(prefix = "dynamic")
@PropertySource("config/flight.properties")
public class DynamicConfig {

    private String subscribeUrl;

}

jenkins使用

jenkins使用

说明:jenkins是一个能实现项目自动化部署管理的工具

准备工作

下载软件:https://jenkins.io/download/ 下载war包就可以了,在不同的平台上都可以通用

安装git、maven工具

git1.7.1在和Jenkins使用的时候会出现问题,这里建议是用1.7.1以上的 由于centos6自带的git就是1.7.1的,所以我是编译安装的,安装步骤如下:

1)安装依赖工具

<code>yum install curl-devel expat-devel gettext-devel openssl-devel zlib-devel</code>

2)下载源代码,我用的是2.7.4 https://mirrors.edge.kernel.org/pub/software/scm/git/

<code>cd git-2.7.4
make prefix=/usr/local/git all
make prefix=/usr/local/git install
vim /etc/profile
// 在末尾新开一行填写下面的代码
export PATH=$PATH:/usr/local/git/bin
// :wq保存退出,然后执行下面的命令,让其生效
source /etc/profile
// 查看git版本
git --version</code>

maven安装 http://maven.apache.org/download.cgi 我下载最新版3.5.3 将文件放在/usr/local/下 配置环境变量:

<code>MAVEN_HOME=/usr/local/apache-maven-3.5.3
export MAVEN_HOME
export PATH=${MAVEN_HOME}/bin</code>

运行jenkins并创建项目

启动:在服务器上运行: nohup java -jar jenkins.war &
控制台会打印出初始化的用户名和密码 至此就在浏览器打开页面,输入账号就可以登录到管理界面,jenkins会自动扫描服务器上的安装的maven、git服务

新建一个项目:
newjenkins
选择项目源码位置,这里使用gitlab来管理源码,下面是登陆到gitlab的账号
newjenkins1

因为我使用的是maven项目,填写maven构建的执行命令,我这里是打包成一个war包
newjenkins2

添加构建后的操作,比如上传到war包到服务器,然后备份原来的项目,重启tomcat,这里面需要配合shell脚本来完成
newjenkins3

shell脚本,如下:基本的逻辑是先将原来的项目压缩,移动到备份目录,然后将修改后的项目上传到服务器对应的目录,并重启tomcat

<code>WORK_DIR=`pwd`
TOMCAT_DIR=$WORK_DIR/tomcat-dynamic-api
NOW_TIME=`date +%Y%m%d%H%M%S`
tar -jcvf $TOMCAT_DIR/webapps/dynamicAPI.tar.$NOW_TIME $TOMCAT_DIR/webapps/dynamicAPI
mv $TOMCAT_DIR/webapps/dynamicAPI.tar.$NOW_TIME $TOMCAT_DIR/backup/
mv $TOMCAT_DIR/backup/dynamicAPI.war $TOMCAT_DIR/webapps/
rm -rf $TOMCAT_DIR/webapps/dynamicAPI
kill -9 `ps aux | grep tomcat-dynamic-api | grep -v grep | awk '{print $2}'` &amp;&amp; $TOMCAT_DIR/bin/startup.sh</code>

maven配置

   
    <profiles>
        <profile>
            <id>local</id>
            <properties>
                <env>local</env>
            </properties>
        </profile>
        <profile>
            <id>product156</id>
            <!--
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
             -->
            <properties>
                <env>product_156</env>
            </properties>
        </profile>
        <profile>
            <id>product169</id>
            <properties>
                <env>product_169</env>
            </properties>
        </profile>
        <profile>
            <id>backup148</id>
            <properties>
                <env>backup_148</env>
            </properties>
            <build>
                <resources></resources>
            </build>
        </profile>

        <profile>
            <id>test157</id>
            <properties>
                <env>test_157</env>
            </properties>
        </profile>
        <profile>
            <id>test169</id>
            <properties>
                <env>test_169</env>
            </properties>
        </profile>

    </profiles>
    <!-- 配置文件 -->

    <build>
        <finalName>dynamicAPI</finalName>
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.8</version>
            </extension>
        </extensions>
        <resources>
            <resource>
                <directory>src/main/resources/${env}</directory>
            </resource>
        </resources>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <testResources>
            <testResource>
                <directory>src/test/resources</directory>
            </testResource>
        </testResources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <configuration>
                    <webResources>
                        <resource>
                            <directory>WebContent</directory>
                        </resource>
                        <resource>
                            <directory>libs/</directory>
                            <targetPath>WEB-INF/lib</targetPath>
                            <includes>
                                <include>**/*.jar</include>
                            </includes>
                        </resource>
                    </webResources>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.16</version>
                <configuration>
                    <skipTests>true</skipTests>
                    <junitArtifactName>junit:junit</junitArtifactName>
                    <argLine>-Dfile.encoding=UTF-8</argLine>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <excludes>
                        <exclude>product_156/**</exclude>
                        <exclude>product_169/**</exclude>
                        <exclude>test_157/**</exclude>
                        <exclude>*.properties</exclude>
                        <exclude>*.xml</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.6</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.9</version>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>versions-maven-plugin</artifactId>
                <version>2.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.8</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

kafka和zookeeper集群安装

kafka和zookeeper集群安装

上期讲了kafka的作用及应用场景,今天我们来自己搭建一套kafka集群,由于kafka目前的安装包已经自带了zookeeper,所以在搭建zookeeper集群直接使用它内置的即可

准备工作:3台服务器,当然你可以使用自己的虚拟机尝试,如果安装虚拟机,可以查看我的这篇博客
我的三台服务器的IP分别是:192.168.101.116、192.168.101.115、192.168.102.215

安装步骤

下载安装包:

地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0

配置zookeeper

配置kafka的路径、心跳检测时间 初始化连接时follower和leader最长连接时间 follower和leader数据最长同步时间

 dataDir=/data/kafka/zookeeper
 tickTime=2000
 initLimit=5
 syncLimit=2
 server.0=192.168.101.116:2888:3888
 server.1=192.168.101.115:2888:3888
 server.2=192.168.102.215:2888:3888

在每个服务器上注册zookeeper

/data/kafka/zookeeper目录下面touch myid文件
192.168.101.116上执行
echo "0" > /data/kafka/zookeeper/myid
192.168.101.115上执行
echo "1" > /data/kafka/zookeeper/myid
192.168.102.215上执行
echo "2" > /data/kafka/zookeeper/myid

配置kafka

 192.168.101.116上配置
 advertised.listeners=PLAINTEXT://zc1:9092
 broker.id=0

 192.168.101.115上配置
 advertised.listeners=PLAINTEXT://zc2:9092
 broker.id=1

 192.168.102.215上配置
 advertised.listeners=PLAINTEXT://zc3:9092
 broker.id=2

 通用配置
 log.dirs=/data/kafka/kafka-logs
 zookeeper.connect=192.168.101.116:2181,192.168.101.115:2181,192.168.102.215:2181

配置hosts

在每个服务器/etc/hosts中添加如下配置

192.168.101.116 zc1
192.168.101.115 zc2
192.168.102.215 zc3

开放防火墙端口

由于zookeeper和kafka监听了2181、9092、2888、3888,所有我们还要把这些端口添加到防火墙中,编辑/etc/sysconfig/iptables,添加:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT

启动zookeeper和kafka

先启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

再启动kafka

bin/kafka-server-start.sh config/server.properties &

测试数据写入和读取

使用命令测试

生产者
[root@zc1 kafka_2.11-1.0.0]# bin/kafka-console-producer.sh –broker-list 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

>hello
>word

消费者
[root@zc2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh –bootstrap-server 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

hello
word

代码测试

生产者

public class ProducerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Producer&lt;String, String&gt; producer;
    private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    public ProducerTest() {
        /** 生产者*/
        InputStream in_proc = this.getClass().getClassLoader().getResourceAsStream("kafka_test_producer.properties");
        Properties prop_proc = new Properties();
        try {
            prop_proc.load(in_proc);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_producer配置文件失败", e);
        }
        producer = new KafkaProducer&lt;String, String&gt;(prop_proc);
    }

    public void execute() {
        while(true) {
            try {
                String key = "CA1234";
                String message = System.currentTimeMillis()+" CA1234,PEK,SHA,2018-02-01";
                this.sendToTestServer(key, message);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    private void sendToTestServer(String key, String message) {
        logger.info("发送消息:"+message);
        ProducerRecord&lt;String, String&gt; producerRecord = new ProducerRecord&lt;String, String&gt;(TOPIC_NAME, key, message);
        producer.send(producerRecord);
    }

    public static void main(String[] args) {
        new ProducerTest().execute();
    }

}

kafka_test_producer.properties

kafka.topic=firsttopic
group.id=chuanzh
bootstrap.servers=192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092
retries=5
request.timeout.ms=10000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

消费者

public class ConsumerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Consumer&lt;String, String&gt; consumer;
    private final static Logger logger = LoggerFactory.getLogger(DynamicPushTestTask.class);

    public ConsumerTest() {
        /** 消费者*/
        InputStream ksin = this.getClass().getClassLoader().getResourceAsStream("kafka_test_consumer.properties");
        Properties props = new Properties();
        try {
            props.load(ksin);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_consumer配置文件失败", e);
        }
        consumer = new KafkaConsumer&lt;String, String&gt;(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
    }

    public void execute() {
        while(true) {
            try {
                ConsumerRecords&lt;String, String&gt; records = consumer.poll(2);
                logger.info("读取kafka,取到消息数量:" + records.count());
                for (ConsumerRecord&lt;String, String&gt; record : records) {
                    logger.info("value:{}", record.value());
                    logger.info("offset:{}", record.offset());
                }

                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    public static void main(String[] args) {
        new ConsumerTest().execute();
    }

}

kafka_test_consumer.properties

kafka.topic=flightcore
group.id=flightPacketPush
bootstrap.servers=43.241.208.208:9092,43.241.228.39:9092,43.241.234.89:9092
metadata.max.age.ms=60000
max.poll.records=50
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

 

Kafka使用介绍

Kafka使用介绍

kafka是什么?

kafka是一个分布式消息数据流平台,为处理实时数据提供一个统一、高吞吐、低延迟的平台。其本质是一个提供大规模分布式发布/订阅的消息队列,此外kafka还可以通过kafka connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams–一个Java的流式处理库。

它的2个大的应用: 1,构建实时的流数据管道,确保系统和应用之间数据传输的可靠 2,对应用之间的数据流进行转换和反应。

基本概念

  • Topic kafka将消息分门别类,每一类消息称之为一个主题(Topic) 每个消息(也叫记录recode)是由一个key,一个value和时间戳构成
  • Producer 发布消息的对象称之为主题生产者。
  • Consumer 订阅消息的对象称之为主题消费者
  • Broker 已发布的消息保存在一组服务器中,称之为Kafka集群,集群中每一个服务器都是一个代理(Broker),消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些发布的消息。

kafka的核心API

  • Producer API:允许应用发布一个或多个kafka主题
  • Consumer API:允许应用订阅一个或多个kafka主题
  • Streams API:转换数据流,将一个或多个主题的数据流转换为一个或多个主题的数据流输出
  • Connector API: 允许构建可以重复使用的生产者和消费者,将topic连接到现有的应用或数据系统上,例如,一个建立在数据库上的连接可以捕获表的每一次修改。

kafka的主题(Topic)和日志

topic可理解是一类消息的集合,在kafka中,topic是可以被多个订阅者来消费的 每个topic,kafka包含了一组分区数据,叫partition,如下:

每一个partition是一个有序、不可变的序列记录,每个记录都有一个唯一的序列ID叫offset。 当消息记录被Consumer消费后,这些记录不会被删除,kafka提供了一些配置,比如:按日期、按空间来定时删除记录。 kafka分区有两个目的,一是它便于扩展,不受单个服务器的限制,二是,它可以并行接受和处理多个任务。

分布式

每一个partition日志被分布在kafka的集群服务器上,并且可配置每个parition可重复的份数。 每个partition有一个leader,零个或多个follower,正常情况下leader负责所有的读写请求,而follower负责同步leader上的数据,当leader不可用时,follower会选举一个新的leader替代原来老的leader。

生产者

生产者负责选择一个topic发布消息,同时指定发布到哪一个partition上,最简单的方式是按照partition轮询,也可指定按权重指定。

消费者

消费者有叫一个组(group)的概念,比如多个消费者属于同一个组,那么他们将一起消费这个topic的数据,如下图:

一个kafka集群有两台服务器,4个partition,有两个分组A和B,A有2个消费者,B有4个消费者, 每个partition可以保证数据记录的顺序性,但客户端如果是并行处理,如groupA,C1同时消费P0、P3就可能照成数据顺序错乱的问题,如果要保证数据的一致性,那么顺序处理一个Topic的所有消息,那么就只提供一个分区。

kafka保证

  • 生产者发送到topic的消息会按照他们发送的顺序保存,如果消息M1、M2被同一个producer发送,当M1被先发送,那么它的offset值将会小于M2的
  • 消费者看到的数据也是根据他们保存的顺序
  • 如果一个topic配置了复制因数N,kafka集群将最大允许N-1台服务器同步失败。

Kafka和传统的消息系统之间的区别

  • 结合传统的优点:传统的消息系统分:队列和发布订阅两种模式,队列可以允许多个消费者同时瓜分数据,而发布订阅模式,会将消息通知到每一消费者。kafka结合了这两个模式的优点,当在kafka中,多个消费者的组ID设置为一样时,那么将采用队列的模式,如果组ID不同,则采用发布订阅模式。
  • 更强的顺序性保证:kafka中引入分区功能,一个topic可有多个分区,分区中保证了顺序的一致性,如果启动多个消费者,kafka保证每个消费者只会读取一个分区中的数据,当有多于分区数的消费者,那么这个消费者将一直处于空等待,不会收到任何消息

kafka的存储性能

kafka作为一个消息存储器,他会将消息写入到磁盘,并通过复制镜像,来保证容错。kafka允许所有的写入操作完成后再继续操作。因为kafka中保持了一个指针的方式,在存储50KB和50TB,其性能都是一样的。kafka通过这种指针读取数据,所以数据的大小,不会影响其读写性能。

kafka的流处理

kafka不仅提供了读、写、存储,还提供了对数据流进行处理,比如:一个零售APP,kafka可以从输入topic读取数据,然后使用StreamAPI统计数量,调整价格然后输出到topic中,类似的操作还包括聚合计算、数据流连接等。

启用免费HTTPS

最近看了酷壳网耗子哥写的博客,把我的网站的http域名也改为https了

为何要使用https,而不是http?

https可以理解为一种安全的http通信协议,比如你通过浏览器访问一个网站,中间会通过网络运营商,由于http是明文传输,中间报文就可能被其他人劫持,获取你的信息,而https就可以保证传输的加密性。

我的服务器是centos 6和nginx,打开网址(https://certbot.eff.org)选择nginx和centos6,照着步骤一步步做就行了。

因为原先使用的是apache,这里顺便说下如何使用nginx替换apache

安装nginx

yum install nginx
service nginx start

nginx的网页目录默认为:/usr/share/nginx/html/

可以通过vi /etc/nginx/conf.d/default.conf来修改默认目录,我修改为原先的Apache的目录是/var/www/html

root         /var/www/html;

由于nginx只负责转发请求,并不能解析php脚本,所以我们还需要安装php解析器:php-fpm,nginx是通过把请交给php-fpm来解析php生成html页面,所以安装它:

yum install php-fpm

接着配置ngxin的域名目录并关联php-fpm

server {
        listen 80;
        root /var/www/html/blog;
        server_name www.ihnbc.cn ihnbc.cn;
        index index.shtml index.html index.htm index.php;
        charset utf-8;
        access_log /home/www.ihnbc.cn.access.log main;

        location ~.*\.(php|php5)?$ {
            fastcgi_pass 127.0.0.1:9000;
            fastcgi_index index.php;
            include fastcgi.conf;
       }
}

php结尾的域名全部交给127.0.0.1:9000来执行,而php-fpm正好监听这个端口,启动php-fpm,重启nginx

/etc/init.d/php-fpm start
service nginx restart

配置https证书,下载certauto

wget https://dl.eff.org/certbot-auto
chmod a+x certbot-auto

然后运行:

sudo ./path/to/certbot-auto --nginx

certbot会扫描你Nginx里面的域名,选择需要配置的域名,多个用空格隔开,其他选项安装步骤执行就可以了,运行完后,certbot会在你的Nginx加上如下配置:

    listen 443 ssl http2; # managed by Certbot
    ssl_certificate /etc/letsencrypt/live/ihnbc.cn/fullchain.pem; # managed by Certbot
    ssl_certificate_key /etc/letsencrypt/live/ihnbc.cn/privkey.pem; # managed by Certbot
    include /etc/letsencrypt/options-ssl-nginx.conf; # managed by Certbot
    ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot

    if ($scheme != "https") {
        return 301 https://$host$request_uri;
    } # managed by Certbot

注意 listen 443 ssl后面http2是我添加上去的,http2开启后更有助于提高https的性能。

重启nginx,由于Let’s Encrypt的证书90天过期,所以你还需要配置一个任务定时去更新证书,使用cron,如下:

0 1 * * * /home/zc/certbot-auto renew

最后还有一件事要做,因为原先WordPress网站中使用的链接都是http的,所以你还需要更新网页中的http链接,不然你的链接上不会显示“安全锁”标识,使用WordPress的插件search-regex就可以批量替换(手动下载search-regex,放到wordpress的wp-content/plugins就可以了)