Author Archives: chuan.zhang

jenkins使用

jenkins使用

说明:jenkins是一个能实现项目自动化部署管理的工具

准备工作

下载软件:https://jenkins.io/download/ 下载war包就可以了,在不同的平台上都可以通用

安装git、maven工具

git1.7.1在和Jenkins使用的时候会出现问题,这里建议是用1.7.1以上的 由于centos6自带的git就是1.7.1的,所以我是编译安装的,安装步骤如下:

1)安装依赖工具

<code>yum install curl-devel expat-devel gettext-devel openssl-devel zlib-devel</code>

2)下载源代码,我用的是2.7.4 https://mirrors.edge.kernel.org/pub/software/scm/git/

<code>cd git-2.7.4
make prefix=/usr/local/git all
make prefix=/usr/local/git install
vim /etc/profile
// 在末尾新开一行填写下面的代码
export PATH=$PATH:/usr/local/git/bin
// :wq保存退出,然后执行下面的命令,让其生效
source /etc/profile
// 查看git版本
git --version</code>

maven安装 http://maven.apache.org/download.cgi 我下载最新版3.5.3 将文件放在/usr/local/下 配置环境变量:

<code>MAVEN_HOME=/usr/local/apache-maven-3.5.3
export MAVEN_HOME
export PATH=${MAVEN_HOME}/bin</code>

运行jenkins并创建项目

启动:在服务器上运行: nohup java -jar jenkins.war &
控制台会打印出初始化的用户名和密码 至此就在浏览器打开页面,输入账号就可以登录到管理界面,jenkins会自动扫描服务器上的安装的maven、git服务

新建一个项目:
newjenkins
选择项目源码位置,这里使用gitlab来管理源码,下面是登陆到gitlab的账号
newjenkins1

因为我使用的是maven项目,填写maven构建的执行命令,我这里是打包成一个war包
newjenkins2

添加构建后的操作,比如上传到war包到服务器,然后备份原来的项目,重启tomcat,这里面需要配合shell脚本来完成
newjenkins3

shell脚本,如下:基本的逻辑是先将原来的项目压缩,移动到备份目录,然后将修改后的项目上传到服务器对应的目录,并重启tomcat

<code>WORK_DIR=`pwd`
TOMCAT_DIR=$WORK_DIR/tomcat-dynamic-api
NOW_TIME=`date +%Y%m%d%H%M%S`
tar -jcvf $TOMCAT_DIR/webapps/dynamicAPI.tar.$NOW_TIME $TOMCAT_DIR/webapps/dynamicAPI
mv $TOMCAT_DIR/webapps/dynamicAPI.tar.$NOW_TIME $TOMCAT_DIR/backup/
mv $TOMCAT_DIR/backup/dynamicAPI.war $TOMCAT_DIR/webapps/
rm -rf $TOMCAT_DIR/webapps/dynamicAPI
kill -9 `ps aux | grep tomcat-dynamic-api | grep -v grep | awk '{print $2}'` &amp;&amp; $TOMCAT_DIR/bin/startup.sh</code>

