Redis分布式锁使用

最近项目中遇到同时有两个线程同时更新一行记录导致后面一条语句执行失败的问题,由于项目是部署在不同的服务器上,这里要控制两个线程的执行顺序,自然想到了使用Redis的锁,废话不多说,下面给出具体实现

/**
 * 核查四要素相同报文是否正在处理,如果有实例正在处理四要素相同报文pass,否则线程等待
 * 
 * @param processData
 */
public void checkPacketProcessRepeat(ProcessData processData) {
	try {
		// 四要素key
		String repeatKey = REPEATKEYSTART + processData.getReviseFlight().getKey();
		while (true) {
			// 设置nx锁,如果nx锁设置成功跳出去,继续执行报文后续处理流程
			if (setNX(repeatKey, EXIST, 3)) {
				log.info("FlightPreProcess-checkPacketProcessRepeat,报文处理拿到NX锁,直接pass,key:{},sourceId:{}", processData.getReviseFlight().getKey(), processData.getReviseFlight()
						.getSourceId());
				return;
			}
			// 如果有当前航班有nx锁、或者nx锁设置失败,则需要等待3秒,等待其他实例处理完成
			log.info("FlightPreProcess-checkPacketProcessRepeat,报文多实例并发处理,需要等待3秒,key:{},sourceId:{}", processData.getReviseFlight().getKey(), processData.getReviseFlight()
					.getSourceId());
			Thread.sleep(3000);
		}
	} catch (Exception e) {
		log.error("FlightPreProcess-checkPacketProcessRepeat,异常,key:{},e:{}", processData.getReviseFlight().getKey(), e);
	}
}

这里根据报文的四要素确定唯一条记录,先调用setNX获取redis锁,如果获取到了就执行后面的逻辑,如果没有获取到则等待3s再重试,下面是setNX方法的实现

/** 设置锁 */
private boolean setNX(final String key, String value, final int exp) {
	return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
		@Override
		public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
			byte[] serializeKey = redisTemplate.getStringSerializer().serialize(key);
			Boolean acquire = connection.incr(serializeKey) == 1;
			// 如果设值成功,则设置过期时间
			if (acquire) {
				connection.expire(serializeKey, exp);
			}
			return acquire;
		}
	});
}

这里使用了redis的incr命令,它是一个原子操作,如果key不存在,那么key的值将初始化为0,然后执行INCR操作,这里判断如果设置成功,则对key设置过期时间,相当于了一个带有时间的锁。

在Redis2.6.12版本后,使用set命令也可以实现分布式锁,具体代码如下:

public static Boolean setNX(final String key, final String value, final int exp) {
        return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
			@Override
			public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
				Jedis jedis = (Jedis) connection.getNativeConnection();
				String result = jedis.set(key, value, "NX", "EX", exp);
				if ("OK".equals(result)) {
					return Boolean.TRUE;
				}
				return Boolean.FALSE;
			}
		});
    }

这里重点说说第3个和第4个参数,这里填的是NX,意思是当key不存在时,我们进行set操作,若key已经存在则不进行任何操作,第4个表示我们要给key设置一个过期时间,具体时间由第5个参数决定。

另外我们这里保存了key对应的value值,所以线程可以根据value值来释放锁,这里的value值可以是线程的ID,比如我们线程后面的逻辑执行失败了,我们可以通过这个value值来尽快释放锁,减少其它线程的等待时间,我们可以使用Lua脚本来实现

private static final Long RELEASE_SUCCESS = 1L;
private static final RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

public static Boolean releaseLock(final String key, final String value) {
			return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
			@Override
			public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
				Jedis jedis = (Jedis) connection.getNativeConnection();
				Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key),
						Collections.singletonList(value));
				if (RELEASE_SUCCESS.equals(result)) {
					return Boolean.TRUE;
				}
				return Boolean.FALSE;
			}
		});
    }

通过Lua脚本获取对应key的value值,如果value值和给定的一样,则释放锁

 

Spring Boot启动出现死锁

