Category Archives: Java

kafka和zookeeper集群安装

kafka和zookeeper集群安装

上期讲了kafka的作用及应用场景,今天我们来自己搭建一套kafka集群,由于kafka目前的安装包已经自带了zookeeper,所以在搭建zookeeper集群直接使用它内置的即可

准备工作:3台服务器,当然你可以使用自己的虚拟机尝试,如果安装虚拟机,可以查看我的这篇博客
我的三台服务器的IP分别是:192.168.101.116、192.168.101.115、192.168.102.215

安装步骤

下载安装包:

地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0

配置zookeeper

配置kafka的路径、心跳检测时间 初始化连接时follower和leader最长连接时间 follower和leader数据最长同步时间

 dataDir=/data/kafka/zookeeper
 tickTime=2000
 initLimit=5
 syncLimit=2
 server.0=192.168.101.116:2888:3888
 server.1=192.168.101.115:2888:3888
 server.2=192.168.102.215:2888:3888

在每个服务器上注册zookeeper

/data/kafka/zookeeper目录下面touch myid文件
192.168.101.116上执行
echo "0" > /data/kafka/zookeeper/myid
192.168.101.115上执行
echo "1" > /data/kafka/zookeeper/myid
192.168.102.215上执行
echo "2" > /data/kafka/zookeeper/myid

配置kafka

 192.168.101.116上配置
 advertised.listeners=PLAINTEXT://zc1:9092
 broker.id=0

 192.168.101.115上配置
 advertised.listeners=PLAINTEXT://zc2:9092
 broker.id=1

 192.168.102.215上配置
 advertised.listeners=PLAINTEXT://zc3:9092
 broker.id=2

 通用配置
 log.dirs=/data/kafka/kafka-logs
 zookeeper.connect=192.168.101.116:2181,192.168.101.115:2181,192.168.102.215:2181

配置hosts

在每个服务器/etc/hosts中添加如下配置

192.168.101.116 zc1
192.168.101.115 zc2
192.168.102.215 zc3

开放防火墙端口

由于zookeeper和kafka监听了2181、9092、2888、3888,所有我们还要把这些端口添加到防火墙中,编辑/etc/sysconfig/iptables,添加:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT

启动zookeeper和kafka

先启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

再启动kafka

bin/kafka-server-start.sh config/server.properties &

测试数据写入和读取

使用命令测试

生产者
[root@zc1 kafka_2.11-1.0.0]# bin/kafka-console-producer.sh –broker-list 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

>hello
>word