maven配置

   
    <profiles>
        <profile>
            <id>local</id>
            <properties>
                <env>local</env>
            </properties>
        </profile>
        <profile>
            <id>product156</id>
            <!--
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
             -->
            <properties>
                <env>product_156</env>
            </properties>
        </profile>
        <profile>
            <id>product169</id>
            <properties>
                <env>product_169</env>
            </properties>
        </profile>
        <profile>
            <id>backup148</id>
            <properties>
                <env>backup_148</env>
            </properties>
            <build>
                <resources></resources>
            </build>
        </profile>

        <profile>
            <id>test157</id>
            <properties>
                <env>test_157</env>
            </properties>
        </profile>
        <profile>
            <id>test169</id>
            <properties>
                <env>test_169</env>
            </properties>
        </profile>

    </profiles>
    <!-- 配置文件 -->

    <build>
        <finalName>dynamicAPI</finalName>
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.8</version>
            </extension>
        </extensions>
        <resources>
            <resource>
                <directory>src/main/resources/${env}</directory>
            </resource>
        </resources>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <testResources>
            <testResource>
                <directory>src/test/resources</directory>
            </testResource>
        </testResources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <configuration>
                    <webResources>
                        <resource>
                            <directory>WebContent</directory>
                        </resource>
                        <resource>
                            <directory>libs/</directory>
                            <targetPath>WEB-INF/lib</targetPath>
                            <includes>
                                <include>**/*.jar</include>
                            </includes>
                        </resource>
                    </webResources>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.16</version>
                <configuration>
                    <skipTests>true</skipTests>
                    <junitArtifactName>junit:junit</junitArtifactName>
                    <argLine>-Dfile.encoding=UTF-8</argLine>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <excludes>
                        <exclude>product_156/**</exclude>
                        <exclude>product_169/**</exclude>
                        <exclude>test_157/**</exclude>
                        <exclude>*.properties</exclude>
                        <exclude>*.xml</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.6</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.9</version>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>versions-maven-plugin</artifactId>
                <version>2.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.8</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

kafka和zookeeper集群安装

kafka和zookeeper集群安装

上期讲了kafka的作用及应用场景,今天我们来自己搭建一套kafka集群,由于kafka目前的安装包已经自带了zookeeper,所以在搭建zookeeper集群直接使用它内置的即可

准备工作:3台服务器,当然你可以使用自己的虚拟机尝试,如果安装虚拟机,可以查看我的这篇博客
我的三台服务器的IP分别是:192.168.101.116、192.168.101.115、192.168.102.215

安装步骤

下载安装包:

地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0

配置zookeeper

配置kafka的路径、心跳检测时间 初始化连接时follower和leader最长连接时间 follower和leader数据最长同步时间

 dataDir=/data/kafka/zookeeper
 tickTime=2000
 initLimit=5
 syncLimit=2
 server.0=192.168.101.116:2888:3888
 server.1=192.168.101.115:2888:3888
 server.2=192.168.102.215:2888:3888

在每个服务器上注册zookeeper

/data/kafka/zookeeper目录下面touch myid文件
192.168.101.116上执行
echo "0" > /data/kafka/zookeeper/myid
192.168.101.115上执行
echo "1" > /data/kafka/zookeeper/myid
192.168.102.215上执行
echo "2" > /data/kafka/zookeeper/myid

配置kafka

 192.168.101.116上配置
 advertised.listeners=PLAINTEXT://zc1:9092
 broker.id=0

 192.168.101.115上配置
 advertised.listeners=PLAINTEXT://zc2:9092
 broker.id=1

 192.168.102.215上配置
 advertised.listeners=PLAINTEXT://zc3:9092
 broker.id=2

 通用配置
 log.dirs=/data/kafka/kafka-logs
 zookeeper.connect=192.168.101.116:2181,192.168.101.115:2181,192.168.102.215:2181

配置hosts

在每个服务器/etc/hosts中添加如下配置

192.168.101.116 zc1
192.168.101.115 zc2
192.168.102.215 zc3

开放防火墙端口

由于zookeeper和kafka监听了2181、9092、2888、3888,所有我们还要把这些端口添加到防火墙中,编辑/etc/sysconfig/iptables,添加:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT

启动zookeeper和kafka

先启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

再启动kafka

bin/kafka-server-start.sh config/server.properties &

测试数据写入和读取

使用命令测试

生产者
[root@zc1 kafka_2.11-1.0.0]# bin/kafka-console-producer.sh –broker-list 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

>hello
>word

消费者
[root@zc2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh –bootstrap-server 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

hello
word

代码测试

生产者

public class ProducerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Producer&lt;String, String&gt; producer;
    private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    public ProducerTest() {
        /** 生产者*/
        InputStream in_proc = this.getClass().getClassLoader().getResourceAsStream("kafka_test_producer.properties");
        Properties prop_proc = new Properties();
        try {
            prop_proc.load(in_proc);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_producer配置文件失败", e);
        }
        producer = new KafkaProducer&lt;String, String&gt;(prop_proc);
    }

    public void execute() {
        while(true) {
            try {
                String key = "CA1234";
                String message = System.currentTimeMillis()+" CA1234,PEK,SHA,2018-02-01";
                this.sendToTestServer(key, message);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    private void sendToTestServer(String key, String message) {
        logger.info("发送消息:"+message);
        ProducerRecord&lt;String, String&gt; producerRecord = new ProducerRecord&lt;String, String&gt;(TOPIC_NAME, key, message);
        producer.send(producerRecord);
    }

    public static void main(String[] args) {
        new ProducerTest().execute();
    }

}

kafka_test_producer.properties

kafka.topic=firsttopic
group.id=chuanzh
bootstrap.servers=192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092
retries=5
request.timeout.ms=10000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

消费者

public class ConsumerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Consumer&lt;String, String&gt; consumer;
    private final static Logger logger = LoggerFactory.getLogger(DynamicPushTestTask.class);

    public ConsumerTest() {
        /** 消费者*/
        InputStream ksin = this.getClass().getClassLoader().getResourceAsStream("kafka_test_consumer.properties");
        Properties props = new Properties();
        try {
            props.load(ksin);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_consumer配置文件失败", e);
        }
        consumer = new KafkaConsumer&lt;String, String&gt;(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
    }

    public void execute() {
        while(true) {
            try {
                ConsumerRecords&lt;String, String&gt; records = consumer.poll(2);
                logger.info("读取kafka,取到消息数量:" + records.count());
                for (ConsumerRecord&lt;String, String&gt; record : records) {
                    logger.info("value:{}", record.value());
                    logger.info("offset:{}", record.offset());
                }

                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    public static void main(String[] args) {
        new ConsumerTest().execute();
    }

}

kafka_test_consumer.properties

kafka.topic=flightcore
group.id=flightPacketPush
bootstrap.servers=43.241.208.208:9092,43.241.228.39:9092,43.241.234.89:9092
metadata.max.age.ms=60000
max.poll.records=50
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

 

Kafka使用介绍

Kafka使用介绍

kafka是什么?

kafka是一个分布式消息数据流平台,为处理实时数据提供一个统一、高吞吐、低延迟的平台。其本质是一个提供大规模分布式发布/订阅的消息队列,此外kafka还可以通过kafka connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams–一个Java的流式处理库。

它的2个大的应用: 1,构建实时的流数据管道,确保系统和应用之间数据传输的可靠 2,对应用之间的数据流进行转换和反应。

基本概念

  • Topic kafka将消息分门别类,每一类消息称之为一个主题(Topic) 每个消息(也叫记录recode)是由一个key,一个value和时间戳构成
  • Producer 发布消息的对象称之为主题生产者。
  • Consumer 订阅消息的对象称之为主题消费者
  • Broker 已发布的消息保存在一组服务器中,称之为Kafka集群,集群中每一个服务器都是一个代理(Broker),消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些发布的消息。

kafka的核心API

  • Producer API:允许应用发布一个或多个kafka主题
  • Consumer API:允许应用订阅一个或多个kafka主题
  • Streams API:转换数据流,将一个或多个主题的数据流转换为一个或多个主题的数据流输出
  • Connector API: 允许构建可以重复使用的生产者和消费者,将topic连接到现有的应用或数据系统上,例如,一个建立在数据库上的连接可以捕获表的每一次修改。

kafka的主题(Topic)和日志

topic可理解是一类消息的集合,在kafka中,topic是可以被多个订阅者来消费的 每个topic,kafka包含了一组分区数据,叫partition,如下:

每一个partition是一个有序、不可变的序列记录,每个记录都有一个唯一的序列ID叫offset。 当消息记录被Consumer消费后,这些记录不会被删除,kafka提供了一些配置,比如:按日期、按空间来定时删除记录。 kafka分区有两个目的,一是它便于扩展,不受单个服务器的限制,二是,它可以并行接受和处理多个任务。

分布式

每一个partition日志被分布在kafka的集群服务器上,并且可配置每个parition可重复的份数。 每个partition有一个leader,零个或多个follower,正常情况下leader负责所有的读写请求,而follower负责同步leader上的数据,当leader不可用时,follower会选举一个新的leader替代原来老的leader。

生产者

生产者负责选择一个topic发布消息,同时指定发布到哪一个partition上,最简单的方式是按照partition轮询,也可指定按权重指定。

消费者

消费者有叫一个组(group)的概念,比如多个消费者属于同一个组,那么他们将一起消费这个topic的数据,如下图:

一个kafka集群有两台服务器,4个partition,有两个分组A和B,A有2个消费者,B有4个消费者, 每个partition可以保证数据记录的顺序性,但客户端如果是并行处理,如groupA,C1同时消费P0、P3就可能照成数据顺序错乱的问题,如果要保证数据的一致性,那么顺序处理一个Topic的所有消息,那么就只提供一个分区。

kafka保证

  • 生产者发送到topic的消息会按照他们发送的顺序保存,如果消息M1、M2被同一个producer发送,当M1被先发送,那么它的offset值将会小于M2的
  • 消费者看到的数据也是根据他们保存的顺序
  • 如果一个topic配置了复制因数N,kafka集群将最大允许N-1台服务器同步失败。

Kafka和传统的消息系统之间的区别

  • 结合传统的优点:传统的消息系统分:队列和发布订阅两种模式,队列可以允许多个消费者同时瓜分数据,而发布订阅模式,会将消息通知到每一消费者。kafka结合了这两个模式的优点,当在kafka中,多个消费者的组ID设置为一样时,那么将采用队列的模式,如果组ID不同,则采用发布订阅模式。
  • 更强的顺序性保证:kafka中引入分区功能,一个topic可有多个分区,分区中保证了顺序的一致性,如果启动多个消费者,kafka保证每个消费者只会读取一个分区中的数据,当有多于分区数的消费者,那么这个消费者将一直处于空等待,不会收到任何消息

kafka的存储性能

kafka作为一个消息存储器,他会将消息写入到磁盘,并通过复制镜像,来保证容错。kafka允许所有的写入操作完成后再继续操作。因为kafka中保持了一个指针的方式,在存储50KB和50TB,其性能都是一样的。kafka通过这种指针读取数据,所以数据的大小,不会影响其读写性能。

kafka的流处理

kafka不仅提供了读、写、存储,还提供了对数据流进行处理,比如:一个零售APP,kafka可以从输入topic读取数据,然后使用StreamAPI统计数量,调整价格然后输出到topic中,类似的操作还包括聚合计算、数据流连接等。

启用免费HTTPS

最近看了酷壳网耗子哥写的博客,把我的网站的http域名也改为https了

为何要使用https,而不是http?

https可以理解为一种安全的http通信协议,比如你通过浏览器访问一个网站,中间会通过网络运营商,由于http是明文传输,中间报文就可能被其他人劫持,获取你的信息,而https就可以保证传输的加密性。

我的服务器是centos 6和nginx,打开网址(https://certbot.eff.org)选择nginx和centos6,照着步骤一步步做就行了。

因为原先使用的是apache,这里顺便说下如何使用nginx替换apache

安装nginx

yum install nginx
service nginx start

nginx的网页目录默认为:/usr/share/nginx/html/

可以通过vi /etc/nginx/conf.d/default.conf来修改默认目录,我修改为原先的Apache的目录是/var/www/html

root         /var/www/html;

由于nginx只负责转发请求,并不能解析php脚本,所以我们还需要安装php解析器:php-fpm,nginx是通过把请交给php-fpm来解析php生成html页面,所以安装它:

yum install php-fpm

接着配置ngxin的域名目录并关联php-fpm

server {
        listen 80;
        root /var/www/html/blog;
        server_name www.ihnbc.cn ihnbc.cn;
        index index.shtml index.html index.htm index.php;
        charset utf-8;
        access_log /home/www.ihnbc.cn.access.log main;

        location ~.*\.(php|php5)?$ {
            fastcgi_pass 127.0.0.1:9000;
            fastcgi_index index.php;
            include fastcgi.conf;
       }
}

php结尾的域名全部交给127.0.0.1:9000来执行,而php-fpm正好监听这个端口,启动php-fpm,重启nginx

/etc/init.d/php-fpm start
service nginx restart

配置https证书,下载certauto

wget https://dl.eff.org/certbot-auto
chmod a+x certbot-auto

然后运行:

sudo ./path/to/certbot-auto --nginx

certbot会扫描你Nginx里面的域名,选择需要配置的域名,多个用空格隔开,其他选项安装步骤执行就可以了,运行完后,certbot会在你的Nginx加上如下配置:

    listen 443 ssl http2; # managed by Certbot
    ssl_certificate /etc/letsencrypt/live/ihnbc.cn/fullchain.pem; # managed by Certbot
    ssl_certificate_key /etc/letsencrypt/live/ihnbc.cn/privkey.pem; # managed by Certbot
    include /etc/letsencrypt/options-ssl-nginx.conf; # managed by Certbot
    ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot

    if ($scheme != "https") {
        return 301 https://$host$request_uri;
    } # managed by Certbot

注意 listen 443 ssl后面http2是我添加上去的,http2开启后更有助于提高https的性能。

重启nginx,由于Let’s Encrypt的证书90天过期,所以你还需要配置一个任务定时去更新证书,使用cron,如下:

0 1 * * * /home/zc/certbot-auto renew

最后还有一件事要做,因为原先WordPress网站中使用的链接都是http的,所以你还需要更新网页中的http链接,不然你的链接上不会显示“安全锁”标识,使用WordPress的插件search-regex就可以批量替换(手动下载search-regex,放到wordpress的wp-content/plugins就可以了)

 

Hystrix服务隔离及熔断

上次说了使用RxJava并行提升API效率,但是如何避免项目中一个服务拖慢整体的性能?hystrix优雅的解决了这个问题。

先贴个图,网上找的,我觉得这个图就可以说明问题了

hystrix-1

当依赖服务A挂了,直接就会影响到整个服务的性能,如果连接一直被A暂用,最终将导致服务发生雪崩

所以我们要保证核心服务的稳定,其他非核心的服务可以不那么重要,即使出现了错误数据可以丢弃或不用保证实时准确,再以SOA项目中的getDynamicInfo为例,如下:

87194035-3D85-4860-AB44-35EA71697A44

getFlight为核心的业务,必须要获取到数据,获取不到那就整个服务不可用了

getEntryCard、getPlane、getRefund、getWeather、getPortaitUser等方法为非核心业务,我们允许其数据丢失的情况,要保证整个服务的可用

首先我们需要把这些服务使用hystrix分离出来,先对这几个服务做先分组:

pydyn:getEntryCard、getPlane、getRefund、getWeather

portrait:getPortaitUser

Maven配置

	<dependency>
	  <groupId>com.netflix.hystrix</groupId>
	  <artifactId>hystrix-core</artifactId>
	  <version>1.5.8</version>
	</dependency>

以getEntryCard服务为例,继承HystrixCommand<T>,重写run方法

public class GetEntryCardCommand extends HystrixCommand<String>{

	private String arrCode;
        private GetEntryCardService service;

	public GetEntryCardCommand(String arrCode) {
		super(setter());
		this.arrCode = arrCode;
	}

	private static Setter setter() {
		return PySetter.setter().andCommandKey(HystrixCommandKey.Factory.asKey("getEntryCard"));
	}

	@Override
	protected String run() throws Exception {
		// TODO Auto-generated method stub
		return service.getEntryCardCity(arrCode);
	}

	@Override
	protected String getFallback() {
		return service.getFromCache(arrCode);
	}

}

run方法要执行的业务逻辑,如果出现异常则会调用getFallback方法。

public class PySetter {

	public static Setter setter() {
		// 服务分组
		HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("pydyn");
		// 服务标识
		//HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("getEntryCard");
		// 线程池名称
		HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("pydyn-pool");
		// 线程配置
		HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter()
				.withCoreSize(10)
				.withKeepAliveTimeMinutes(5)
				.withMaxQueueSize(Integer.MAX_VALUE)
				.withQueueSizeRejectionThreshold(10000);

		//命令属性配置
		HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
				.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                                .withExecutionIsolationThreadInterruptOnTimeout(true).withExecutionTimeoutInMilliseconds(3000) //设置超时3秒自动熔断
				.withCircuitBreakerErrorThresholdPercentage(30);//失败率达到30%自动熔断

		return HystrixCommand.Setter
				.withGroupKey(groupKey)
				//.andCommandKey(commandKey)
				.andThreadPoolKey(threadPoolKey)
				.andThreadPoolPropertiesDefaults(threadPoolProperties)
				.andCommandPropertiesDefaults(commandProperties);
	}

}

以上是对服务的线程池熔断配置

HystrixCommandGroupKey:服务分组,以上pydyn分组就包括4个服务,必填选项

HystrixCommandKey:服务的名称,唯一标识,如果不配置,则默认是类名

HystrixThreadPoolKey:线程池的名称,相同线程池名称的线程池是同一个,如果不配置,默认为分组名

HystrixThreadPoolProperties:线程池的配置,coreSize配置核心线程池的大小,maxQueueSize线程池队列的最大大小,queueSizeRejectionThreshold,限制当前队列的大小,实际队列大小由这个参数决定,即到达队列里面条数到达10000,则都会被拒绝。

HystrixCommandProperties:配置命令的一些参数,如executionIsolationStrategy,配置执行隔离策略,默认是使用线程隔离,THREAD即为线程池隔离,ExecutionIsolationThreadInterruptOnTimeout和ExecutionTimeoutInMilliseconds配置了启用超时和最大执行时间,这里为3s,circuitBreakerErrorThresholdPercentage失败率配置,默认为50%,这里配置的为30%,即失败率到达30%触发熔断

接下来就是调用GetEntryCardCommand获取数据

String result = new GetEntryCardCommand(request.getArrCode()).execute();

我使用的是阻塞的方式获取,当然也可以使用异步的方法,得到Future对象,使用

Future<String> = new GetEntryCardCommand(request.getArrCode()).queue();

其他getPlane、getRefund、getWeather方法同GetEntryCardCommand类,

getPortaitUser也一样,只需要更改下自己的分组为portait,线程池名称,并配置自己的线程池参数

好了,这样就实现了多个服务之间的隔离和熔断,不同的服务分组使用不同的线程池,即使getDynamicInfo并发量突然剧增,也不会对getPlane、getRefund、getWeather等产生影响

下面来说说对服务的监控,hystrix已经帮我们实现了数据的记录,只需要安装他们的管理后台就可以查看数据,他们家有2个监控后台:hystrix-dashboard和Turbine,第一个是监控单个项目的日志,第二个可以把多个项目的日志聚合在一起

hystrix-dashboard监控:

maven配置:

	 <dependency>  
	     <groupId>com.netflix.hystrix</groupId>  
	     <artifactId>hystrix-metrics-event-stream</artifactId>  
	     <version>1.1.2</version>  
 	</dependency>

web.xml配置

    <servlet>  
	    <display-name>HystrixMetricsStreamServlet</display-name>  
	    <servlet-name>HystrixMetricsStreamServlet</servlet-name>  
	    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>  
	</servlet>  
	<servlet-mapping>  
	    <servlet-name>HystrixMetricsStreamServlet</servlet-name>  
	    <url-pattern>/hystrix.stream</url-pattern>  
	</servlet-mapping>

访问http://ip:port/projectName/hystrix.stream查看是否有监控数据

hystrix-dashboard:先安装hystrix-dashboard.war,下载后放入web容器(Tomcat或Jetty)中,启动容器,访问http://ip:port/hystrix-dashboard,如下的界面:

371D6055-27B8-41ED-ADF3-544E92ECC934

最后点击Monitor Stream就可以看到数据了

EFD10FC9-9BAB-43DE-B5EB-09082222744B

可以看到每个服务及他们的线程池使用情况

CirCuit主要监控成功的请求数,请求超时数,失败率,平均耗时、90%,99%,99.5%的耗时(将鼠标移动到对于数字位置可以看到描述)

Thread Pools主要监控线程池的配置数,线程队列的配置数,最大活跃线程

 

Turbine监控(将多个实例的数据聚合起来)

1)下载turbine-web-1.0.0.war,并将war放入web容器中。

2)在容器下路径为turbine-web-1.0.0/WEB-INF/classes下新建config.properties文件。

InstanceDiscovery.impl=com.netflix.turbine.discovery.ConfigPropertyBasedDiscovery
#cluster
turbine.aggregator.clusterConfig=dynamicAPI
turbine.instanceUrlSuffix=:8080/projectName/hystrix.stream
turbine.ConfigPropertyBasedDiscovery.dynamicAPI.instances=127.0.0.1,127.0.0.2

这里我配置了2个实例127.0.0.1和127.0.0.2,他会同时调用http://127.0.0.1:8080/projectName/hystrix.stream和http://127.0.0.2:8080/projectName/hystrix.stream,然后根据分组名,服务名,线程池名将两个实例数据合并

3)调用http://ip:port/turbine-web-1.0.0/turbine.stream?cluster=dynamicAPI,即可查看到聚合的数据

4)打开hystrix-dashboard配置turbine

766892D0-ADE9-4BC4-A991-7844965A1557

点击Monitor Stream即可查看到两个实例监控的数据

5EC0BFFE-3C41-4F14-980D-39614A231A16

 

参考资料:

http://tech.lede.com/2017/06/15/rd/server/hystrix/

http://www.cnblogs.com/java-zhao/archive/2016/09/01/5831002.html

 

RxJava使用介绍

在说RxJava之前先说说ReactiveX。

ReactiveX 简称 Rx,全称 Reactive Extensions,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Java等几乎所有的编程语言,RxJava则是java语言的实现。

Rx介绍:

1)扩展的观察者模式:通过订阅可观测对象的序列流然后做出反应。

2)迭代器模式:对对象序列进行迭代输出从而使订阅者可以依次对其处理。

