Zookeeper应用场景之分布式屏障Barrier
Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。 分布式barrier一般出现在类似这样的场景,某个任务最终的执行需要基于很多并行计算的子结果。
项目demo地址:
实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
| package cn.radarsoft.barrier;
import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;
public class BarrierQueue implements Watcher{
private static final String Addr = "zk集群地址";
private String root = null;
private ZooKeeper zk = null;
private static CountDownLatch latch = new CountDownLatch(1);
private static final CountDownLatch countDownLatch = new CountDownLatch(10);
public BarrierQueue(String root) { this.root = root; try { zk = new ZooKeeper(Addr, 3000, this); if (zk != null) { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.setData(root, "10".getBytes(), -1); } }
} catch (IOException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace();
}
}
void add(String path,CountDownLatch countDownLatch) { try { if(null != zk){ zk.exists(root + "/start", true); zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); List<String> list = zk.getChildren(root, false); System.out.println("子节点的个数:" + list.size() + ",跟节点默认参考值:" + Integer.parseInt(new String(zk.getData(root,false, new Stat()))) ); if (list.size() < Integer.parseInt(new String(zk.getData(root,false, new Stat())))) { countDownLatch.countDown(); } else { if (null == zk.exists(root + "/start", false)) { zk.create(root + "/start", new byte[0],Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } }
@Override public void process(WatchedEvent event) { if ((root + "/start").equals(event.getPath())&& event.getType() == EventType.NodeCreated) { System.out.println(root + "/start" + "---" + "节点被传建了"); try { List<String> list = zk.getChildren(root, false); for (final String node : list) { if(!"start".equals(node)){ System.out.println(node); } } System.out.println("所以人到齐,开始吃饭"); countDownLatch.countDown(); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } }else if(event.getState() == KeeperState.SyncConnected){ latch.countDown(); } }
}
|
测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @SpringBootTest public class ZooKeeperBarrierTest {
private static CountDownLatch latch = new CountDownLatch(1);
private static final CountDownLatch countDownLatch = new CountDownLatch(10);
private static String subNode = "/element";
@Test public void barrier() throws InterruptedException { try { final BarrierQueue queue2 = new BarrierQueue("/queue_barrier"); latch.await(); for (int i = 0; i < 10; i++) { queue2.add("/queue_barrier" + subNode,countDownLatch); } } catch (Exception e) { e.printStackTrace(); } countDownLatch.await(); } }
|
zookeeper实现分布式屏障思路
某个node路径为”/queue_barrier”,在该节点下有个子节点给子节点赋值为某个值,假设为10,当根路径”/queue_barrier”下的子节点个数为10时,则所有子进程都完成了任务,主进程开始执行。
基于zookeeper的节点类型,创建临时连续的节点会在创建的节点后给节点名加上一个数字后缀,基于这个顺序,我们可以有如下的思路
1:通过调用getData()来获取某个节点的值,假设为10
2:调用getChildren()来获取所有的子节点,同时注册watcher监听
3:统计子节点的个数
4:将统计的个数和getData()获取的值比较,如果还不足10,就需要等待
5:接收watcher通知
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| 子节点的个数:1,跟节点默认参考值:10 子节点的个数:2,跟节点默认参考值:10 子节点的个数:3,跟节点默认参考值:10 子节点的个数:4,跟节点默认参考值:10 子节点的个数:5,跟节点默认参考值:10 子节点的个数:6,跟节点默认参考值:10 子节点的个数:7,跟节点默认参考值:10 子节点的个数:8,跟节点默认参考值:10 子节点的个数:9,跟节点默认参考值:10 子节点的个数:10,跟节点默认参考值:10 /queue_barrier/start---节点被传建了 element0000000008 element0000000009 element0000000006 element0000000007 element0000000004 element0000000005 element0000000002 element0000000003 element0000000000 element0000000001 所以人到齐,开始吃饭
|
查看zk节点数据:所有临时节点创建完成后,start节点被创建。
Curator实现
代码:
CuratorFrameworkProperties
类(提供CuratorFramework
需要的一些配置信息,以及创建CuratorFramework
实例的方法):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties { public static final String CONNECT_ADDRESS = "192.168.1.3:9000"; public static final int CONNECTION_TIMEOUT_MS = 40000; public static final int SESSION_TIMEOUT_MS = 10000; public static final String NAMESPACE = "MyNamespace"; public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() { CuratorFramework curator = CuratorFrameworkFactory.builder() .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS) .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY) .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS) .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS) .namespace(CuratorFrameworkProperties.NAMESPACE) .build(); curator.start(); assert curator.getState().equals(CuratorFrameworkState.STARTED); return curator; } }
|
DistributedDoubleBarrierRunnable
类(实现了Runnable
接口,模拟分布式节点进入与离开分布式屏障):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.kaven.zookeeper;
import lombok.SneakyThrows; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import java.util.Random;
public class DistributedDoubleBarrierRunnable implements Runnable{ @SneakyThrows @Override public void run() { CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
int randomSleep = new Random().nextInt(20000); Thread.sleep(randomSleep);
String barrierPath = "/kaven";
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(curator, barrierPath, 5);
System.out.println(Thread.currentThread().getName() + " 等待进入屏障"); long start = System.currentTimeMillis(); barrier.enter(); System.out.println(Thread.currentThread().getName() + " 等待了 " + (System.currentTimeMillis() - start) / 1000 + " s"); System.out.println(Thread.currentThread().getName() + " 进入屏障"); Thread.sleep(1000); barrier.leave(); System.out.println(Thread.currentThread().getName() + " 离开屏障"); } }
|
测试:
1 2 3 4 5 6 7 8 9
| private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
@Test public void barrier() { for (int i = 0; i < 5; i++) { EXECUTOR_SERVICE.execute(new DistributedDoubleBarrierRunnable()); } }
|
结果:
模拟5
个分布式节点进入与离开分布式屏障,输出如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| pool-1-thread-3 等待进入屏障 pool-1-thread-5 等待进入屏障 pool-1-thread-2 等待进入屏障 pool-1-thread-1 等待进入屏障 pool-1-thread-4 等待进入屏障 pool-1-thread-4 等待了 0 s pool-1-thread-4 进入屏障 pool-1-thread-3 等待了 9 s pool-1-thread-3 进入屏障 pool-1-thread-2 等待了 6 s pool-1-thread-2 进入屏障 pool-1-thread-1 等待了 5 s pool-1-thread-1 进入屏障 pool-1-thread-5 等待了 8 s pool-1-thread-5 进入屏障 pool-1-thread-1 离开屏障 pool-1-thread-3 离开屏障 pool-1-thread-5 离开屏障 pool-1-thread-2 离开屏障 pool-1-thread-4 离开屏障
|
This is copyright.