消费者
[root@zc2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh –bootstrap-server 192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092 –topic firsttopic

hello
word

代码测试

生产者

public class ProducerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Producer<String, String> producer;
    private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    public ProducerTest() {
        /** 生产者*/
        InputStream in_proc = this.getClass().getClassLoader().getResourceAsStream("kafka_test_producer.properties");
        Properties prop_proc = new Properties();
        try {
            prop_proc.load(in_proc);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_producer配置文件失败", e);
        }
        producer = new KafkaProducer<String, String>(prop_proc);
    }

    public void execute() {
        while(true) {
            try {
                String key = "CA1234";
                String message = System.currentTimeMillis()+" CA1234,PEK,SHA,2018-02-01";
                this.sendToTestServer(key, message);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    private void sendToTestServer(String key, String message) {
        logger.info("发送消息:"+message);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key, message);
        producer.send(producerRecord);
    }

    public static void main(String[] args) {
        new ProducerTest().execute();
    }

}

kafka_test_producer.properties

kafka.topic=firsttopic
group.id=chuanzh
bootstrap.servers=192.168.101.115:9092,192.168.101.116:9092,192.168.102.215:9092
retries=5
request.timeout.ms=10000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

消费者

public class ConsumerTest {

    private final static String TOPIC_NAME = "firsttopic";
    private static Consumer<String, String> consumer;
    private final static Logger logger = LoggerFactory.getLogger(DynamicPushTestTask.class);

    public ConsumerTest() {
        /** 消费者*/
        InputStream ksin = this.getClass().getClassLoader().getResourceAsStream("kafka_test_consumer.properties");
        Properties props = new Properties();
        try {
            props.load(ksin);
        } catch (IOException e) {
            logger.error("加载kafkacore_test_consumer配置文件失败", e);
        }
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
    }

    public void execute() {
        while(true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(2);
                logger.info("读取kafka,取到消息数量:" + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("value:{}", record.value());
                    logger.info("offset:{}", record.offset());
                }

                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("任务执行异常",e);
            }

        }
    }

    public static void main(String[] args) {
        new ConsumerTest().execute();
    }

}

kafka_test_consumer.properties

kafka.topic=flightcore
group.id=flightPacketPush
bootstrap.servers=43.241.208.208:9092,43.241.228.39:9092,43.241.234.89:9092
metadata.max.age.ms=60000
max.poll.records=50
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

 

Kafka使用介绍

Kafka使用介绍

kafka是什么?

kafka是一个分布式消息数据流平台,为处理实时数据提供一个统一、高吞吐、低延迟的平台。其本质是一个提供大规模分布式发布/订阅的消息队列,此外kafka还可以通过kafka connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams–一个Java的流式处理库。

它的2个大的应用: 1,构建实时的流数据管道,确保系统和应用之间数据传输的可靠 2,对应用之间的数据流进行转换和反应。

基本概念

  • Topic kafka将消息分门别类,每一类消息称之为一个主题(Topic) 每个消息(也叫记录recode)是由一个key,一个value和时间戳构成
  • Producer 发布消息的对象称之为主题生产者。
  • Consumer 订阅消息的对象称之为主题消费者
  • Broker 已发布的消息保存在一组服务器中,称之为Kafka集群,集群中每一个服务器都是一个代理(Broker),消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些发布的消息。

kafka的核心API

  • Producer API:允许应用发布一个或多个kafka主题
  • Consumer API:允许应用订阅一个或多个kafka主题
  • Streams API:转换数据流,将一个或多个主题的数据流转换为一个或多个主题的数据流输出
  • Connector API: 允许构建可以重复使用的生产者和消费者,将topic连接到现有的应用或数据系统上,例如,一个建立在数据库上的连接可以捕获表的每一次修改。

kafka的主题(Topic)和日志

topic可理解是一类消息的集合,在kafka中,topic是可以被多个订阅者来消费的 每个topic,kafka包含了一组分区数据,叫partition,如下:

每一个partition是一个有序、不可变的序列记录,每个记录都有一个唯一的序列ID叫offset。 当消息记录被Consumer消费后,这些记录不会被删除,kafka提供了一些配置,比如:按日期、按空间来定时删除记录。 kafka分区有两个目的,一是它便于扩展,不受单个服务器的限制,二是,它可以并行接受和处理多个任务。

分布式

每一个partition日志被分布在kafka的集群服务器上,并且可配置每个parition可重复的份数。 每个partition有一个leader,零个或多个follower,正常情况下leader负责所有的读写请求,而follower负责同步leader上的数据,当leader不可用时,follower会选举一个新的leader替代原来老的leader。

生产者

生产者负责选择一个topic发布消息,同时指定发布到哪一个partition上,最简单的方式是按照partition轮询,也可指定按权重指定。

消费者

消费者有叫一个组(group)的概念,比如多个消费者属于同一个组,那么他们将一起消费这个topic的数据,如下图:

一个kafka集群有两台服务器,4个partition,有两个分组A和B,A有2个消费者,B有4个消费者, 每个partition可以保证数据记录的顺序性,但客户端如果是并行处理,如groupA,C1同时消费P0、P3就可能照成数据顺序错乱的问题,如果要保证数据的一致性,那么顺序处理一个Topic的所有消息,那么就只提供一个分区。

kafka保证

  • 生产者发送到topic的消息会按照他们发送的顺序保存,如果消息M1、M2被同一个producer发送,当M1被先发送,那么它的offset值将会小于M2的
  • 消费者看到的数据也是根据他们保存的顺序
  • 如果一个topic配置了复制因数N,kafka集群将最大允许N-1台服务器同步失败。

Kafka和传统的消息系统之间的区别

  • 结合传统的优点:传统的消息系统分:队列和发布订阅两种模式,队列可以允许多个消费者同时瓜分数据,而发布订阅模式,会将消息通知到每一消费者。kafka结合了这两个模式的优点,当在kafka中,多个消费者的组ID设置为一样时,那么将采用队列的模式,如果组ID不同,则采用发布订阅模式。
  • 更强的顺序性保证:kafka中引入分区功能,一个topic可有多个分区,分区中保证了顺序的一致性,如果启动多个消费者,kafka保证每个消费者只会读取一个分区中的数据,当有多于分区数的消费者,那么这个消费者将一直处于空等待,不会收到任何消息

kafka的存储性能

kafka作为一个消息存储器,他会将消息写入到磁盘,并通过复制镜像,来保证容错。kafka允许所有的写入操作完成后再继续操作。因为kafka中保持了一个指针的方式,在存储50KB和50TB,其性能都是一样的。kafka通过这种指针读取数据,所以数据的大小,不会影响其读写性能。

kafka的流处理

kafka不仅提供了读、写、存储,还提供了对数据流进行处理,比如:一个零售APP,kafka可以从输入topic读取数据,然后使用StreamAPI统计数量,调整价格然后输出到topic中,类似的操作还包括聚合计算、数据流连接等。

Hystrix服务隔离及熔断

上次说了使用RxJava并行提升API效率,但是如何避免项目中一个服务拖慢整体的性能?hystrix优雅的解决了这个问题。

先贴个图,网上找的,我觉得这个图就可以说明问题了

hystrix-1

当依赖服务A挂了,直接就会影响到整个服务的性能,如果连接一直被A暂用,最终将导致服务发生雪崩

所以我们要保证核心服务的稳定,其他非核心的服务可以不那么重要,即使出现了错误数据可以丢弃或不用保证实时准确,再以SOA项目中的getDynamicInfo为例,如下:

87194035-3D85-4860-AB44-35EA71697A44

getFlight为核心的业务,必须要获取到数据,获取不到那就整个服务不可用了

getEntryCard、getPlane、getRefund、getWeather、getPortaitUser等方法为非核心业务,我们允许其数据丢失的情况,要保证整个服务的可用

首先我们需要把这些服务使用hystrix分离出来,先对这几个服务做先分组:

pydyn:getEntryCard、getPlane、getRefund、getWeather

portrait:getPortaitUser

Maven配置

	<dependency>
	  <groupId>com.netflix.hystrix</groupId>
	  <artifactId>hystrix-core</artifactId>
	  <version>1.5.8</version>
	</dependency>

以getEntryCard服务为例,继承HystrixCommand<T>,重写run方法

public class GetEntryCardCommand extends HystrixCommand<String>{

	private String arrCode;
        private GetEntryCardService service;

	public GetEntryCardCommand(String arrCode) {
		super(setter());
		this.arrCode = arrCode;
	}

	private static Setter setter() {
		return PySetter.setter().andCommandKey(HystrixCommandKey.Factory.asKey("getEntryCard"));
	}

	@Override
	protected String run() throws Exception {
		// TODO Auto-generated method stub
		return service.getEntryCardCity(arrCode);
	}

	@Override
	protected String getFallback() {
		return service.getFromCache(arrCode);
	}

}

run方法要执行的业务逻辑,如果出现异常则会调用getFallback方法。

public class PySetter {

	public static Setter setter() {
		// 服务分组
		HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("pydyn");
		// 服务标识
		//HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("getEntryCard");
		// 线程池名称
		HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("pydyn-pool");
		// 线程配置
		HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter()
				.withCoreSize(10)
				.withKeepAliveTimeMinutes(5)
				.withMaxQueueSize(Integer.MAX_VALUE)
				.withQueueSizeRejectionThreshold(10000);

		//命令属性配置
		HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
				.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                                .withExecutionIsolationThreadInterruptOnTimeout(true).withExecutionTimeoutInMilliseconds(3000) //设置超时3秒自动熔断
				.withCircuitBreakerErrorThresholdPercentage(30);//失败率达到30%自动熔断

		return HystrixCommand.Setter
				.withGroupKey(groupKey)
				//.andCommandKey(commandKey)
				.andThreadPoolKey(threadPoolKey)
				.andThreadPoolPropertiesDefaults(threadPoolProperties)
				.andCommandPropertiesDefaults(commandProperties);
	}

}

以上是对服务的线程池熔断配置

HystrixCommandGroupKey:服务分组,以上pydyn分组就包括4个服务,必填选项

HystrixCommandKey:服务的名称,唯一标识,如果不配置,则默认是类名

HystrixThreadPoolKey:线程池的名称,相同线程池名称的线程池是同一个,如果不配置,默认为分组名

HystrixThreadPoolProperties:线程池的配置,coreSize配置核心线程池的大小,maxQueueSize线程池队列的最大大小,queueSizeRejectionThreshold,限制当前队列的大小,实际队列大小由这个参数决定,即到达队列里面条数到达10000,则都会被拒绝。

HystrixCommandProperties:配置命令的一些参数,如executionIsolationStrategy,配置执行隔离策略,默认是使用线程隔离,THREAD即为线程池隔离,ExecutionIsolationThreadInterruptOnTimeout和ExecutionTimeoutInMilliseconds配置了启用超时和最大执行时间,这里为3s,circuitBreakerErrorThresholdPercentage失败率配置,默认为50%,这里配置的为30%,即失败率到达30%触发熔断

接下来就是调用GetEntryCardCommand获取数据

String result = new GetEntryCardCommand(request.getArrCode()).execute();

我使用的是阻塞的方式获取,当然也可以使用异步的方法,得到Future对象,使用

Future<String> = new GetEntryCardCommand(request.getArrCode()).queue();

其他getPlane、getRefund、getWeather方法同GetEntryCardCommand类,

getPortaitUser也一样,只需要更改下自己的分组为portait,线程池名称,并配置自己的线程池参数

好了,这样就实现了多个服务之间的隔离和熔断,不同的服务分组使用不同的线程池,即使getDynamicInfo并发量突然剧增,也不会对getPlane、getRefund、getWeather等产生影响

下面来说说对服务的监控,hystrix已经帮我们实现了数据的记录,只需要安装他们的管理后台就可以查看数据,他们家有2个监控后台:hystrix-dashboard和Turbine,第一个是监控单个项目的日志,第二个可以把多个项目的日志聚合在一起

hystrix-dashboard监控:

maven配置:

	 <dependency>  
	     <groupId>com.netflix.hystrix</groupId>  
	     <artifactId>hystrix-metrics-event-stream</artifactId>  
	     <version>1.1.2</version>  
 	</dependency>

web.xml配置

    <servlet>  
	    <display-name>HystrixMetricsStreamServlet</display-name>  
	    <servlet-name>HystrixMetricsStreamServlet</servlet-name>  
	    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>  
	</servlet>  
	<servlet-mapping>  
	    <servlet-name>HystrixMetricsStreamServlet</servlet-name>  
	    <url-pattern>/hystrix.stream</url-pattern>  
	</servlet-mapping>

访问http://ip:port/projectName/hystrix.stream查看是否有监控数据

hystrix-dashboard:先安装hystrix-dashboard.war,下载后放入web容器(Tomcat或Jetty)中,启动容器,访问http://ip:port/hystrix-dashboard,如下的界面:

371D6055-27B8-41ED-ADF3-544E92ECC934

最后点击Monitor Stream就可以看到数据了

EFD10FC9-9BAB-43DE-B5EB-09082222744B

可以看到每个服务及他们的线程池使用情况

CirCuit主要监控成功的请求数,请求超时数,失败率,平均耗时、90%,99%,99.5%的耗时(将鼠标移动到对于数字位置可以看到描述)

Thread Pools主要监控线程池的配置数,线程队列的配置数,最大活跃线程

 

Turbine监控(将多个实例的数据聚合起来)

1)下载turbine-web-1.0.0.war,并将war放入web容器中。