3)函数式编程思想:简化问题的解决的步骤,让你的代码更优雅和简洁

为什么说是扩展的观察者模式?

观察者模式:被观察者发出事件,然后观察者(事件源)订阅然后进行处理。

图片 1

扩展:如果没有观察者,被观察者是不会发出任何事件的。另外知道事件何时结束,还有错误通知处理

迭代器模式

提供一种方法顺序访问一个聚合对象中的各种元素,而又不暴露该对象的内部表示

《RxJava Essentials》一书做的的对比:迭代器模式在事件处理上采用的是“同步/拉式”的方式,而被观察者采用的是“异步/推式”的方式,而对观察者而言,显然后者更灵活。

图片 1

函数式编程

//线程操作模式
new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
//函数模式 (Lambda)A->B ->C->D
Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

 

RxJava核心

Observable(被观察者,也就是事件源)和Subscriber(观察者)

//被观察者
Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>(){

                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("hello world");
                        subscriber.onCompleted();
                    }
                }
        );

//观察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
          @Override
          public void onCompleted() {}

          @Override
          public void onError(Throwable e) {}

          @Override
          public void onNext(String s) {
                log.info("基础写法:"+s);
            }
      };

 myObservable.subscribe(mySubscriber);
Observable<String> myObservable = Observable.just("Hello World!");

		Action1<String> onNextAction = new Action1<String>() {

			@Override
			public void call(String s) {
				logger.info("Action1简化后:"+s);
			}
		};

		myObservable.subscribe(onNextAction);

		/* 写成匿名函数*/
		Observable.just("Hello World!").subscribe(new Action1<String>() {
			@Override
			public void call(String s) {
				logger.info("匿名函数写法:"+s);
			}
		});

		/*用Java 8 lambdas(Retrolambda)表达式*/
		Observable.just("Hello World!").subscribe(s -> logger.info("lambdas表达式写法:"+s));

 