Spring Boot启动出现死锁

最近公司的一个项目在启动后,在调用往mongoDB写数据时,一直写不进去,运行一段时间后,程序出现内存溢出,jstack导出了线程信息,如下:

CC6B8F5EF4392AAA4F0B393FFDA32AA6 DFB379909A737E20E4A894BFF683739A

mongoDB在插入数据时发生了堵塞,在等待一个监控对象,主线程在调用afterPropertiesSet方法,一直在等待中,表示spring的初始化工作一直没有完成,所以我们结合代码分析,查看afterPropertiesSet方法

    @Override
    public void afterPropertiesSet() throws Exception {
        scheduleRulePriorityCacheService.start();

        if (isTesting == false) {
            kafkaConsumerTask.start();
            executeService.execute();
        }

    }

 

在spring属性设置完成后,执行了一个kafka消费任务,这个kafka消费任务大致的流程就是从kafka中取出数据,然后放入一个队列中,之后执行了executeService.execute()从队列里面拿出数据然后进行业务处理,我们看看execute()方法

	public void execute() {
		while (true) {
			try {
				if (receiveQueue.isEmpty()) {
					Thread.sleep(200);
				} else {
					String msg = receiveQueue.poll();
					if (StringUtils.isBlank(msg)) {
						continue;
					}
					poolTaskExecutor.execute(() -> {
						scheduleProcessCenter.process(msg);
					});
				}
			} catch (Exception e) {
				logger.error("ExecuteService error,e:{}", e);
			}
		}
	}

这里就是一个无线循环操作,即如果队列有数据就消费,没有就等待200ms

我们知道,afterPropertiesSet方法是spring Bean的生命周期的一部分,这里发生了死锁,必定是锁住了一个资源,没有释放,而MongoDB又需要这个资源,我看到错误信息,在调用DefaultSingletonBeanRegistry.getSingleton方法时锁住了一个资源,我们查看spring的源代码,

7D5A37364ED4463F83CA98076DBEB6AD

可以看到锁住了singletonObjects对象,这个对象就是spring的单例容器,同样我们可以确定,在MongoDB调用注册事件的时候也需要这个对象,我们同样查看源代码

CECB8DBCEB5F05A176976D877FCEC3A1

可以看到MongoDB在注册事件的时候同时需要锁住retrievalMutex对象,那么retrievalMutex和singletonObjects有什么关系?我们接着看

ADE46D9008DA9E92021028F3E5582578 1F3CF43276456CBF7BF938AE5A1CC168

点进去查看getSingletonMutex方法,返回的就是singletonObjects对象,所有retrievalMutex实质和singletonObjects是同一个对象

那么就很容易得出结论了,在调用afterPropertiesSet方法时,singletonObjects对象一直没释放,而MongoDB又需要这个对象,所以产生了死锁。 解决方法,让afterPropertiesSet方法尽快释放singletonObjects对象,我们可以开启一个新线程来执行从队列中读取数据做业务处理的逻辑

	public void execute() {
		poolTaskExecutor.execute(this);
	}

	@Override
	public void run() {
		while (true) {
			try {
				if (receiveQueue.isEmpty()) {
					Thread.sleep(200);
				} else {
					String msg = receiveQueue.poll();
					if (StringUtils.isBlank(msg)) {
						continue;
					}
					poolTaskExecutor.execute(() -> {
						scheduleProcessCenter.process(msg);
					});
				}
			} catch (Exception e) {
				logger.error("ExecuteService error,e:{}", e);
			}
		}
	}

 

Mysql死锁的问题

Mysql死锁的问题

1,先看错误日志

WechatIMG21 WechatIMG22

从以上日志可以看出是两条SQL执行出现了问题,后面一条SQL回滚了

SQL为:

update dynamic_check_packet_system set check_flag = '3' where create_time <= '2018-12-19 02:00:00' and check_flag='0' and conflict_field not in ('suspectCancel');
update flight.dynamic_check_packet_system set check_flag='1', update_time='2018-12-19 09:55:00.0',check_start_time='2018-12-19 10:00:00' where id=14211034;