2)在容器下路径为turbine-web-1.0.0/WEB-INF/classes下新建config.properties文件。

InstanceDiscovery.impl=com.netflix.turbine.discovery.ConfigPropertyBasedDiscovery
#cluster
turbine.aggregator.clusterConfig=dynamicAPI
turbine.instanceUrlSuffix=:8080/projectName/hystrix.stream
turbine.ConfigPropertyBasedDiscovery.dynamicAPI.instances=127.0.0.1,127.0.0.2

这里我配置了2个实例127.0.0.1和127.0.0.2,他会同时调用http://127.0.0.1:8080/projectName/hystrix.stream和http://127.0.0.2:8080/projectName/hystrix.stream,然后根据分组名,服务名,线程池名将两个实例数据合并

3)调用http://ip:port/turbine-web-1.0.0/turbine.stream?cluster=dynamicAPI,即可查看到聚合的数据

4)打开hystrix-dashboard配置turbine

766892D0-ADE9-4BC4-A991-7844965A1557

点击Monitor Stream即可查看到两个实例监控的数据

5EC0BFFE-3C41-4F14-980D-39614A231A16

 

参考资料:

http://tech.lede.com/2017/06/15/rd/server/hystrix/

http://www.cnblogs.com/java-zhao/archive/2016/09/01/5831002.html

 

RxJava在SOA中的运用

在做SOA服务化时,有时候一个服务依赖于其他很多服务,如下图:

8D5F1A87-787D-460D-BA2C-292DB4E6BCB0

最常规的做法是串行调用接口,最后将结果合并,如果为了提高效率,我们想并行调用每个接口,最后将结果合并,如何做呢?

首先我们想到的是使用多线程去执行,JUC中CountDownLatch可以实现这个效果,最先初始化n个任务传给countDownLatch,然后利用线程池去执行每个任务,执行完后使用countDown()方法将任务递减,CountDownLatch.awai()等待指导所有的任务执行完成。RxJava提供了比较优雅的方法,我们来看看它是怎么实现的。

rxjava的实现思路也是一样,创建多个异步处理任务,最后将结果合并,拿调用getPlane接口来说:

private Observable<PlaneBean> getPlane()
			throws Exception {
		return Observable.create(new Observable.OnSubscribe<PlaneBean>() {
			@Override
			public void call(Subscriber<? super PlaneBean> subscriber) {
				PlaneBean plane = new PlaneBean();
				try {
					/* 调用服务业务处理*/
				} catch (Exception e) {
					logger.error(FuncStatic.errorTrace(e));
				}
				subscriber.onNext(plane);
				subscriber.onCompleted();
				logger.info(requestId + " get plane info end");
			}
		}).subscribeOn(Schedulers.from(workPool));
	}

使用Observable.create创建一个异步任务,在call方法中写需要需要处理的业务逻辑,执行完成后将数据plane传入到subscriber对象中,并调用onCompleted()方法表示结束执行,核心为subscribeOn方法,这个任务会交给workPool来调度,所以最初我们还要创建一个线程池

private static ExecutorService workPool = Executors.newFixedThreadPool(50);

其他API方法调用同上,再来说下合并,RxJava提供了merge和zip方法来合并任务,merge方法要求每个任务返回的结果都相同,zip则不限制,根据需求这里我们使用zip方法

Observable.zip(getDynamic(), getShare(), getPre(), getPlane(), getFiducial()
		new Func5<DynamicBean, ShareBean, PreBean, PlaneBean, FiducialBean, GetDetailResponse>() {
			@Override
			public GetDetailResponse call(DynamicBean t1, ShareBean t2, PreBean t3,
					PlaneBean t4, FiducialBean t5) {
				if (t1 != null)
					response.setDynamic(t1);
				if (t2 != null)
					response.setShare(t2);
				if (t3 != null)
					response.setPre(t3);
				if (t4 != null)
					response.setPlane(t4);
				if (5 != null)
					response.setFiducial(t4);
				return response;
			}

		}).subscribeOn(Schedulers.from(workPool)).toBlocking().subscribe();

因为这里我调用的5个API,所有使用方法Func5,如果是3个则使用Func3,同样交给workPool线程池来处理合并的结果,注意这里要使用toBlocking来阻塞阻塞合并操作,等待所有任务都执行完成后再进行合并,最后将结果赋予GetDetailResponse对象。

以上就完成了并行调度的执行,在API的依赖逐渐增多,这样可以大大提高执行效率,但也有一个问题,如果某个API执行时间很长,将对拖慢整个接口的执行时间,导致接口发送雪崩,下次讲讲如果避免这种情况。

JAVA内存分析

JAVA通过自带的垃圾回收机制来管理内存,但对于项目的内存分析也要了解,比如项目的内存使用情况,何时回收?或出现内存泄露如何排查?

首先看看JVM内存分配图:

671D4492-75BB-4046-8B2E-199ABDE70734

如上图,jvm包含了堆、本地方法栈,

堆上存放新建(New)的对象,它被划为为新生代,老年代两个区域,新生代又分为伊甸园、幸存者两个区域,不同的区域存放的对象生命周期不同

说说JAVA中几种内存溢出的情况:

1,JVM堆溢出(java.lang.OutOfMemoryError: java heap space)

当生成一个新对象,JVM内存申请如下流程:

1),jvm先尝试在eden分配新对象所需的内存,若内存足够,则将对象放入eden返回

2),若内存不够,jam启动youngGC,试图将eden不活跃的对象释放掉,若释放后仍不足以分配内存,则将Eden活跃的对象放入survivor中。

3),survivor作为Eden和old的中间交换区域,若old空间足够,survivor去对象会被移动到old区,否则留在survivor区

4),当old区不够时,jvm会在old区进行fullGC,若fullGC后,survivor和old仍然无法存放从Eden复制过来的对象,则会出现“outOfMemoryError: java heap space”

解决方法:加大堆内存的大小,通过设置-Xms(java heap初始化大小,默认是物理内存1/64) -Xmx(java heap的最大值) -Xmn(新生代heap的大小,一般为Xmx3或4分之一,注:增加新生代后会减少老年代的大小)

2,方法区内存溢出(java.lang.OutMemoryError: permGen space)

方法区主要是用来存放类信息,常量、静态变量等,所以程序中类加载过多(引入第三方包),或者过多使用反射、cglib这种动态代理,就可能导致该区域内存溢出

解决方法:通过设置-XX:PermSize(内存永久区初始值)和-XX:MaxPerSize(内存永久区最大值)的大小

3,线程栈溢出(java.lang.StackOverflowError)

线程栈是线程独有的一块内存区域,所以线程栈溢出必定是线程运行是出现错误,一般是递归太深,或者方法层级调用太深引起的

解决方法:设置栈区的大小,通常栈的大小是1-2M,可通过-Xss设置线程的栈的大小,jdK5以后每个栈默认大小为1M。

下面针对线上的某个项目,查看它是否出现内存溢出的情况

1,如何查看项目的内存使用情况?

jmap -heap pid

Attaching to process ID 64909, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.91-b14

using thread-local object allocation.
Parallel GC with 4 thread(s)

Heap Configuration:
   MinHeapFreeRatio         = 0
   MaxHeapFreeRatio         = 100
   MaxHeapSize              = 2147483648 (2048.0MB)
   NewSize                  = 44564480 (42.5MB)
   MaxNewSize               = 715653120 (682.5MB)
   OldSize                  = 89653248 (85.5MB)
   NewRatio                 = 2
   SurvivorRatio            = 8
   MetaspaceSize            = 21807104 (20.796875MB)
   CompressedClassSpaceSize = 1073741824 (1024.0MB)
   MaxMetaspaceSize         = 17592186044415 MB
   G1HeapRegionSize         = 0 (0.0MB)

Heap Usage:
PS Young Generation
Eden Space:
   capacity = 31457280 (30.0MB)
   used     = 12660000 (12.073516845703125MB)
   free     = 18797280 (17.926483154296875MB)
   40.24505615234375% used
From Space:
   capacity = 1572864 (1.5MB)
   used     = 1179648 (1.125MB)
   free     = 393216 (0.375MB)
   75.0% used