RxJava操作符

创建操作符:Create, Defer, From, Interval, Just, Range, Repeat, Timer等。

String[] strings = {"张三","李四","王五","赵六"};
Observable.from(strings)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("name",s);
            }
        });

变换操作:Map、FlatMap、ConcatMap等

public void showUserName(String userName){
	textView.setText(userName);
}

public void showUserName(String userName){
Observable.just(userName).subscribe( new  Action1<String>(){
         @Override
         public void call(String s){
            textView.setText(s);
        }
});  
}

如果需要在显示前对这个字符串做处理,然后再展示,比如加“张三,你好”

方法1:我们可以对字符串本身操作

方法2:我们可以放到Action1.call()方法里做处理

方法3:使用操作符做变换:map

public void showUserName(String userName){
Observable.just(userName).map(new Func1<String,String>(){
          public String call(String text){
             return handleUserName(text);   
           }
   }).subscribe( new Action1<String>(){
        public void call(String s){
        }
        });
}

打印出中国的所有省份名称:flatMap()

List<Province>  provinceList = …
Observable.from(provinceList)
.flatMap(new Func1<Province,String>(){
@Override
 public String call(Province province){
            return province.getName();
      }
}).subscribe(new Action1<String>(){
         @Override
         public void call(String s){
            Log.i(“省份名称”,s)
        }
});
List<Province>  provinceList = …
Observable.from(provinceList)
.subscribe(new Action1<Province>(){
         @Override
         public void call(Province province){
            List<City> cities = province.getCities();
            for (int i = 0; i < cities.size(); i++) {
                   City city = cities.get(i);
                   Log.i(“城市”, city.getName());
            }        
          }
});

