Category Archives: 数据结构与算法

Kafka使用介绍

Kafka使用介绍

kafka是什么?

kafka是一个分布式消息数据流平台,为处理实时数据提供一个统一、高吞吐、低延迟的平台。其本质是一个提供大规模分布式发布/订阅的消息队列,此外kafka还可以通过kafka connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams–一个Java的流式处理库。

它的2个大的应用: 1,构建实时的流数据管道,确保系统和应用之间数据传输的可靠 2,对应用之间的数据流进行转换和反应。

基本概念

  • Topic kafka将消息分门别类,每一类消息称之为一个主题(Topic) 每个消息(也叫记录recode)是由一个key,一个value和时间戳构成
  • Producer 发布消息的对象称之为主题生产者。
  • Consumer 订阅消息的对象称之为主题消费者
  • Broker 已发布的消息保存在一组服务器中,称之为Kafka集群,集群中每一个服务器都是一个代理(Broker),消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些发布的消息。

kafka的核心API

  • Producer API:允许应用发布一个或多个kafka主题
  • Consumer API:允许应用订阅一个或多个kafka主题
  • Streams API:转换数据流,将一个或多个主题的数据流转换为一个或多个主题的数据流输出
  • Connector API: 允许构建可以重复使用的生产者和消费者,将topic连接到现有的应用或数据系统上,例如,一个建立在数据库上的连接可以捕获表的每一次修改。

kafka的主题(Topic)和日志

topic可理解是一类消息的集合,在kafka中,topic是可以被多个订阅者来消费的 每个topic,kafka包含了一组分区数据,叫partition,如下:

每一个partition是一个有序、不可变的序列记录,每个记录都有一个唯一的序列ID叫offset。 当消息记录被Consumer消费后,这些记录不会被删除,kafka提供了一些配置,比如:按日期、按空间来定时删除记录。 kafka分区有两个目的,一是它便于扩展,不受单个服务器的限制,二是,它可以并行接受和处理多个任务。

分布式

每一个partition日志被分布在kafka的集群服务器上,并且可配置每个parition可重复的份数。 每个partition有一个leader,零个或多个follower,正常情况下leader负责所有的读写请求,而follower负责同步leader上的数据,当leader不可用时,follower会选举一个新的leader替代原来老的leader。

生产者

生产者负责选择一个topic发布消息,同时指定发布到哪一个partition上,最简单的方式是按照partition轮询,也可指定按权重指定。

消费者

消费者有叫一个组(group)的概念,比如多个消费者属于同一个组,那么他们将一起消费这个topic的数据,如下图:

一个kafka集群有两台服务器,4个partition,有两个分组A和B,A有2个消费者,B有4个消费者, 每个partition可以保证数据记录的顺序性,但客户端如果是并行处理,如groupA,C1同时消费P0、P3就可能照成数据顺序错乱的问题,如果要保证数据的一致性,那么顺序处理一个Topic的所有消息,那么就只提供一个分区。

kafka保证

  • 生产者发送到topic的消息会按照他们发送的顺序保存,如果消息M1、M2被同一个producer发送,当M1被先发送,那么它的offset值将会小于M2的
  • 消费者看到的数据也是根据他们保存的顺序
  • 如果一个topic配置了复制因数N,kafka集群将最大允许N-1台服务器同步失败。

Kafka和传统的消息系统之间的区别

  • 结合传统的优点:传统的消息系统分:队列和发布订阅两种模式,队列可以允许多个消费者同时瓜分数据,而发布订阅模式,会将消息通知到每一消费者。kafka结合了这两个模式的优点,当在kafka中,多个消费者的组ID设置为一样时,那么将采用队列的模式,如果组ID不同,则采用发布订阅模式。
  • 更强的顺序性保证:kafka中引入分区功能,一个topic可有多个分区,分区中保证了顺序的一致性,如果启动多个消费者,kafka保证每个消费者只会读取一个分区中的数据,当有多于分区数的消费者,那么这个消费者将一直处于空等待,不会收到任何消息

kafka的存储性能

kafka作为一个消息存储器,他会将消息写入到磁盘,并通过复制镜像,来保证容错。kafka允许所有的写入操作完成后再继续操作。因为kafka中保持了一个指针的方式,在存储50KB和50TB,其性能都是一样的。kafka通过这种指针读取数据,所以数据的大小,不会影响其读写性能。

kafka的流处理