To Space:
   capacity = 9437184 (9.0MB)
   used     = 0 (0.0MB)
   free     = 9437184 (9.0MB)
   0.0% used
PS Old Generation
   capacity = 40894464 (39.0MB)
   used     = 4514768 (4.3056182861328125MB)
   free     = 36379696 (34.69438171386719MB)
   11.040046887520033% used

查看Java内存的分配情况及新生代、老年代内存的使用情况,确定是否内存分配过小?

2,判断是否出现内存泄露

jstat -gcutil pid 2000

D09AEF2B-2D3E-4D50-B74C-E603875EAD70

上面命令表示每隔2秒打印出GC的使用回收情况,若FGC很多很可能出现了内存泄露

3,查看占用内存最多的对象

jmap -histo:live pid | more

0B137198-E38B-4466-A554-E68FEC36EC61

按使用大小进行了排序,重点查看排在前面的对象,看是否程序写的有问题。

另外可以将jvm的堆内存导出来分析,使用

jmap -dump:format=b,file=dynamicapi.hprof pid

使用java vistual工具分析,jdk自带了jvisualvm就可以进行内存分析,执行命令:

进入jkd的bin目录,执行jvisualvm

点击文件->装入,选择堆内存文件,即可看到堆中类的使用情况

3E5ECD10-58E2-450B-A8C4-31B276689EC6

 

Twitter分布式ID算法Snowflake

Snowflake是Twitter开发的一个分布式ID生成算法,有以下几个特点:

1)默认情况下41bit的时间戳可以支持该算法使用到2082年,10bit的工作机器id可以支持1023台机器,序列号支持1毫秒产生4095个自增序列id,即理论上1秒产生409万id

2)高性能,不依赖其他第三方服务,稳定性高

3)强依赖于机器时钟

下面看看它的算法结构图:

图片 1

可以看到它是由三部分组成

1)当前时间戳

2)工作机器ID:包括dataCenterId和workId,可自己配置

3)12bit序列号,即从0增长到4095

算法其实很简单,因为不依赖于其它服务器,都是做时间比较和位移操作,流程图如下:

CA1FEEEF-6CBB-4D5F-993B-ED2C51C9D198

下面针对JAVA版的算法具体分析

// ==============================Fields===========================================
    /** 开始时间截 (2015-01-01) */
    private final long twepoch = 1420041600000L;

    /** 机器id所占的位数 */
    private final long workerIdBits = 5L;

    /** 数据标识id所占的位数 */
    private final long datacenterIdBits = 5L;

    /** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /** 支持的最大数据标识id,结果是31 */
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    /** 序列在id中占的位数 */
    private final long sequenceBits = 12L;

    /** 机器ID向左移12位 */
    private final long workerIdShift = sequenceBits;

    /** 数据标识id向左移17位(12+5) */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /** 时间截向左移22位(5+5+12) */
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /** 工作机器ID(0~31) */
    private long workerId;

    /** 数据中心ID(0~31) */
    private long datacenterId;

    /** 毫秒内序列(0~4095) */
    private long sequence = 0L;

    /** 上次生成ID的时间截 */
    private long lastTimestamp = -1L;

因为算法都是基于二进制的位移操作,所以上面定义了一大堆变量,基本都是一些需要位移的长度

如序列IDsequenceBits在定义了它的二进制长度,序列号最大为4095,它的二进制占用长度就是12

同样datacenterId和workId最大数为31,二进制占用的长度就是5,workerIdShift,datacenterIdShift,timestampLeftShift定义了他们需要的位移数

为啥都要基于二进制的位移来操作呢?因为这样对于机器来说计算更快