显然第一种比第二种看着更简洁,清晰

异步

调度器Scheduler:

0b350c8350142aff

操作符:

subscribeOn():指定回调发生的线程,事件消费的线程,可以执行多次!

observeOn():订阅事件发生的线程,事件产生的线程,只允许执行一次。

其他操作符

01E17C8B-D4F3-4D92-859D-D36D72D5A07C

Debounce:“去抖”,只有在空闲了一段时间后才发射数据,过滤掉发射速率过快的数据项

Sample:“采样”,定期发射Observable最近发射的数据项

F77989EF-B666-47B5-9FCB-901E68BE64C3

String[] numbers = {"11", "2", "2", "13", "4", "5", "7"}
Observable
  .from(numbers)
  .map(s -> Integer.parseInt(s))
  .filter(s -> s < 10)
  .distinct()
  .takeLast(3)
  .reduce((number1, number2) -> 
     number1 + number2)
  )
  .subscribe(i -> System.out.println(i));//16

 

流式处理的优势

如需要从多个数据源获取数据内存、磁盘、网络依次获取。

Observable<String> memory= bservable.just("memory"); 
Observable<String> disk= Observable.just("disk");  
Observable<String> network=Observable.just("network");  

//依次检查memory、disk、network  
Observable.concat(memory, disk, network)  
.first()  
.subscribeOn(Schedulers.newThread())  
.subscribe(s -> {  
    memoryCache = "memory";  
    System.out.println("--------------subscribe: " + s);  
});