使用explain查看第一条SQL

E9C461C524DED9A648DC4DCFF089919F

可以看到,这个语句使用了idxcheckflag这个索引,createtime在前,为什么没有使用createtime?

查看表中<=createtime的数据,发现有14166149条,远远大于checkflag=0的记录数,这时MySQL就会优先选择使用chekflag的索引,所以第一条语句会把checkflag=0的所有记录数都锁住。

第二条SQL同样更新check_flag=0,id=14211034的记录,但这条记录是被第一条SQL锁住的,所以就会更新失败了?

深入分析:
这里查看MySQL引擎,用的是Innodb,Innodb是支持行锁的,既然第一条SQL把这条记录行锁住了,第二条SQL应该等待才对,为什么会发生死锁呢?所以这里一定存在两把锁,而且锁的顺序不同。

我们知道MySQL的Innodb主键使用了聚集索引(索引直接指向实际数据),而如果再新建一个索引,这个索引会指向主键索引,然后通过主键索引找到数据,所以这里存在需要更新聚集索引ID数据和二级索引check_flag数据

第一条语句通过checkflag=0查找,那么就先会锁住二级索引checkflag数据,然后再去获取聚集索引ID数据的锁

而第二天SQL则是通过ID查找,那么就会先会锁住聚集索引ID数据,然后再去获取二级索引check_flag数据

显然这样获取锁的先后顺序不同,就造成了死锁。

问题解决:
让第一条SQL语句使用createtime索引,可以指定一个createtime>’开始时间’ and createtime<’结束时间’来减少扫描行数,让MySQL优先使用createtime索引。

 

分布式一致性协议

分布式一致性协议

在分布式系统中为了解决数据的一致性,主要有二阶段提交协议、三阶段提交协议、Poxos算法、TTC协议、Rraft协议、ZAB协议(zookeeper的协议),下面就分别介绍这几种协议

二阶段提交协议

二阶段提交协议主要思想是:有一个协调者,多个参与者,协调者负责发送命令给参与者,确保数据能同时更新到所有节点上,主要步骤如下:

Two-phase_commit

1)协调者将事务的请求发送给所有参与者,询问是否可以提交事务,参与者锁住自己的资源,并写undo/redo日志,如果参与者都准备成功,则向协调者回应“可以提交”

2)协调者所有参与者都会有“可以提交”,此时向参与者发送“正式提交”命令,所有参与者开始提交自己的事务

如上述1)2)有任何不成功,所有参与者都将回滚自己的事务

二阶段提交原理比较简单,实现起来也比较方便,但问题也比较明显,如:

1,同步堵塞:所有参与者在没有收到协调者发送过来的“正式提交”命令,都将锁住资源,其他线程如果要获取资源只能等待

2,单点问题:所有参与者都依靠一个协调者,如果协调者在2)操作突然失去联系,这个时候所有的参与者都不知道如何处理,是否提交事务

3,数据不一致性:在2)操作中,可能由于网络原因,有些节点收到提交命令,有些没有,从而照成数据不一致的问题

三阶段提交协议

Three-phase_commit_diagram

三阶段相比二阶段的思想是在操作1)中多了一步,首先询问:是否可以锁资源,如果所有人同意了才开始锁资源,后面的步骤就童二阶段提交了,它相比二阶段协议,解决了如果协调者突然失去联系,参与者仍可以提交他们的事务

但三阶段实现起来还是比较复杂,而且由于参与者最后还是会提交自己的事务,也会造成数据不一致行

TTC协议

为了减少资源被堵塞的时间,产生了TTC协议,可以这样理解TTC是针对SOA服务的锁,2PC是针对数据库的锁

比如我们需要从A账户转100到B账户,必然会涉及到如下几个步骤 1,读取A账户金额,-100 2,读取B账户金额,+100

2PC就会先锁住A账户和B账号的数据,那么任何其它线程想要读取这个账号数据,都将堵塞