核心方法生成ID

    // ==============================Methods==========================================
    /**
     * 获得下一个ID (该方法是线程安全的)
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        //如果是同一时间生成的,则进行毫秒内序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            //毫秒内序列溢出
            if (sequence == 0) {
                //阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //时间戳改变,毫秒内序列重置
        else {
            sequence = 0L;
        }

        //上次生成ID的时间截
        lastTimestamp = timestamp;

        //移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     * @param lastTimestamp 上次生成ID的时间截
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 返回以毫秒为单位的当前时间
     * @return 当前时间(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

1)首先调用timeGen()获取当前时间戳

2)如果当前时间戳小于上次记录的时间戳,则抛出异常,表示始终回拨了(所以要保证每台机器上时间统一)

3)如果当前时间戳等于上一次时间戳,表示同一秒内有多个并发请求,此时序列号就发挥作用了,递增+1,这里有一个操作(sequence + 1) & sequenceMask,就是要与最大序列号4095做&操作,即如果它大于了最大的序列号,那么sequence就等于0了,此时调用tilNextMillis()方法做等待操作,直到生成的时间戳大于上一次时间戳,因为同一秒只支持4095个并发

4)如果当前时间戳大于上一次,则直接把sequence置0

5)将上一次时间戳更新为当前时间戳

6)最后一步也是关键,通过位移操作,把sequence(序列号),workId(工作ID),datacenterId(数据中心ID),timestamps(时间戳)拼到一起

说明这里还有一个twepoch,表示起始的时间点,这里的作用主要是控制生成ID的大小,如果你想从较小的ID开始递增,那么twepoch就可以设置的大一些,可以等于当前的时间戳,因为(timestamp – twepoch)的值就越小,反之则时间越往前ID越大

 构造方法

传入一个datacenterId和workId就可以了,说明下不同机器可以使用不同的datacenterId,一台机器上不同的项目可以使用不同的wokId

   /**
     * 构造函数
     * @param workerId 工作ID (0~31)
     * @param datacenterId 数据中心ID (0~31)
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

最后说说snowflake的优缺点:

优点:

1)毫秒数在高位,自增序列在低位,整个ID都是趋势递增的。

2)不依赖数据库等第三方系统,以服务的方式部署,稳定性更高,生成ID的性能也是非常高的。

3)可以根据自身业务特性分配bit位,非常灵活。

使用场景如:生成订单ID,因为ID不是连续递增的,所以可以保证订单数的隐蔽性

缺点:

1)强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态。

2)分布式部署时,服务器最好开启Network Time Protocol (NTP)服务,保证每个机器时间一致

下次说说如何使用zookeeper调度生成ID及保证服务的高可用。

JAVA并发包使用-线程池

JAVA提供了4种可用的线程池方法:

1,newSingleThreadExecutor():单例线程,连接池中只会存在一个线程
2,newFixedThreadPool(int nThread):固定数量的线程池,当超过现在数量,新的线程必须等待有线程被移除
3,newCacheedThreadPool():缓存线程池,当某个线程在创建时,先查看线程池中是否有存在的线程,没有则创建一个;可以设置线程的最大执行时长,默认为60s
4,newScheduledThreadPool():计划任务线程池,可以对线程设置周期执行

好处:线程池可用让程序更加专注于执行任务本身,而不必为线程的启动和关闭耗费时间

比如,有这么一个需求:在微信公众号中输入“订单”,查询出当天不同渠道的所有订单,这其实是一个很耗时的查询,若单个任务去执行,微信服务器可能等不了多久就直接返回“连接超时”了,此时想到的就是使用连接池分发多个任务去执行查询,最后将统计结果汇总

1,newFixedThreadPool来执行固定任务:

下面新建了一个固定大小的线程池,用来执行StatTask线程

ExecutorService pool = Executors.newFixedThreadPool(6);
Future<String> task1 = pool.submit(new StatTask(ORDER_TASK1, date));
Future<String> task2 = pool.submit(new StatTask(ORDER_TASK2, date));
Future<String> task3 = pool.submit(new StatTask(ORDER_TASK3, date));
Future<String> task4 = pool.submit(new StatTask(ORDER_TASK4, date));
Future<String> task5 = pool.submit(new StatTask(ORDER_TASK5, date));
Future<String> task6 = pool.submit(new StatTask(ORDER_TASK6, date));
pool.shutdown();

try {
	resultMap.put(ORDER_TASK1, task2.get(3, TimeUnit.SECONDS));
} catch (Exception e) {
	logger.error("error", e);
	resultMap.put(ORDER_TASK1, "0");
}
....

这里调用了pool.submit方法,表示有返回值的,submit里面的参数必须实现callable接口,返回Future对象,表中任务执行结果对象;若调用pool.execute方法则没有返回值,execute里面的参数需要实现Runnable接口

使用shutdown()方法,不再接受新的任务,以前的任务可以继续执行

Future.task方法为堵塞执行,参数可设置最大的执行时长,到时间则自动终止

class StatTask implements Callable<String> {
	private String taskName;
	private String date;

	StatTask(String taskName, String date) {
		this.taskName = taskName;
		this.date = date;
	}

	@Override
	public String call() throws Exception {
		long starttime = System.currentTimeMillis();
		if (ORDER_TASK1.equals(taskName)) {
			//处理订单1
		} else if (ORDER_TASK2.equals(taskName)) {
			//处理订单2
		}
		...
		return "0";
	}

}

StatTask实现Callable接口,并重写了call方法,在里面处理响应的查询逻辑,Callable使用了泛型,在call方法中返回自定义的类型

以上就新建了6个线程用来处理统计,最后将结果放入resultMap中,每个线程的最大等待时长为3s。

2,newScheduledThreadPool执行定时任务

ScheduledExecutorService pool = Executors.newScheduledThreadPool(20);
DataImportTask task = null;
for (int i=0;i<20;i++) {
	int start = i*300;
	int end = start+300;
	task = new DataImportTask("dataImport"+i,start,end);
	pool.scheduleWithFixedDelay(task, 0, 30, TimeUnit.MINUTES);
	//pool.schedule(task, 30, TimeUnit.MINUTES)
}

上面新建了20个定时任务用来导入数据

pool.scheduleWithFixedDelay()方法,第一个参数为任务类,需要继承Runnable接口,第二个参数为第一次执行的延时(纳秒/微妙/毫秒/秒/分/时),第二个参数对于第一次任务执行完成后在推迟多少时间执行,最后一个参数为时间的单位(纳秒/微妙/毫秒/秒/分/时)

pool.schedule表示只周期执行一次,第二个参数就表示第二次推迟的时间

使用高德地图画飞行轨迹

高德地图提供了很多API来满足我们在其地图上画不同的图形,比如描绘出一段公交的路线,或确定一个建筑的位置等。

下面我以工作中的例子,描绘在高德地图上如何画出一条飞行轨迹。

高德提供了JS的API,需要先引入,后面的key需要你去高德的开发平台去申请

<script type="text/javascript" src="https://webapi.amap.com/maps?v=1.3&key=你的key"></script>

在html中定义一个div,来展示地图

<div id="container" style="text-align:center; margin: 0 auto; margin-top:25px;"></div>

下面就可以开始画图了,首先我要调用接口获取所有的飞行点坐标,一个轨迹就是通过一个个点来显示的

    var data = postRequest();
    //获取地图的中心点坐标
    var centerlat = (data.dep.lat + data.arr.lat)/2
    var centerlon = (data.dep.lon + data.arr.lon)/2
    var centerArr = [centerlat, centerlon];
    var map = new AMap.Map('container', {
        resizeEnable: true,
        center: centerArr,
        zoom: 5
    });

定义一个地图对象map,其中心点通过起/始点经纬度来计算,zoom为地图显示的缩放级别,resizeEnable:true,表示可以调整地图大小

好了就可以在地图上画线段了,获取点集合,格式为:[[40.0618,116.5991],[40.0552,116.6002],[40.0485,116.6013]],下面也设置了线条的一些样式。

strokeDasharray为勾勒形状轮廓的虚线和间隙的样式,此属性在strokeStyle 为dashed 时有效, 此属性在ie9+浏览器有效,用法:

[10,5] 表示10个像素的实线和5个像素的空白(如此反复)组成的虚线

    var lineArr1 = data.pointList1;
    var polyline = new AMap.Polyline({
        path: lineArr1,          //设置线覆盖物路径
        strokeColor: "#3366FF", //线颜色
        strokeOpacity: 1,       //线透明度
        strokeWeight: 2,        //线宽
        strokeStyle: "solid",   //线样式
    });
    polyline.setMap(map);

    var lineArr2 = data.pointList2;
    var planPolyline = new AMap.Polyline({
        path: lineArr2,          //设置线覆盖物路径
        strokeColor: "#3366FF", //线颜色
        strokeOpacity: 0.7,       //线透明度
        strokeWeight: 2,        //线宽
        strokeStyle: "dashed",   //线样式
        strokeDasharray: [10, 5] //补充线样式
    });
    planPolyline.setMap(map);

另外还需要画一个飞行物,更好的展示视觉效果,飞行物即一个点坐标,但这个坐标我用一张图片来表示

    //添加飞行物图标
    var icon = new AMap.Icon({
	    size: new AMap.Size(54, 52), //图标大小
            image: "images/MapPlane.png"
        });
	var marker = new AMap.Marker({
	    map: map,
	    angle: nowPlanePos.course,
	    position: [nowPlanePos.lng,nowPlanePos.lat],
	    offset: new AMap.Pixel(-27, -52),
	    icon: icon       
	});

使用marker方法来描绘一个点,angle表示点标记的旋转角度,即飞行物的朝向,offset函数一定要加上,因为你如果缩放地图后,那么这个飞行物可能在地图上呈现的位置会相差很大,

可以看看没有设置offset和设置了offset的差别,前两幅是设置了offset,后两幅是没有

6A3B1EF7-D539-4EE8-87D6-175B5767883C

C1EC8C8B-02B0-4534-8111-E4C2052C5306

可以看到,地缩放比例飞行物的位置相差很多,但实质,坐标的位置都是一样的,飞行物并没有根据地图来改变自己的偏移量

offset的用法:点标记显示位置偏移量,默认值为Pixel(-10,-34)。Marker指定position后,默认以marker左上角位置为基准点,对准所给定的position位置,若需使marker指定位置对准在position处,需根据marker的尺寸设置一定的偏移量。

以上我的图片宽度是52,只需要设置横线偏移量为宽度一半,向左则为-27。

注意:我的飞行物是不断的移动的,所以我需要实时的请求接口获取最新的点坐标,所以每次需要重新再map上画图,但在画图前,先要清除原来的图标对象,所以delete方法删除即可

比如在画飞行物时,使用:

    if(marker != null) {
		marker.setMap(null);
		delete marker;
	}

另外map对象是不需要重画的,map初始化一次即可。

参考:

官网文档地址:覆盖物-参考手册-JavaScript API | 高德地图API

图形示例:折线、多边形、圆-折线、多边形和圆-示例中心-JavaScript API | 高德地图API

HashMap原理解析

HashMap,在程序中我们经常要用到的集合,它的实现是通过数组+单向链表来实现的

先从HashMap的put方法讲起,基本思路是:

1,会通过数据的key做hash算法,得到数组bucket的下标值,

2,然后再把<key,value>以链表的形式(只有一个节点)插入到bucket[i]中,如果两个key算出来的下标值i一样,那么新的元素就会添加到链表之后

查看源代码:

    public V put(K key, V value) {
        return putVal(hash(key), key, value, false, true);
    }

    static final int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }

当key为null时,默认放入到数组的第0个位置,不为null,获取key的hashcode,进行右移16位并与hashcode做异或运算,得到下标值

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        //tab数组为空,则创建一个
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
        //根据容量大小和hash值计算(&)下标值
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            //若节点存在,则替换
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            //tab存放为树,jdk8默认为8个节点
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            //tab存放为链表,添加元素
            else {
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
        //如果map数量大于负载因子*最大容量,则扩容
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }

获取元素,基本思路:如果key为null,直接命中value,如果不为空,通过key获取数组下标值,如果第一个节点的key值相同,则直接返回,若为树,则遍历树结构,若为链表,则遍历链表对比key

    public V get(Object key) {
        Node<K,V> e;
        return (e = getNode(hash(key), key)) == null ? null : e.value;
    }

    final Node<K,V> getNode(int hash, Object key) {
        Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
        //先判断首节点
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (first = tab[(n - 1) & hash]) != null) {
            if (first.hash == hash && // always check first node
                ((k = first.key) == key || (key != null && key.equals(k))))
                return first;
            //多个节点
            if ((e = first.next) != null) {
                //为树
                if (first instanceof TreeNode)
                    return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                //为链表
                do {
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        return e;
                } while ((e = e.next) != null);
            }
        }
        return null;
    }

数据扩容,resize(),基本思路是:将新的容量扩充到原来的2倍,并将旧的数组copy到新的数组中,注意原来链表或树的节点,位置可能会发生变化,要么是原来的位置,要么是原来位置*2,下面一幅图可以很好的描述resize元素的变化情况

d7acbad8-d941-11e4-9493-2c5e69d084c0

    final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table;
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        int oldThr = threshold;
        int newCap, newThr = 0;
        if (oldCap > 0) {
            //超过限制的最大值,则不再扩容
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            //计算新的容量,是原来的2倍
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                     oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1; // double threshold
        }
        else if (oldThr > 0) // initial capacity was placed in threshold
            newCap = oldThr;
        else {               // zero initial threshold signifies using defaults
            newCap = DEFAULT_INITIAL_CAPACITY;
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                      (int)ft : Integer.MAX_VALUE);
        }
        threshold = newThr;
        @SuppressWarnings({"rawtypes","unchecked"})
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
        table = newTab;
        if (oldTab != null) {
            //将旧的bucket复制到新的bucket中
            for (int j = 0; j < oldCap; ++j) {
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    if (e.next == null)
                        newTab[e.hash & (newCap - 1)] = e;
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            //若节点为双数,则index位置不变
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            //若节点为单数,则index位置为原位置+oldCap
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;
    }

总结:

对于HashMap的原理可以理解如下:

目前有n个篮子,我要把贴有标签(标签代表key)的苹果放入篮子中,首先确定放入哪一个篮子中,所以通过苹果的标签来分类(hash算法),某一类的都放入在一个篮子中

为何要这样做,想想,如果将所有苹果都放入一个篮子中,那么我要取出某个标签的苹果,我就要在篮子中一个一个的找,如果苹果很多,这样效率是很低的,所以我是先通过苹果的标签定位到某个分类的篮子,再去里面找,这样的效率就提高很多了

参考文章:

1,http://yikun.github.io/2015/04/01/Java-HashMap%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86%E5%8F%8A%E5%AE%9E%E7%8E%B0/

2,http://blog.csdn.net/vking_wang/article/details/14166593

将项目发布到Maven仓库

我的开源包eweb已经发布到maven中心库了,使用maven配置就可以引入:

    <dependency>
        <groupId>com.github.chuanzh</groupId>
        <artifactId>eweb</artifactId>
        <version>1.0.1</version>
    </dependency>

下面讲讲如何发布到中心库,查了网上的步骤,都说的比较复制,但其实也很简单,这里分下面几个步骤:

一,Sonatype账号注册&创建一个Issue

二,使用GPG生成秘钥

三,修改项目的maven配置文件

四,上传构件到OSS&发布

看懂了之后其实很好理解,其实就类似于申请接入腾讯,微博的APP应用,

第一步,你得成为一个开发者吧,也就是要注册一个开发者账号,接下来就是要创建一个应用,等待腾讯、微博服务商审核

第二步,审核通过后,腾讯、微博会给你一对秘钥,但maven是要自己生成的,也就是用GPG

第三步,在你的项目中配置密钥和其他配置。

第四部,就是上传你的项目到腾讯、微博的服务平台,然后再等待审核,审核通过,那你就可以在他们的平台使用APP了

下面就从这4个方面说明下。

一,Sonatype账号注册&创建一个Issue

注册地址: Sign up for JIRA

记住你的用户名和密码,之后要用到

创建一个Issue:https://issues.sonatype.org/secure/CreateIssue.jspa?issuetype=21&pid=10134

v2-404b13199782405587859b6e73c02854_b

填写项目的基本信息:简单描述,详细描述,项目地址

重点说明下:Group Id为你项目的groupid,一般为公司或个人域名,如果你没有,就不要随便写了(他们在审核时会要求你提供一些证明的),写github的项目地址就可以了

好了,现在你就可以静静的等待maven审核了,一般为1~2天

二,使用GPG生成秘钥

首先需要下载GPG软件,Mac下载地址:GPG Suite,Windows用户下载地址:Secure email and file encryption with GnuPG for Windows,我的是Mac,所以具体说说Mac下的生成方式,Window方式请参考这篇博客

Mac下生成很简单,如下几个步骤:

1,新建一个密钥

v2-4fe9a772f939f0a8ef4fd2bfd103f97d_b

2,点击右键,将公钥发送至公钥服务器

v2-30789dafea5f96661c8e22b9c58ac165_b

到此密钥就生成了,记住你的口令,后面要用到

三,修改项目的maven配置文件

1,修改maven的setting配置

<settings>

    ...

    <servers>
        <server>
            <id>oss</id>
            <username>用户名</username>
            <password>密码</password>
        </server>
    </servers>

    ...

</settings>

用户名和密码就是你注册Sonatype的账号

2,配置项目pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.github.chuanzh</groupId>
  <artifactId>eweb</artifactId>
  <version>1.0.1</version>
  <packaging>jar</packaging>

  <name>eweb</name>
  <description>轻量web开发框架</description>
  <url>https://github.com/chuanzh/eweb</url>

  <licenses>
    <license>
      <name>The Apache Software License, Version 2.0</name>
      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
    </license>
  </licenses>

  <developers>
    <developer>
      <name>chuan.zhang</name>
      <email>zhangchuan0305@gmail.com</email>
    </developer>
  </developers>

  <scm>
    <connection>scm:git@github.com:chuanzh/eweb.git</connection>
    <developerConnection>scm:git@github.com:chuanzh/eweb.git</developerConnection>
    <url>git@github.com:chuanzh/eweb.git</url>
  </scm>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>utf-8</project.reporting.outputEncoding>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <id>release</id>
      <distributionManagement>
        <snapshotRepository>
          <id>oss</id>
          <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
        </snapshotRepository>
        <repository>
          <id>oss</id>
          <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
        </repository>
      </distributionManagement>
      <build>
        <plugins>
          <!-- Source -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-source-plugin</artifactId>
            <version>3.0.1</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>jar-no-fork</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <!-- Javadoc -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-javadoc-plugin</artifactId>
            <version>2.10.4</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>jar</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <!-- Gpg Signature -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-gpg-plugin</artifactId>
            <version>1.6</version>
            <executions>
              <execution>
                <id>sign-artifacts</id>
                <phase>verify</phase>
                <goals>
                  <goal>sign</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>

</project>

pom.xml必须配置的有:name,description,licenses,developers,scm。

snapshotRepository和repository在Issure初审通过后会给你,另外id要和setting.xml中保持一致

6EC7FDB0-E87F-4A66-ABCD-D7DC14EC17D1

四,上传构件到OSS&发布

1,使用下面命令,会提示要求你输入GPG的密钥,就是申请时候填写的,此过程比较慢,需要耐心等待

mvn clean deploy -P release

2,在OSS中发布构件

使用sonatype账号登陆https://oss.sonatype.org ,通过模糊查找,选择Staging Repositories,如下图,该构件的状态此时应该为open,勾选它,点击close按钮,sonatype会先做检验,通过后,就可以点击Release了,如果审核不通过,尝试删掉后重新上传

49767CB8-C939-456A-B314-1DA45F606BFB

A521FDDB-E66E-4116-B604-DF098982EA20

3,通知sonatype已经发布构建

也就是在Issure中回复,再次等待,也是需要审核1~2天,若审核通过,会回复你如下的信息

C86DFA4C-BC11-4A82-8CEB-FD3F3D9BF75C

等待10分钟,你就可以在maven库中搜索到你的jar包了

3CB6AF70-C0F5-4F63-8F3C-14D22E8C3CA8

 

若下次修改了本地的代码,只需要重新执行第四步的命令即可