参考资料:

https://gank.io/post/560e15be2dca930e00da1083

http://www.jianshu.com/p/e0891032ee4d

 

RxJava在SOA中的运用

在做SOA服务化时,有时候一个服务依赖于其他很多服务,如下图:

8D5F1A87-787D-460D-BA2C-292DB4E6BCB0

最常规的做法是串行调用接口,最后将结果合并,如果为了提高效率,我们想并行调用每个接口,最后将结果合并,如何做呢?

首先我们想到的是使用多线程去执行,JUC中CountDownLatch可以实现这个效果,最先初始化n个任务传给countDownLatch,然后利用线程池去执行每个任务,执行完后使用countDown()方法将任务递减,CountDownLatch.awai()等待指导所有的任务执行完成。RxJava提供了比较优雅的方法,我们来看看它是怎么实现的。

rxjava的实现思路也是一样,创建多个异步处理任务,最后将结果合并,拿调用getPlane接口来说:

private Observable<PlaneBean> getPlane()
			throws Exception {
		return Observable.create(new Observable.OnSubscribe<PlaneBean>() {
			@Override
			public void call(Subscriber<? super PlaneBean> subscriber) {
				PlaneBean plane = new PlaneBean();
				try {
					/* 调用服务业务处理*/
				} catch (Exception e) {
					logger.error(FuncStatic.errorTrace(e));
				}
				subscriber.onNext(plane);
				subscriber.onCompleted();
				logger.info(requestId + " get plane info end");
			}
		}).subscribeOn(Schedulers.from(workPool));
	}

使用Observable.create创建一个异步任务,在call方法中写需要需要处理的业务逻辑,执行完成后将数据plane传入到subscriber对象中,并调用onCompleted()方法表示结束执行,核心为subscribeOn方法,这个任务会交给workPool来调度,所以最初我们还要创建一个线程池

private static ExecutorService workPool = Executors.newFixedThreadPool(50);

其他API方法调用同上,再来说下合并,RxJava提供了merge和zip方法来合并任务,merge方法要求每个任务返回的结果都相同,zip则不限制,根据需求这里我们使用zip方法

Observable.zip(getDynamic(), getShare(), getPre(), getPlane(), getFiducial()
		new Func5<DynamicBean, ShareBean, PreBean, PlaneBean, FiducialBean, GetDetailResponse>() {
			@Override
			public GetDetailResponse call(DynamicBean t1, ShareBean t2, PreBean t3,
					PlaneBean t4, FiducialBean t5) {
				if (t1 != null)
					response.setDynamic(t1);
				if (t2 != null)
					response.setShare(t2);
				if (t3 != null)
					response.setPre(t3);
				if (t4 != null)
					response.setPlane(t4);
				if (5 != null)
					response.setFiducial(t4);
				return response;
			}

		}).subscribeOn(Schedulers.from(workPool)).toBlocking().subscribe();

因为这里我调用的5个API,所有使用方法Func5,如果是3个则使用Func3,同样交给workPool线程池来处理合并的结果,注意这里要使用toBlocking来阻塞阻塞合并操作,等待所有任务都执行完成后再进行合并,最后将结果赋予GetDetailResponse对象。

以上就完成了并行调度的执行,在API的依赖逐渐增多,这样可以大大提高执行效率,但也有一个问题,如果某个API执行时间很长,将对拖慢整个接口的执行时间,导致接口发送雪崩,下次讲讲如果避免这种情况。

JAVA内存分析

JAVA通过自带的垃圾回收机制来管理内存,但对于项目的内存分析也要了解,比如项目的内存使用情况,何时回收?或出现内存泄露如何排查?

首先看看JVM内存分配图:

671D4492-75BB-4046-8B2E-199ABDE70734

如上图,jvm包含了堆、本地方法栈,

堆上存放新建(New)的对象,它被划为为新生代,老年代两个区域,新生代又分为伊甸园、幸存者两个区域,不同的区域存放的对象生命周期不同

说说JAVA中几种内存溢出的情况:

1,JVM堆溢出(java.lang.OutOfMemoryError: java heap space)

当生成一个新对象,JVM内存申请如下流程:

1),jvm先尝试在eden分配新对象所需的内存,若内存足够,则将对象放入eden返回

2),若内存不够,jam启动youngGC,试图将eden不活跃的对象释放掉,若释放后仍不足以分配内存,则将Eden活跃的对象放入survivor中。

3),survivor作为Eden和old的中间交换区域,若old空间足够,survivor去对象会被移动到old区,否则留在survivor区

4),当old区不够时,jvm会在old区进行fullGC,若fullGC后,survivor和old仍然无法存放从Eden复制过来的对象,则会出现“outOfMemoryError: java heap space”

解决方法:加大堆内存的大小,通过设置-Xms(java heap初始化大小,默认是物理内存1/64) -Xmx(java heap的最大值) -Xmn(新生代heap的大小,一般为Xmx3或4分之一,注:增加新生代后会减少老年代的大小)

2,方法区内存溢出(java.lang.OutMemoryError: permGen space)

方法区主要是用来存放类信息,常量、静态变量等,所以程序中类加载过多(引入第三方包),或者过多使用反射、cglib这种动态代理,就可能导致该区域内存溢出

