为什么需要分布式锁
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁应该具备哪些条件
在分析分布式锁的三种实现方式之前,先了解一下分布式锁应该具备哪些条件:
1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
原生 Zookeeper 实现分布式锁案例
1)分布式锁实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; public class DistributedLock { private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private int sessionTimeout = 2000; private ZooKeeper zk; private String rootNode = "locks"; private String subNode = "seq-"; private String waitPath; private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1); private String currentNode; public DistributedLock() throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); } if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { waitLatch.countDown(); } } }); connectLatch.await(); Stat stat = zk.exists("/" + rootNode, false); if (stat == null) { System.out.println("根节点不存在"); zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } public void zkLock() { try { currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); Thread.sleep(10); List<String> childrenNodes = zk.getChildren("/" + rootNode, false); client 获得锁 if (childrenNodes.size() == 1) { return; } else { Collections.sort(childrenNodes); String thisNode = currentNode.substring(("/" + rootNode + "/").length()); int index = childrenNodes.indexOf(thisNode); if (index == -1) { System.out.println("数据异常"); } else if (index == 0) { client 获得锁 return; } else { this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1); zookeeper 会回调监听器的 process 方法 zk.getData(waitPath, true, new Stat()); waitLatch.await(); return; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public void zkUnlock() { try { zk.delete(this.currentNode, -1); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } }
|
2)分布式锁测试
(1)创建两个线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import org.apache.zookeeper.KeeperException; import java.io.IOException;
public class DistributedLockTest { public static void main(String[] args) throws InterruptedException, IOException, KeeperException { final DistributedLock lock1 = new DistributedLock(); final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable() { @Override public void run() { try { lock1.zkLock(); System.out.println("线程 1 获取锁"); Thread.sleep(5 * 1000); lock1.zkUnlock(); System.out.println("线程 1 释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.zkLock(); System.out.println("线程 2 获取锁"); Thread.sleep(5 * 1000); lock2.zkUnlock(); System.out.println("线程 2 释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
|
(2)观察控制台变化:
1 2 3 4
| 线程 1 获取锁 线程 1 释放锁 线程 2 获取锁 线程 2 释放锁
|
Curator 框架实现分布式锁案例
1)原生的 Java API 开发存在的问题
(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
2)Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。 详情请查看官方文档:https://curator.apache.org/index.html
3)Curator 案例实操
(1)添加依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
|
(2)代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest { private String rootNode = "/locks"; private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private int connectionTimeout = 2000; private int sessionTimeout = 2000; public static void main(String[] args) { new CuratorLockTest().test(); } private void test() { final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode); final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode); new Thread(new Runnable() { @Override public void run() { try { lock1.acquire(); System.out.println("线程 1 获取锁"); lock1.acquire(); System.out.println("线程 1 再次获取锁"); Thread.sleep(5 * 1000); lock1.release(); System.out.println("线程 1 释放锁"); lock1.release(); System.out.println("线程 1 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.acquire(); System.out.println("线程 2 获取锁"); lock2.acquire(); System.out.println("线程 2 再次获取锁"); Thread.sleep(5 * 1000); lock2.release(); System.out.println("线程 2 释放锁"); lock2.release(); System.out.println("线程 2 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } public CuratorFramework getCuratorFramework (){ RetryPolicy policy = new ExponentialBackoffRetry(3000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .connectionTimeoutMs(connectionTimeout) .sessionTimeoutMs(sessionTimeout) .retryPolicy(policy).build(); client.start(); System.out.println("zookeeper 初始化完成..."); return client; } }
|
(3)观察控制台变化:
1 2 3 4 5 6 7 8
| 线程 1 获取锁 线程 1 再次获取锁 线程 1 释放锁 线程 1 再次释放锁 线程 2 获取锁 线程 2 再次获取锁 线程 2 释放锁 线程 2 再次释放锁
|
This is copyright.