有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。单机的可以使用ReentrantLock或者synchronized代码块来实现,但是这些API在分布式场景中就无能为力了。
针对分布式锁的实现,目前比较常用的有以下几种方案:基于数据库, 基于缓存(redis,memcached),基于Zookeeper。
应用的场景例子:
管理后台的部署架构(多台tomcat服务器+redis【多台tomcat服务器访问一台redis】+mysql【多台tomcat服务器访问一台服务器上的mysql】)就满足使用分布式锁的条件。多台服务器要访问redis全局缓存的资源,如果不使用分布式锁就会出现问题。 看如下伪代码:
long N=0L;
//N从redis获取值
if(N<5){
N++;
//N写回redis
}
从redis获取值N,对数值N进行边界检查,自加1,然后N写回redis中。 这种应用场景很常见,像秒杀,全局递增ID、IP访问限制等。
基于redis:
使用 SETNX key value 命令。
设置成功,返回1,加锁。该客户端最后可以通过DEL key来释放该锁。
设置失败,返回0,获取锁失败。这时我们可以先返回或进行重试等对方完成或等待锁超时。
存在的问题:
- 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直存在,其他线程无法再获得到锁。
- 这把锁只能是非阻塞的,无论成功还是失败都直接返回。
- 这把锁是非重入的,一个线程获得锁之后,在释放锁之前,无法再次获得该锁,因为使用到的key在tair中已经存在。无法再执行put操作。
解决方式:
- 用put方法支持传入失效时间,到达时间之后数据会自动删除。
- while重复执行。
- 在一个线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的拥有者。
仍然存在的问题:
- 为什么不直接使用expire设置超时时间,而将时间的毫秒数其作为value放在redis中。因为假如在setnx后,redis崩溃了,expire就没有执行(set和expire设置失效时间只能分两步执行),结果就是死锁了。锁永远不会超时。
- 对于失效时间,如果如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。这个问题使用数据库实现分布式锁同样存在。
具体实现:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
public class RedisLock {
private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
private RedisTemplate redisTemplate;
private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
private String lockKey;
//锁超时时间,防止线程在入锁以后,无限的执行等待
private int expireMsecs = 60 * 1000;
//获取锁失败后,锁等待时间。防止无限期等待
private int timeoutMsecs = 10 * 1000;
//是否加锁标志位
private volatile boolean locked = false;
//重载构造方法
public RedisLock(RedisTemplate redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey + "_lock";
}
public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) {
this(redisTemplate, lockKey);
this.timeoutMsecs = timeoutMsecs;
}
public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) {
this(redisTemplate, lockKey, timeoutMsecs);
this.expireMsecs = expireMsecs;
}
public String getLockKey() {
return lockKey;
}
//redis在存储数据时,都把数据转化成了byte[]数组的形式,那么在存取数据时,需要用到Serializer将数据格式进行转化。
//对redis进行get操作
private String get(final String key) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
byte[] data = connection.get(serializer.serialize(key));
connection.close();
if (data == null) {
return null;
}
return serializer.deserialize(data);
}
});
} catch (Exception e) {
logger.error("get redis error, key : {}", key);
}
return obj != null ? obj.toString() : null;
}
//对redis进行setNX操作
private boolean setNX(final String key, final String value) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
connection.close();
return success;
}
});
} catch (Exception e) {
logger.error("setNX redis error, key : {}", key);
}
return obj != null ? (Boolean) obj : false;
}
//对redis进行getset操作
private String getSet(final String key, final String value) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
connection.close();
return serializer.deserialize(ret);
}
});
} catch (Exception e) {
logger.error("setNX redis error, key : {}", key);
}
return obj != null ? (String) obj : null;
}
/**
* 使用setnx命令,缓存了锁。, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
* 执行过程:
* 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁。
* 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值,成功获得锁。
*/
public synchronized boolean lock() throws InterruptedException {
int timeout = timeoutMsecs;
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = String.valueOf(expires); //锁到期时间
//加锁。如果不存在加锁成功,返回true。
if (this.setNX(lockKey, expiresStr)) {
//得到锁
locked = true;
return true;
}
//lockKey已存在,获取lockKey里的时间。
String currentValueStr = this.get(lockKey);
//判断是否为空,是否过期。
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
//过期,设置现在的锁到期时间。只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的。
String oldValueStr = this.getSet(lockKey, expiresStr);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
//防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
//[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
//得到锁
locked = true;
return true;
}
}
timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
}
//已存在lockKey,且未过期,则获取锁失败
return false;
}
//解锁。直接删除lockKey即可
public synchronized void unlock() {
if (locked) {
redisTemplate.delete(lockKey);
locked = false;
}
}
public static void main(String[] args) throws Exception {
RedisLock lock = new RedisLock(redisTemplate, key, 10000, 20000);
try{
if(lock.lock()) {
//需要加锁的代码
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而超时,此时锁已经被别人获得,这时就不必解锁了。
lock.unlock();
}
}
}
基于zookeeper:
- 加锁:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的节点下生成一个唯一的临时(挥发性)有序(自增长)节点。
- 获取锁:调用getChildren(“locker”)来获取locker下面的所有子节点??突Ф嘶袢〉剿械淖咏诘鉷ath之后,如果发现自己在之前创建的子节点序号最?。ㄗ栽龀ぃ鹊较鹊茫?,那么就认为该客户端获取到了锁。
- 阻塞:如果创建的节点并非locker所有子节点中最小的,则没有获取到锁,此时找到比自己小的那个节点,然后对其调用exist()方法,并注册监听器。如果这个被关注的节点删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是locker子节点中序号最小的,如果是则获取到了锁,否则重复以上步骤。
- 释放锁:只需将这个临时节点删除即可。
左边的整个区域表示一个Zookeeper集群。locker是Zookeeper的一个持久节点。node_x是locker这个持久节点下面的临时顺序节点。client_x表示多个客户端。Service表示需要互斥访问的共享资源。
Zookeeper如何解决前面提到的问题:
- 锁无法释放?客户端可以在Zookeeper中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
- 非阻塞锁? 客户端可以通过在ZK中创建顺序(自增长)节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
- 不可重入? 客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中。下次想要获取锁的时候和当前最小的节点中的数据比对,如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
仍然存在的问题:
- 性能上没有缓存高。每次都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。
- 如果由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁??梢陨柚弥厥圆呗?。多次重试之后还不行的话才会删除临时节点。
curator实现zookeeper的分布式锁:
package bjsxt.curator.lock;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Lock {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.0.4:2181,192.168.0.9:2181,192.168.0.6:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;
static int count = 10;
public static void genarNo(){
try {
count--;
System.out.println(Thread.currentThread().getName()+" : "+count);
} finally {
}
}
public static void main(String[] args) throws Exception {
//1 重试策略:初试时间为1s 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
//.namespace("super")
.build();
//3 开启连接
cf.start();
//4 分布式锁
final InterProcessMutex lock = new InterProcessMutex(cf, "/lockpath");
final CountDownLatch countdown = new CountDownLatch(1);
for(int i = 0; i < 10; i++){
//循环开始10个线程,模仿多个客户端同时操作
new Thread(new Runnable() {
@Override
public void run() {
try {
countdown.await();
//加锁
lock.acquire();
//-------------业务处理开始
genarNo();
//-------------业务处理结束
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//释放
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, "t"+i ).start();
}
Thread.sleep(100);
countdown.countDown();
}
}
运行结果为依次打印出9876543210。
不加锁的话,一种可能的结果为:7564271703。
参考:
http://08643.cn/p/c77a5257303a
http://blog.csdn.net/pengshuai128/article/details/70593995
http://www.cnblogs.com/520playboy/p/6441651.html