解决方法:通过设置-XX:PermSize(内存永久区初始值)和-XX:MaxPerSize(内存永久区最大值)的大小

3,线程栈溢出(java.lang.StackOverflowError)

线程栈是线程独有的一块内存区域,所以线程栈溢出必定是线程运行是出现错误,一般是递归太深,或者方法层级调用太深引起的

解决方法:设置栈区的大小,通常栈的大小是1-2M,可通过-Xss设置线程的栈的大小,jdK5以后每个栈默认大小为1M。

下面针对线上的某个项目,查看它是否出现内存溢出的情况

1,如何查看项目的内存使用情况?

jmap -heap pid

Attaching to process ID 64909, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.91-b14

using thread-local object allocation.
Parallel GC with 4 thread(s)

Heap Configuration:
   MinHeapFreeRatio         = 0
   MaxHeapFreeRatio         = 100
   MaxHeapSize              = 2147483648 (2048.0MB)
   NewSize                  = 44564480 (42.5MB)
   MaxNewSize               = 715653120 (682.5MB)
   OldSize                  = 89653248 (85.5MB)
   NewRatio                 = 2
   SurvivorRatio            = 8
   MetaspaceSize            = 21807104 (20.796875MB)
   CompressedClassSpaceSize = 1073741824 (1024.0MB)
   MaxMetaspaceSize         = 17592186044415 MB
   G1HeapRegionSize         = 0 (0.0MB)

Heap Usage:
PS Young Generation
Eden Space:
   capacity = 31457280 (30.0MB)
   used     = 12660000 (12.073516845703125MB)
   free     = 18797280 (17.926483154296875MB)
   40.24505615234375% used
From Space:
   capacity = 1572864 (1.5MB)
   used     = 1179648 (1.125MB)
   free     = 393216 (0.375MB)
   75.0% used
To Space:
   capacity = 9437184 (9.0MB)
   used     = 0 (0.0MB)
   free     = 9437184 (9.0MB)
   0.0% used
PS Old Generation
   capacity = 40894464 (39.0MB)
   used     = 4514768 (4.3056182861328125MB)
   free     = 36379696 (34.69438171386719MB)
   11.040046887520033% used

查看Java内存的分配情况及新生代、老年代内存的使用情况,确定是否内存分配过小?

2,判断是否出现内存泄露

jstat -gcutil pid 2000

D09AEF2B-2D3E-4D50-B74C-E603875EAD70

上面命令表示每隔2秒打印出GC的使用回收情况,若FGC很多很可能出现了内存泄露

3,查看占用内存最多的对象

jmap -histo:live pid | more

0B137198-E38B-4466-A554-E68FEC36EC61

按使用大小进行了排序,重点查看排在前面的对象,看是否程序写的有问题。

另外可以将jvm的堆内存导出来分析,使用

jmap -dump:format=b,file=dynamicapi.hprof pid

使用java vistual工具分析,jdk自带了jvisualvm就可以进行内存分析,执行命令:

进入jkd的bin目录,执行jvisualvm

点击文件->装入,选择堆内存文件,即可看到堆中类的使用情况

3E5ECD10-58E2-450B-A8C4-31B276689EC6

 

Twitter分布式ID算法Snowflake

Snowflake是Twitter开发的一个分布式ID生成算法,有以下几个特点:

1)默认情况下41bit的时间戳可以支持该算法使用到2082年,10bit的工作机器id可以支持1023台机器,序列号支持1毫秒产生4095个自增序列id,即理论上1秒产生409万id

2)高性能,不依赖其他第三方服务,稳定性高

3)强依赖于机器时钟

下面看看它的算法结构图:

图片 1

可以看到它是由三部分组成

1)当前时间戳

2)工作机器ID:包括dataCenterId和workId,可自己配置

3)12bit序列号,即从0增长到4095

算法其实很简单,因为不依赖于其它服务器,都是做时间比较和位移操作,流程图如下:

CA1FEEEF-6CBB-4D5F-993B-ED2C51C9D198

下面针对JAVA版的算法具体分析

// ==============================Fields===========================================
    /** 开始时间截 (2015-01-01) */
    private final long twepoch = 1420041600000L;

    /** 机器id所占的位数 */
    private final long workerIdBits = 5L;

    /** 数据标识id所占的位数 */
    private final long datacenterIdBits = 5L;

    /** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /** 支持的最大数据标识id,结果是31 */
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    /** 序列在id中占的位数 */
    private final long sequenceBits = 12L;

    /** 机器ID向左移12位 */
    private final long workerIdShift = sequenceBits;

    /** 数据标识id向左移17位(12+5) */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /** 时间截向左移22位(5+5+12) */
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /** 工作机器ID(0~31) */
    private long workerId;

    /** 数据中心ID(0~31) */
    private long datacenterId;

    /** 毫秒内序列(0~4095) */
    private long sequence = 0L;

    /** 上次生成ID的时间截 */
    private long lastTimestamp = -1L;

因为算法都是基于二进制的位移操作,所以上面定义了一大堆变量,基本都是一些需要位移的长度

如序列IDsequenceBits在定义了它的二进制长度,序列号最大为4095,它的二进制占用长度就是12

同样datacenterId和workId最大数为31,二进制占用的长度就是5,workerIdShift,datacenterIdShift,timestampLeftShift定义了他们需要的位移数

为啥都要基于二进制的位移来操作呢?因为这样对于机器来说计算更快