kafka不仅提供了读、写、存储,还提供了对数据流进行处理,比如:一个零售APP,kafka可以从输入topic读取数据,然后使用StreamAPI统计数量,调整价格然后输出到topic中,类似的操作还包括聚合计算、数据流连接等。

Twitter分布式ID算法Snowflake

Snowflake是Twitter开发的一个分布式ID生成算法,有以下几个特点:

1)默认情况下41bit的时间戳可以支持该算法使用到2082年,10bit的工作机器id可以支持1023台机器,序列号支持1毫秒产生4095个自增序列id,即理论上1秒产生409万id

2)高性能,不依赖其他第三方服务,稳定性高

3)强依赖于机器时钟

下面看看它的算法结构图:

图片 1

可以看到它是由三部分组成

1)当前时间戳

2)工作机器ID:包括dataCenterId和workId,可自己配置

3)12bit序列号,即从0增长到4095

算法其实很简单,因为不依赖于其它服务器,都是做时间比较和位移操作,流程图如下:

CA1FEEEF-6CBB-4D5F-993B-ED2C51C9D198

下面针对JAVA版的算法具体分析

// ==============================Fields===========================================
    /** 开始时间截 (2015-01-01) */
    private final long twepoch = 1420041600000L;

    /** 机器id所占的位数 */
    private final long workerIdBits = 5L;

    /** 数据标识id所占的位数 */
    private final long datacenterIdBits = 5L;

    /** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /** 支持的最大数据标识id,结果是31 */
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    /** 序列在id中占的位数 */
    private final long sequenceBits = 12L;

    /** 机器ID向左移12位 */
    private final long workerIdShift = sequenceBits;

    /** 数据标识id向左移17位(12+5) */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /** 时间截向左移22位(5+5+12) */
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /** 工作机器ID(0~31) */
    private long workerId;

    /** 数据中心ID(0~31) */
    private long datacenterId;

    /** 毫秒内序列(0~4095) */
    private long sequence = 0L;

    /** 上次生成ID的时间截 */
    private long lastTimestamp = -1L;

因为算法都是基于二进制的位移操作,所以上面定义了一大堆变量,基本都是一些需要位移的长度

如序列IDsequenceBits在定义了它的二进制长度,序列号最大为4095,它的二进制占用长度就是12

同样datacenterId和workId最大数为31,二进制占用的长度就是5,workerIdShift,datacenterIdShift,timestampLeftShift定义了他们需要的位移数

为啥都要基于二进制的位移来操作呢?因为这样对于机器来说计算更快

