站长资源数据库
浅析Redis分布式锁
简介近期工作遇到需要业务场景如下,需要每天定时推送给另一系统一批数据,但是由于系统是集群部署的,会造成统一情况下任务争用的情况,所以需要增加分布式锁来保证一定时间范围内有一个Job来完成定时任务. 前期考虑的方案有采用ZooKeeper分布式任务,Quartz分布式任务调度,但是由于Zookeeper需
近期工作遇到需要业务场景如下,需要每天定时推送给另一系统一批数据,但是由于系统是集群部署的,会造成统一情况下任务争用的情况,所以需要增加分布式锁来保证一定时间范围内有一个Job来完成定时任务. 前期考虑的方案有采用ZooKeeper分布式任务,Quartz分布式任务调度,但是由于Zookeeper需要增加额外组件,Quartz需要增加表,并且项目中现在已经有Redis这一组件存在,所以考虑采用Redis分布式锁的情况来完成分布式任务抢占这一功能
记录一下走过的弯路.
第一版本:
@Override public <T> Long set(String key,T value, Long cacheSeconds) { if (value instanceof HashMap) { BoundHashOperations valueOperations = redisTemplate.boundHashOps(key); valueOperations.putAll((Map) value); valueOperations.expire(cacheSeconds, TimeUnit.SECONDS); } else{ //使用map存储 BoundHashOperations valueOperations = redisTemplate.boundHashOps(key); valueOperations.put(key, value); //秒 valueOperations.expire(cacheSeconds, TimeUnit.SECONDS); } return null; } @Override public void del(String key) { redisTemplate.delete(key); }
采用set 和 del 完成锁的占用与释放,后经测试得知,set不是线程安全,在并发情况下常常会导致数据不一致.
第二版本:
/** * 分布式锁 * @param range 锁的长度 允许有多少个请求抢占资源 * @param key * @return */ public boolean getLock(int range, String key) { ValueOperations<String, Integer> valueOper1 = template.opsForValue(); return valueOper1.increment(key, 1) <= range; } /** * 初始化锁, 设置等于0 * @param key * @param expireSeconds * @return */ public void initLock(String key, Long expireSeconds) { ValueOperations<String, Integer> operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); operations.set(key, 0, expireSeconds * 1000); } /** * 释放锁 * @param key */ public void releaseLock(String key) { ValueOperations<String, Integer> operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.delete(key); }
采用redis的 increament操作完成锁的抢占.但是释放锁时,是每个线程都可以删除redis中的key值. 并且initLock会降上一次的操作给覆盖掉,所以也废弃掉此方法
最终版本:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnection; import org.springframework.stereotype.Service; import org.springframework.util.ReflectionUtils; import redis.clients.jedis.Jedis; import java.lang.reflect.Field; import java.util.Collections; @Service public class RedisLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; @Autowired private RedisConnectionFactory connectionFactory; /** * 尝试获取分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public boolean lock(String lockKey, String requestId, int expireTime) { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 释放分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public boolean releaseLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = getJedis().eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } public Jedis getJedis() { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); return jedis; } }