核心方法生成ID

    // ==============================Methods==========================================
    /**
     * 获得下一个ID (该方法是线程安全的)
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        //如果是同一时间生成的,则进行毫秒内序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            //毫秒内序列溢出
            if (sequence == 0) {
                //阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //时间戳改变,毫秒内序列重置
        else {
            sequence = 0L;
        }

        //上次生成ID的时间截
        lastTimestamp = timestamp;

        //移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     * @param lastTimestamp 上次生成ID的时间截
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 返回以毫秒为单位的当前时间
     * @return 当前时间(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

1)首先调用timeGen()获取当前时间戳

2)如果当前时间戳小于上次记录的时间戳,则抛出异常,表示始终回拨了(所以要保证每台机器上时间统一)

3)如果当前时间戳等于上一次时间戳,表示同一秒内有多个并发请求,此时序列号就发挥作用了,递增+1,这里有一个操作(sequence + 1) & sequenceMask,就是要与最大序列号4095做&操作,即如果它大于了最大的序列号,那么sequence就等于0了,此时调用tilNextMillis()方法做等待操作,直到生成的时间戳大于上一次时间戳,因为同一秒只支持4095个并发

4)如果当前时间戳大于上一次,则直接把sequence置0

5)将上一次时间戳更新为当前时间戳

6)最后一步也是关键,通过位移操作,把sequence(序列号),workId(工作ID),datacenterId(数据中心ID),timestamps(时间戳)拼到一起

说明这里还有一个twepoch,表示起始的时间点,这里的作用主要是控制生成ID的大小,如果你想从较小的ID开始递增,那么twepoch就可以设置的大一些,可以等于当前的时间戳,因为(timestamp – twepoch)的值就越小,反之则时间越往前ID越大

 构造方法

传入一个datacenterId和workId就可以了,说明下不同机器可以使用不同的datacenterId,一台机器上不同的项目可以使用不同的wokId

   /**
     * 构造函数
     * @param workerId 工作ID (0~31)
     * @param datacenterId 数据中心ID (0~31)
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

最后说说snowflake的优缺点:

优点:

1)毫秒数在高位,自增序列在低位,整个ID都是趋势递增的。

2)不依赖数据库等第三方系统,以服务的方式部署,稳定性更高,生成ID的性能也是非常高的。

3)可以根据自身业务特性分配bit位,非常灵活。

使用场景如:生成订单ID,因为ID不是连续递增的,所以可以保证订单数的隐蔽性

缺点:

1)强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态。

2)分布式部署时,服务器最好开启Network Time Protocol (NTP)服务,保证每个机器时间一致

下次说说如何使用zookeeper调度生成ID及保证服务的高可用。

JAVA并发包使用-线程池

JAVA提供了4种可用的线程池方法:

1,newSingleThreadExecutor():单例线程,连接池中只会存在一个线程
2,newFixedThreadPool(int nThread):固定数量的线程池,当超过现在数量,新的线程必须等待有线程被移除
3,newCacheedThreadPool():缓存线程池,当某个线程在创建时,先查看线程池中是否有存在的线程,没有则创建一个;可以设置线程的最大执行时长,默认为60s
4,newScheduledThreadPool():计划任务线程池,可以对线程设置周期执行

好处:线程池可用让程序更加专注于执行任务本身,而不必为线程的启动和关闭耗费时间

比如,有这么一个需求:在微信公众号中输入“订单”,查询出当天不同渠道的所有订单,这其实是一个很耗时的查询,若单个任务去执行,微信服务器可能等不了多久就直接返回“连接超时”了,此时想到的就是使用连接池分发多个任务去执行查询,最后将统计结果汇总

1,newFixedThreadPool来执行固定任务:

下面新建了一个固定大小的线程池,用来执行StatTask线程

ExecutorService pool = Executors.newFixedThreadPool(6);
Future<String> task1 = pool.submit(new StatTask(ORDER_TASK1, date));
Future<String> task2 = pool.submit(new StatTask(ORDER_TASK2, date));
Future<String> task3 = pool.submit(new StatTask(ORDER_TASK3, date));
Future<String> task4 = pool.submit(new StatTask(ORDER_TASK4, date));
Future<String> task5 = pool.submit(new StatTask(ORDER_TASK5, date));
Future<String> task6 = pool.submit(new StatTask(ORDER_TASK6, date));
pool.shutdown();

try {
	resultMap.put(ORDER_TASK1, task2.get(3, TimeUnit.SECONDS));
} catch (Exception e) {
	logger.error("error", e);
	resultMap.put(ORDER_TASK1, "0");
}
....

这里调用了pool.submit方法,表示有返回值的,submit里面的参数必须实现callable接口,返回Future对象,表中任务执行结果对象;若调用pool.execute方法则没有返回值,execute里面的参数需要实现Runnable接口

使用shutdown()方法,不再接受新的任务,以前的任务可以继续执行

Future.task方法为堵塞执行,参数可设置最大的执行时长,到时间则自动终止

class StatTask implements Callable<String> {
	private String taskName;
	private String date;

	StatTask(String taskName, String date) {
		this.taskName = taskName;
		this.date = date;
	}

	@Override
	public String call() throws Exception {
		long starttime = System.currentTimeMillis();
		if (ORDER_TASK1.equals(taskName)) {
			//处理订单1
		} else if (ORDER_TASK2.equals(taskName)) {
			//处理订单2
		}
		...
		return "0";
	}

}

StatTask实现Callable接口,并重写了call方法,在里面处理响应的查询逻辑,Callable使用了泛型,在call方法中返回自定义的类型

以上就新建了6个线程用来处理统计,最后将结果放入resultMap中,每个线程的最大等待时长为3s。

2,newScheduledThreadPool执行定时任务

ScheduledExecutorService pool = Executors.newScheduledThreadPool(20);
DataImportTask task = null;
for (int i=0;i<20;i++) {
	int start = i*300;
	int end = start+300;
	task = new DataImportTask("dataImport"+i,start,end);
	pool.scheduleWithFixedDelay(task, 0, 30, TimeUnit.MINUTES);
	//pool.schedule(task, 30, TimeUnit.MINUTES)
}

上面新建了20个定时任务用来导入数据

pool.scheduleWithFixedDelay()方法,第一个参数为任务类,需要继承Runnable接口,第二个参数为第一次执行的延时(纳秒/微妙/毫秒/秒/分/时),第二个参数对于第一次任务执行完成后在推迟多少时间执行,最后一个参数为时间的单位(纳秒/微妙/毫秒/秒/分/时)

pool.schedule表示只周期执行一次,第二个参数就表示第二次推迟的时间