TCC则先会从A账号-100元,这时线程也可以读取A账号的数据了,如果转入B账号发生失败,那么就会调用回滚接口,再将A账号+100元,这实质是一种补偿机制

Poxos算法

理解Poxos算法,先弄清楚几个角色

1,proposers 投票人,可以理解为收集人民意见的人

2,acceptors 接受投票者,可以理解为人大代表

3,learners 学习者,可以理解为记录员,将处理的结果记录下来

608A9B29BB2FE2E9787DDB3A3876268E

你可以这样理解,首先投票人接收到人民的意见,就开始提出一个法案,并把这个法案告诉人大代表,说现在人民提了一个法案,我们来开始讨论这个法案吧,人大代表如果同意了(这里的同意指的是有半数以上的人同意了),那么就开始讨论这个法案,如果人大代表都同意(半数以上人同意)通过这个法案,那么大家就会确定下来,交给记录人员将这个法案记录下来,并告诉人民,现在已经确定了一个新法案

poxos算法也有一个问题,即如果有两个投票人接到了人民的法案,我们现在假设为投票人P1和投票人P2,他们同时接受到两个法案A1,A2,A2的版本要大于A1的版本,这里说明下人大代表会接受法案版本大的那个,如果有下面这种情况 1)投票人P1发送了法案A1,并且人大代表都接受到了这个法案,所以决定开始讨论这个法案,并告诉所有人

2)投票人P2发送了法案A2,人大代表又接收到了法案A2,发现A2的版本大于A1的,所以放弃了A1,开始讨论A2的

3)投票者P1发现自己的法案被放弃了,就又会提出一个新的法案A3,A3的版本大于A2,并会发给人大代表,人大代表又会放弃A2,开始讨论A3

4) 投票者P2发现自己的法案被放弃了,就又会提出一个新的法案A4,A4的版本大于A3,并会发给人大代表,人大代表又会放弃A3,开始讨论A4

这样依次反复,就会出现不断更换法案的问题,所以我们就需要只有一个投票人来发送法案,如果投票人退休了,就再新选一个投票人,下面我们来讲讲ZAB协议

ZAB协议

角色定义:

1,leader,主节点,对应上面的投票人,只能有一个

2,follower,从节点,对应上面的人大代表,必须为多个

685547DB334887C0A05B4A68827CBBC1

这里有一个leader,所以最开始必须有一个选举的过程,确定谁来当leader, 所有节点都向各个节点发送一条消息,表示我要当leader,这里由于节点发送的时间不同,每个命令发给其他节点的时间也不同,每个节点最开始收到请求后,都会同意这个命令,如果某个节点收到半数以上的节点回应“同意”,那么它就会将自己设置为leader,并告诉其它节点,现在我是leader,你们需要听从我的,其它节点就会把自己设置为follower

在zookeeper中,最开始的选举是会将myid最大的节点设为leader节点,之后会根据节点zxid,来确定谁来当下一个leader节点,当确定leader后,具体处理流程如下:

1)收到客户端的一个事务请求,可能是leader或follower,follower收到请求后也会转发给leader

2)leader收到follower的事务请求,开始把事务处理请求发给所有follower,follower收到事务请求开始锁资源,处理,将redo/undo写入日志,如果执行成功告诉leader

3)leader收到半数以上的follower回应成功,则再次发送一个命令给follower,说可以提交事务了

4)follower接受到命令,提交自己的事务,并将结果返给leader

5)leader将结果发给客户端

Raft协议

Raft协议同ZAB协议,只不过ZAB是follower节点向主节点发送心跳,确保主节点是否存活,而Raft是Leader节点向follower节点发送心跳,以下这个动画很生动的讲解了分布式系统下一致性的保证:

http://thesecretlivesofdata.com/raft/

你也可以模拟分布式系统不同场景各节点的情况:

https://raft.github.io/

参考文章:

https://coolshell.cn/articles/10910.html 参考书籍

《从POXOS到Zookeeper分布式原理一致性与实践》