最近项目中遇到同时有两个线程同时更新一行记录导致后面一条语句执行失败的问题,由于项目是部署在不同的服务器上,这里要控制两个线程的执行顺序,自然想到了使用Redis的锁,废话不多说,下面给出具体实现
/**
* 核查四要素相同报文是否正在处理,如果有实例正在处理四要素相同报文pass,否则线程等待
*
* @param processData
*/
public void checkPacketProcessRepeat(ProcessData processData) {
try {
// 四要素key
String repeatKey = REPEATKEYSTART + processData.getReviseFlight().getKey();
while (true) {
// 设置nx锁,如果nx锁设置成功跳出去,继续执行报文后续处理流程
if (setNX(repeatKey, EXIST, 3)) {
log.info("FlightPreProcess-checkPacketProcessRepeat,报文处理拿到NX锁,直接pass,key:{},sourceId:{}", processData.getReviseFlight().getKey(), processData.getReviseFlight()
.getSourceId());
return;
}
// 如果有当前航班有nx锁、或者nx锁设置失败,则需要等待3秒,等待其他实例处理完成
log.info("FlightPreProcess-checkPacketProcessRepeat,报文多实例并发处理,需要等待3秒,key:{},sourceId:{}", processData.getReviseFlight().getKey(), processData.getReviseFlight()
.getSourceId());
Thread.sleep(3000);
}
} catch (Exception e) {
log.error("FlightPreProcess-checkPacketProcessRepeat,异常,key:{},e:{}", processData.getReviseFlight().getKey(), e);
}
}
这里根据报文的四要素确定唯一条记录,先调用setNX获取redis锁,如果获取到了就执行后面的逻辑,如果没有获取到则等待3s再重试,下面是setNX方法的实现
/** 设置锁 */
private boolean setNX(final String key, String value, final int exp) {
return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
byte[] serializeKey = redisTemplate.getStringSerializer().serialize(key);
Boolean acquire = connection.incr(serializeKey) == 1;
// 如果设值成功,则设置过期时间
if (acquire) {
connection.expire(serializeKey, exp);
}
return acquire;
}
});
}
这里使用了redis的incr命令,它是一个原子操作,如果key不存在,那么key的值将初始化为0,然后执行INCR操作,这里判断如果设置成功,则对key设置过期时间,相当于了一个带有时间的锁。
在Redis2.6.12版本后,使用set命令也可以实现分布式锁,具体代码如下:
public static Boolean setNX(final String key, final String value, final int exp) {
return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
Jedis jedis = (Jedis) connection.getNativeConnection();
String result = jedis.set(key, value, "NX", "EX", exp);
if ("OK".equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
}
});
}
这里重点说说第3个和第4个参数,这里填的是NX,意思是当key不存在时,我们进行set操作,若key已经存在则不进行任何操作,第4个表示我们要给key设置一个过期时间,具体时间由第5个参数决定。
另外我们这里保存了key对应的value值,所以线程可以根据value值来释放锁,这里的value值可以是线程的ID,比如我们线程后面的逻辑执行失败了,我们可以通过这个value值来尽快释放锁,减少其它线程的等待时间,我们可以使用Lua脚本来实现
private static final Long RELEASE_SUCCESS = 1L;
private static final RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
public static Boolean releaseLock(final String key, final String value) {
return (Boolean) redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
Jedis jedis = (Jedis) connection.getNativeConnection();
Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key),
Collections.singletonList(value));
if (RELEASE_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
}
});
}
通过Lua脚本获取对应key的value值,如果value值和给定的一样,则释放锁