核心方法生成ID

    // ==============================Methods==========================================
    /**
     * 获得下一个ID (该方法是线程安全的)
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        //如果是同一时间生成的,则进行毫秒内序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            //毫秒内序列溢出
            if (sequence == 0) {
                //阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //时间戳改变,毫秒内序列重置
        else {
            sequence = 0L;
        }

        //上次生成ID的时间截
        lastTimestamp = timestamp;

        //移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     * @param lastTimestamp 上次生成ID的时间截
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 返回以毫秒为单位的当前时间
     * @return 当前时间(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

1)首先调用timeGen()获取当前时间戳

2)如果当前时间戳小于上次记录的时间戳,则抛出异常,表示始终回拨了(所以要保证每台机器上时间统一)

3)如果当前时间戳等于上一次时间戳,表示同一秒内有多个并发请求,此时序列号就发挥作用了,递增+1,这里有一个操作(sequence + 1) & sequenceMask,就是要与最大序列号4095做&操作,即如果它大于了最大的序列号,那么sequence就等于0了,此时调用tilNextMillis()方法做等待操作,直到生成的时间戳大于上一次时间戳,因为同一秒只支持4095个并发

4)如果当前时间戳大于上一次,则直接把sequence置0

5)将上一次时间戳更新为当前时间戳

6)最后一步也是关键,通过位移操作,把sequence(序列号),workId(工作ID),datacenterId(数据中心ID),timestamps(时间戳)拼到一起

说明这里还有一个twepoch,表示起始的时间点,这里的作用主要是控制生成ID的大小,如果你想从较小的ID开始递增,那么twepoch就可以设置的大一些,可以等于当前的时间戳,因为(timestamp – twepoch)的值就越小,反之则时间越往前ID越大

 构造方法

传入一个datacenterId和workId就可以了,说明下不同机器可以使用不同的datacenterId,一台机器上不同的项目可以使用不同的wokId

   /**
     * 构造函数
     * @param workerId 工作ID (0~31)
     * @param datacenterId 数据中心ID (0~31)
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

最后说说snowflake的优缺点:

优点:

1)毫秒数在高位,自增序列在低位,整个ID都是趋势递增的。

2)不依赖数据库等第三方系统,以服务的方式部署,稳定性更高,生成ID的性能也是非常高的。

3)可以根据自身业务特性分配bit位,非常灵活。

使用场景如:生成订单ID,因为ID不是连续递增的,所以可以保证订单数的隐蔽性

缺点:

1)强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态。

2)分布式部署时,服务器最好开启Network Time Protocol (NTP)服务,保证每个机器时间一致

下次说说如何使用zookeeper调度生成ID及保证服务的高可用。

HashMap原理解析

HashMap,在程序中我们经常要用到的集合,它的实现是通过数组+单向链表来实现的

先从HashMap的put方法讲起,基本思路是:

1,会通过数据的key做hash算法,得到数组bucket的下标值,

2,然后再把<key,value>以链表的形式(只有一个节点)插入到bucket[i]中,如果两个key算出来的下标值i一样,那么新的元素就会添加到链表之后

查看源代码:

    public V put(K key, V value) {
        return putVal(hash(key), key, value, false, true);
    }

    static final int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }

当key为null时,默认放入到数组的第0个位置,不为null,获取key的hashcode,进行右移16位并与hashcode做异或运算,得到下标值

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        //tab数组为空,则创建一个
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
        //根据容量大小和hash值计算(&)下标值
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            //若节点存在,则替换
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            //tab存放为树,jdk8默认为8个节点
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            //tab存放为链表,添加元素
            else {
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
        //如果map数量大于负载因子*最大容量,则扩容
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }

获取元素,基本思路:如果key为null,直接命中value,如果不为空,通过key获取数组下标值,如果第一个节点的key值相同,则直接返回,若为树,则遍历树结构,若为链表,则遍历链表对比key

    public V get(Object key) {
        Node<K,V> e;
        return (e = getNode(hash(key), key)) == null ? null : e.value;
    }

    final Node<K,V> getNode(int hash, Object key) {
        Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
        //先判断首节点
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (first = tab[(n - 1) & hash]) != null) {
            if (first.hash == hash && // always check first node
                ((k = first.key) == key || (key != null && key.equals(k))))
                return first;
            //多个节点
            if ((e = first.next) != null) {
                //为树
                if (first instanceof TreeNode)
                    return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                //为链表
                do {
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        return e;
                } while ((e = e.next) != null);
            }
        }
        return null;
    }

数据扩容,resize(),基本思路是:将新的容量扩充到原来的2倍,并将旧的数组copy到新的数组中,注意原来链表或树的节点,位置可能会发生变化,要么是原来的位置,要么是原来位置*2,下面一幅图可以很好的描述resize元素的变化情况

d7acbad8-d941-11e4-9493-2c5e69d084c0

    final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table;
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        int oldThr = threshold;
        int newCap, newThr = 0;
        if (oldCap > 0) {
            //超过限制的最大值,则不再扩容
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            //计算新的容量,是原来的2倍
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                     oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1; // double threshold
        }
        else if (oldThr > 0) // initial capacity was placed in threshold
            newCap = oldThr;
        else {               // zero initial threshold signifies using defaults
            newCap = DEFAULT_INITIAL_CAPACITY;
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                      (int)ft : Integer.MAX_VALUE);
        }
        threshold = newThr;
        @SuppressWarnings({"rawtypes","unchecked"})
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
        table = newTab;
        if (oldTab != null) {
            //将旧的bucket复制到新的bucket中
            for (int j = 0; j < oldCap; ++j) {
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    if (e.next == null)
                        newTab[e.hash & (newCap - 1)] = e;
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            //若节点为双数,则index位置不变
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            //若节点为单数,则index位置为原位置+oldCap
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;
    }

总结:

对于HashMap的原理可以理解如下:

目前有n个篮子,我要把贴有标签(标签代表key)的苹果放入篮子中,首先确定放入哪一个篮子中,所以通过苹果的标签来分类(hash算法),某一类的都放入在一个篮子中

为何要这样做,想想,如果将所有苹果都放入一个篮子中,那么我要取出某个标签的苹果,我就要在篮子中一个一个的找,如果苹果很多,这样效率是很低的,所以我是先通过苹果的标签定位到某个分类的篮子,再去里面找,这样的效率就提高很多了

参考文章:

1,http://yikun.github.io/2015/04/01/Java-HashMap%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86%E5%8F%8A%E5%AE%9E%E7%8E%B0/

2,http://blog.csdn.net/vking_wang/article/details/14166593

JAVA内存模型

Java内存模式是理解多线程执行的重要依据。

什么是内存模型,为什么需要它?

先看一张图:

8A628D2C-5E67-4A6B-9A2E-66699729E2D4

比如给一个变量aVariable赋值:aVariable=3,当然单线程下是没什么问题的,但如果是多线程呢?会不会有影响?答案是肯定的,多线程下数据的读写都会发生竞争。

所以用一句话总结JAVA内存模型是:

通过加锁来控制多线程下共享变量的读写顺序操作

如何保证?JMM定义了一套Happens-Before规则,包括:

程序顺序规则:如果程序中操作A在操作B之前,那么在线程中A操作将在B操作之前执行

监视器锁规则:同一监视器的解锁发生在加锁之前。

volatile变量规则:对volatile变量的写入操作必须在对该变量的读操作之前执行。

线程启动规则:Thread.start的调用必须在线程其他操作之前执行

线程结束规则:线程的任何操作必须在检测到其他线程结束之前执行,或从Thread.join中成功返回,或调用Thread.isAlive时返回false

中断规则:调用interrupt时,必须在中断线程检测到interrupt之前执行(就是先调用了interrupt才会抛出InterruptedException,感觉是废话)

终结器规则:对象的构造函数必须在启动对象的终结器之前执行完成

传递性:如果操作A在操作B之前执行,且操作B在C之前执行,那么A必须在C之前执行

如果程序中不满足以上的情况,那么多线程执行下就会发生重排序,什么是重排序?举个例子,如下:

public class PossibleRecordering {

	static int x=0,y=0;
	static int a=0,b=0;

	public static void main(String[] args) throws InterruptedException {
		Thread one = new Thread(new Runnable() {
			@Override
			public void run() {
				a = 1;
				x = b;
			}
		});

		Thread other = new Thread(new Runnable() {
			@Override
			public void run() {
				b = 1;
				y = a;
			}
		});

		one.start();other.start();
		one.join(); other.join();
		System.out.println("x="+x+",y="+y);
	}

}

上面的例子x和y是多少呢?不确定,可能是0,也可能是1,线程中a=1和x=b两者没有任何依耐性,所以JMM会对这两个操作执行重排,先执行哪个还真不一定。如果想要x=b在a=1之后执行,那么将x用volatile修饰

当使用volatile修饰后,JMM确保

1)当第二个操作是volatile写时,不管第一个是什么,都不能发生重排

2)当第一个操作是volatile读时,不管第二个操作是什么,都不能发生重排

3)当第一个操作为volatile写,第二个是volatile读时,都不能发生重排

为了好记就是:一读二写不能发生重排

final关键字

在构造函数中初始化的使用final修饰的关键字对象,优先于其他线程的读操作

举个例子:

public class SafeStates {

	private final Map<String, String> states;
	private int i=0;

	static SafeStates obj;

	public SafeStates () {
		states = new HashMap<String, String>();
		states.put("alaska", "AK");
		states.put("alabama", "AL");
                i=1;
	}

	public void addStates() {
		states.put("add", "AD");
	}

	public static void writer() {
		obj = new SafeStates();
	}

	public static void reader() {
		SafeStates o = obj;
		System.out.println(o.states);
		System.out.println(o.i);
	}

}

一个线程先调用了writer方法,然后另外一个线程调用reader方法,JMM可确保使用final修饰的变量,的赋值操作在reader方法之前,即reader方法读取到的states对象是{“alaska”, “AK”,alabama”, “AL”},但i值就不可保证了,可能读取的值为0

另外如果在非构造方法中,比如调用了addState方法,其他线程在读取的时候也不能确保读取到states值的最新数据

 

CountDownLatch使用原理

先看一个例子,来说明下CountDownLatch运用场景

public class SpiderDemo {
	private static Logger logger = LoggerFactory.getLogger(SpiderDemo.class);
	private static ExecutorService spiderPool = Executors.newFixedThreadPool(5);

	public static void main(String[] args) {

		CountDownLatch countDown = new CountDownLatch(5);
		for (int i=0;i<5;i++) {
			spiderPool.execute(new SpiderThread(countDown, i));
		}

		try {
			countDown.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		logger.info("spider end");
		spiderPool.shutdown();
	}

	static class SpiderThread implements Runnable{
		private CountDownLatch countDown;
		private int index;

		public SpiderThread(CountDownLatch countDown, int index) {
			this.countDown = countDown;
			this.index = index;
		}

		@Override
		public void run() {
			// TODO Auto-generated method stub
			logger.info("spider"+index+" data begin.. ");
			try {
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			logger.info("spider"+index+" data complete.. ");

			countDown.countDown();
		}

	}

}

我们来看看上面的例子,这里创建了一个CountDownLatch对象,传入一个数值,使用5个线程去执行SpiderThread,结束后使用countDown()方法将计数器递减,最后使用await()方法堵塞,直到所有的线程执行完成。

所以CountDownLatch的使用场景是可以异步去执行多个任务,主线程可以等待所有任务执行完成后,再继续执行。

下面从源码方面分析下CountDownLatch的实现原理

首先查看构造方法CountDownLatch(int count )

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

	Sync(int count) {
            setState(count);
        }

当count小于0的时候,直接抛出异常,大于0则创建了一个Sync对象。Sync构造函数则是直接设置了一个变量值state来保存count数组

再看看countDown()方法,很显然它的目的是将count数组-1

    public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

它里面是直接调用了releaseShared(1)方法,而releaseShared又调用了tryReleaseShared方法,如果尝试执行成功,则再调用doReleaseShared()方法

	protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

tryReleaseShared是干什么的呢?从上面的例子看它的作用就是将count值-1,如果当前count值就是0了,那就啥都不干,直接返回false了,否则使用CAS算法尝试将count值-1,最后判断当前的count值是否为0,若为0则返回true,返回true就需要执行doReleaseShared方法了

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

doReleaseShared又干了些啥?doReleaseShared的作用就是唤醒主线程继续执行,这里调用了unparkSuccessor方法

最后来看下await()方法

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

可以看到最终会调用doAcquireSharedInterruptibly(若中途线程有被打断,则抛出异常了)

首先会添加一个Node节点(Node.SHARED看源码实质就是new Node()),这个节点就是保存所有调用await()方法的线程(因为可能会有多个线程调用了await()方法),所以这里每调用一次就保存下来一个节点,最后一个一个的释放,如果这里判断count不为0,则会使用park方法将线程挂起。

 

JAVA垃圾收集器(二)

上篇文章说了JAVA垃圾收集器的一些算法,这次讲讲JVM中用到的一些垃圾收集器,这些垃圾收集器也是运用这些算法来回收内存的,如下图:

141513122384006

新生代收集器有:Serial、ParNew、Parallel Scavenge

老年代收集器有:CMS、Serial Old Parallel Old

Serial收集器(复制算法)

新生代单线程收集器,使用标记清理算法,且标记、清理都是单线程,优点:运行高效,缺点:界面会产生卡顿的现象

Serial Old收集器(标记-整理)

老年代单线程收集器,Serial老年代收集版本

ParNew收集器(停止-复制算法)

新生代收集器,Serial的多线程版本,并发标记、清理,在多CPU下有比Serial更好的性能

Parallel Scavenge收集器(停止-复制算法)

新生代收集器,也是使用多线程并行收集,但关注的是高吞吐量(最高效率的利用CPU时间),吞吐量=用户线程时间/(用户线程时间+GC线程时间),适用于后台对交互性不高的应用

Parallel Old收集器(停止-复制算法)

老年代收集器,并行收集,吞吐量优先

CMS(Concurrent Mark Sweep)收集器(标记-清除算法)

以获取最短停顿时间为目标的收集器,应用于如(B/S网站),并发收集,占用CPU资源较高。

 对象分配

1)对象优先会在Eden上分配,如果空间不够,则会发起一次Young GC,清除非存活的对象,把存活的对象移动到Survivor中

2)大对象直接进入老年代,大对象指的是需要大量连续内存空间的Java对象,比如很长的字符串或数组,要避免这种大的对象的频繁创建与销毁

3)长期存活的对象将进入老年代,虚拟机会给每个对象定义一个对象年龄计数器,如果对象在Eden出生并经过第一次Young GC后仍然存活将被移动到Survivor中,并对对象年龄设为1,对象在Survivor区中每熬过一次Young GC,年龄就增加1岁,当它的年龄增加到一定程度(默认为15岁)时,就会被晋升到老年代中

4)动态对象年龄判定:虚拟机并不总是要求对象必须到达年龄才能晋升到老年代,如果在Survivor中相同年龄所有对象大小的总和大于Survivor空间的一半,年龄大于或等于该年龄的对象就可以直接进入到老年代。

5)触发Full GC:当老年代的剩余空间不足以容纳从Survivor区移动过来的对象,则会触发一次Full GC,另外在触发Young GC时,JVM会进行空间分配担保,如果新生代晋升了平均大小大于老年代的剩余大小也会触发Full GC;调用System.gc()也会触发Full GC,Full GC会对整个堆进行回收,会照成整个应用停止,所以要尽量减少Full GC的次数。

参考资料:

http://www.cnblogs.com/sunniest/p/4575144.html

 

JAVA垃圾收集器(一)

在JAVA中,JVM虚拟机已经帮我们实现了Java垃圾内存的回收,在说JAVA垃圾收集器之前,先思考下面几个问题:

1)哪些内存需要回收?

2)什么时候回收?

3)如何回收?

闭着眼睛思考1分钟…

好,下面依次来说下:

1,哪些内存需要回收?

我们知道JAVA内存模型包括:Java堆,方法区,线程栈(分为:程序计数器,虚拟机栈,本地方法栈)

线程栈(程序计数器,虚拟机栈,本地方法栈)这三个是随着线程而生,线程而灭。栈中的栈针随着方法的进入和退出有条不紊的执行者出栈和入栈的操作,每一个栈帧中分配多少内存基本上是在类结构确定下来就已知的,所以方法结束或线程结束后,内存自然就跟着回收了,所以这部分不用过多考虑内存回收的问题

Java堆和方法区,这些里面都是用来存一些静态的类,或者动态代理生成的类,创建的对象,这些都是在程序运行时动态创建的,所需要的内存我们就不确定了,如果创建的对象使用完后,一直没有被回收,就导致内存溢出了,所以这部分就是垃圾回收器所要关心的,他要识别出那些无效的对象回收掉

2,什么时候回收?

其实第一点已经说了,就是那些无效的对象,何为无效的对象?即不再可能被任何途径使用的对象

如何判断这个对象为无效对象呢?垃圾回收提供了2种算法:引用计数法和根搜索算法,

引用计数法:即判断某个对象不在被其它对象引用,当某个对象被一个对象引用时,计数器+1,当引用失效时,则-1,直到减为0时,表示可以回收了,但这不能解决A,B之间相互引用的问题,A中有一个变量指向B,同时B中有一个变量指向A,所以即使A,B最后都不会再用了,由于他们的引用关系还在,仍然不能被回收。

JAVA中使用的就是根搜索算法来判断对象是否存活的

根搜索算法:JAVA中定义了一个名为“GC Roots” 的对象来作为起始点,从这个节点开始找,如果某个对象到这个“GC Roots”对象没有引用连接,那么表示这个对象就可以被回收了。

哪些对象可以作为GC Roots呢?Java中定义了这些对象:

1)虚拟机栈(栈帧中的本地变量表)中引用的对象。

2)方法区中类静态属性引用的对象。

3)方法区中常量引用的对象

4)本地方法栈中(一般说的本地方法)应用的对象

如下图,object1,object2,object3,object4不能被回收,object5,object6,object7虽然有互连,但是可以被回收

75B97DA3-0F8F-43CB-9BDD-2B310EF88B90

3,如何回收?

关于回收有3中算法:

1)标记-清除算法

javaneicunbiaojiqingchu

如上图,先标记出无效的对象,再进行清理,它有两个缺点:

效率问题:标记和清除过程的效率都不高

空间问题:如上图,清除后会产生大量不连续的内存碎片

2)复制算法

javaneicunfuzhi

如上图,先将可用内存划为为大小相等的两块,每次只使用其中一块,当这一块内存用完了,将存活的对象复制到另一份上,并将原先那块一次性清理掉

缺点:以空间换时间,这样,可用内存就缩小为原来的一半了,当然JVM做了也做了优化

在使用这种算法回收新生代时,不用要求内存分为1:1等量的两块,因为新生代一般都是朝生夕死,所以只会有少部分对象存活下来,JVM将新生代分为了Eden和两块较小的Survivor,Eden和Survivor内存为8:1,当Survivor内存不够时,就需要其他内存(老年代)来担保为其承担部分内存。

3)标记-整理算法

javaneicunbiaojizhengli

如上图,类似于“标记-清理”,也是先标记处存活的对象,但之后是先将存活的对象都向一端移动,然后清理掉边界之外的内存,这种算法一般适用于JAVA老年代的回收

说明:JAVA堆分为了新生代和老年代

新生代每次回收都会有大批对象死去,只有少量存活,所以采用复制算法

而老年代每个对象的存活率很高,且没有额外的空间对其担保,所以可以采用“标记-清理”或“标记-整理”算法来回收

 

使用二进制进行权限控制

1,什么是二进制?

二进制是用01两个数码来表示的数,他的规则是“逢二进一”

比如:二进制00000000表示十进制000000001表示十进制1

2,如何使用权限控制?

二进制权限控制说的简单点,即是二进制与十进制之间的转换

用二进制表示权限值是按位来设置的,即每个位占一个权限,

比如:00000001表示第一位拥有一种权限(转为十进制权限为:1

00000011表示第一位,第二位各拥有一种权限(转为十进制权限为3

再比如:某个人拥有老师,班主任,教务主任3种权限,我们可以用二进制表示为:

00000111,用十进制表示为7

3,一个接口权限控制的例子。

要求:通过二进制权限控制不同来源的客户端(如:平板,手机,网页)登录的用户可以访问指定应用。

说明:目前有3个数据源(平板,手机,网页),他们对应的二进制权限位分别为 1, 2, 3

有一个用户user,他有属性:idnamesexpower

一般接口会为每个数据源分配一个appKey,设置平板为appKey1,手机appKey2,网页appKey3

假设我们设置用户user,可以通过平板和网页访问,即用二进制表示为00000101,用十进制表示:power = 5

程序实现:

使用binaryPosList 表示“二进制权限位集合”

for(int i=0;i<binaryPosList.size;i++) {
    power += pow(2,binaryPosList[i]-1);
}

private static int pow(int a, int N) {
    if(N == 0) {
        return 1;
    } else {
        return a*pow(a,N-1);
    }
}

所以power = pow(2,1-1) + pow(2,3-1) = 5;

当用户使用手机登录时:

接口通过传过来的appKey 查找到对应的二进制位为2,同时我们查找出此用户的权限power5,二进制位00000101

首先说明下,当二进制最后一位为0时,转换为十进制必为偶数,当二进制最后一位为1时,转换为十进制必为奇数。

所以可以将power需要对比的位数移动到末尾,即将二进制数向右移动(二进制位-1),即移动后为:00000010,末尾移动的数丢弃,首位移动后用0补充。Java中正好有移位的算法。

Int n = power >>> (二进制位 -1)

n为偶数(n % 2 == 0)则表示没有权限,若n为奇数则表示有权限。

常用排序(冒泡,选择,插入,快速)

算法:

1. 冒泡排序

说明:外层循环以第1个元素为起点i,内层循环以第i+1为起点j,比较i和j对应的元素大小,只要有j元素小于i元素,则两者替换

代码示例:

public static int[] sort(int[] arr) {
        int temp = 0;
        for(int i=0;i

 

2.选择排序:

说明:类似于冒泡排序,外层循环以第1个元素为起点i,内层循环以第i+1为起点j,先找出内层元素中最小的元素,两者替换(优于冒泡排序)

代码示例:

public static int[] sort(int[] arr) {
        int temp = 0;
        for(int i=0;i

 

3.插入排序

图解:

1

2

代码示例:

public static int[] sort(int[] arr) {
    int temp = 0;
    for(int i=1;i

 

4.快速排序:

图解:

3

代码示例:

    public static int getPoint(int[] arr, int low, int high) {
        int point = low;
        int contrast = arr[point];
        for (int i = low + 1; i <= high; i++) {
            if (arr[i] < contrast) {
                point++;
                swap(arr, point, i);
            }
        }
        swap(arr, low, point);
        return point;
    }

    public static int[] quickSort(int[] arr, int low, int high) {
        if (low < high) {
            int point = getPoint(arr, low, high);
            quickSort(arr, low, point - 1);
            quickSort(arr, point + 1, high);

        }

        return arr;
    }