Zookeeper案例-分布式屏障Barrier

分布式屏障Barrier

Posted by John Doe on 2021-12-25
Words 1.7k and Reading Time 8 Minutes
Viewed Times

Zookeeper应用场景之分布式屏障Barrier

Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。 分布式barrier一般出现在类似这样的场景,某个任务最终的执行需要基于很多并行计算的子结果。

项目demo地址:

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package cn.radarsoft.barrier;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;


public class BarrierQueue implements Watcher{

private static final String Addr = "zk集群地址";

private String root = null;

private ZooKeeper zk = null;

private static CountDownLatch latch = new CountDownLatch(1);

private static final CountDownLatch countDownLatch = new CountDownLatch(10);

public BarrierQueue(String root) {
this.root = root;
try {
// 连接zk服务器
zk = new ZooKeeper(Addr, 3000, this);
if (zk != null) {
// 建立根目录节点
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk.setData(root, "10".getBytes(), -1);
}
}

} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();

}

}

void add(String path,CountDownLatch countDownLatch) {
try {
if(null != zk){
// 设置一个监控的标志,当大小为10时,所有子节点都已经创建完毕,进行主流程处理
zk.exists(root + "/start", true);
zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> list = zk.getChildren(root, false);
System.out.println("子节点的个数:" + list.size() + ",跟节点默认参考值:" + Integer.parseInt(new String(zk.getData(root,false, new Stat()))) );
if (list.size() < Integer.parseInt(new String(zk.getData(root,false, new Stat())))) {
countDownLatch.countDown();
} else {
if (null == zk.exists(root + "/start", false)) {
zk.create(root + "/start", new byte[0],Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}


@Override
public void process(WatchedEvent event) {
if ((root + "/start").equals(event.getPath())&& event.getType() == EventType.NodeCreated) {
System.out.println(root + "/start" + "---" + "节点被传建了");
try {
List<String> list = zk.getChildren(root, false);
for (final String node : list) {
if(!"start".equals(node)){
System.out.println(node);
}
}
System.out.println("所以人到齐,开始吃饭");
countDownLatch.countDown();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}else if(event.getState() == KeeperState.SyncConnected){
latch.countDown();
}
}

}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Drives {@link BarrierQueue} with ten participants against the root {@code /queue_barrier}.
 */
@SpringBootTest
public class ZooKeeperBarrierTest {

    /** Latch handed to every {@code add} call; the test waits on it at the end. */
    private static final CountDownLatch countDownLatch = new CountDownLatch(10);

    private static String subNode = "/element";

    /**
     * Adds ten participant nodes and then waits for the barrier latch.
     *
     * <p>The previous version first awaited a test-local latch that no code ever counted down, so
     * it blocked forever before adding a single node. The final wait is now bounded as well, so a
     * barrier that is never released cannot hang the build.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void barrier() throws InterruptedException {
        // No try/catch here: an unexpected exception should fail the test instead of being printed
        // and then followed by an endless wait.
        final BarrierQueue queue2 = new BarrierQueue("/queue_barrier");
        for (int i = 0; i < 10; i++) {
            queue2.add("/queue_barrier" + subNode, countDownLatch);
        }
        countDownLatch.await(30, java.util.concurrent.TimeUnit.SECONDS);
    }
}

zookeeper实现分布式屏障思路

根节点路径为“/queue_barrier”,给该根节点的数据赋一个阈值,假设为10;当根路径“/queue_barrier”下的子节点个数达到10时,说明所有子进程都已到达屏障(完成了任务),主进程开始执行。
基于zookeeper的节点类型,创建临时顺序(EPHEMERAL_SEQUENTIAL)节点时,zookeeper会在节点名后自动加上一个递增的数字后缀,基于这个顺序,我们可以有如下的思路
1:通过调用getData()来获取某个节点的值,假设为10
2:调用getChildren()来获取所有的子节点,同时注册watcher监听
3:统计子节点的个数
4:将统计的个数和getData()获取的值比较,如果还不足10,就需要等待
5:接收watcher通知

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
子节点的个数:1,跟节点默认参考值:10
子节点的个数:2,跟节点默认参考值:10
子节点的个数:3,跟节点默认参考值:10
子节点的个数:4,跟节点默认参考值:10
子节点的个数:5,跟节点默认参考值:10
子节点的个数:6,跟节点默认参考值:10
子节点的个数:7,跟节点默认参考值:10
子节点的个数:8,跟节点默认参考值:10
子节点的个数:9,跟节点默认参考值:10
子节点的个数:10,跟节点默认参考值:10
/queue_barrier/start---节点被传建了
element0000000008
element0000000009
element0000000006
element0000000007
element0000000004
element0000000005
element0000000002
element0000000003
element0000000000
element0000000001
所以人到齐,开始吃饭

查看zk节点数据:所有临时节点创建完成后,start节点被创建。

Curator实现

代码:

CuratorFrameworkProperties类(提供CuratorFramework需要的一些配置信息,以及创建CuratorFramework实例的方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Holds the connection settings used by the examples and builds started
 * {@link CuratorFramework} clients from them.
 */
public class CuratorFrameworkProperties {
    /** ZooKeeper connect string. */
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    /** Connection timeout in milliseconds. */
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    /** Session timeout in milliseconds. */
    public static final int SESSION_TIMEOUT_MS = 10000;
    /** Namespace passed to the client builder. */
    public static final String NAMESPACE = "MyNamespace";
    /** Retry policy passed to the client builder. */
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    /**
     * Builds a new client from the constants above and starts it.
     *
     * @return a freshly created client on which {@code start()} has been called
     */
    public static CuratorFramework getCuratorFramework() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .namespace(NAMESPACE)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .retryPolicy(RETRY_POLICY)
                .connectString(CONNECT_ADDRESS)
                .build();
        client.start();

        // Only evaluated when the JVM runs with assertions enabled.
        CuratorFrameworkState state = client.getState();
        assert state.equals(CuratorFrameworkState.STARTED);
        return client;
    }
}

DistributedDoubleBarrierRunnable类(实现了Runnable接口,模拟分布式节点进入与离开分布式屏障):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;

import java.util.Random;

/**
 * Simulates one distributed node that enters and then leaves a
 * {@link DistributedDoubleBarrier} shared by five participants.
 */
public class DistributedDoubleBarrierRunnable implements Runnable {
    /**
     * Creates a dedicated client, sleeps for a random time of up to 20 seconds, enters the
     * barrier at {@code /kaven}, stays inside for one second and leaves again. The client is
     * closed when the method finishes, whether normally or with an exception.
     */
    @SneakyThrows
    @Override
    public void run() {
        // Each runnable uses its own CuratorFramework instance to stand for a separate node.
        // try-with-resources closes the client afterwards; previously it was never closed.
        try (CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework()) {
            // Simulate nodes that join at random times.
            int randomSleep = new Random().nextInt(20000);
            Thread.sleep(randomSleep);

            // Path of the distributed barrier.
            String barrierPath = "/kaven";

            // The barrier is created for five participants.
            DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(curator, barrierPath, 5);

            System.out.println(Thread.currentThread().getName() + " 等待进入屏障");
            long start = System.currentTimeMillis();
            // Wait to enter the barrier.
            barrier.enter();
            System.out.println(Thread.currentThread().getName() + " 等待了 "
                    + (System.currentTimeMillis() - start) / 1000 + " s");
            System.out.println(Thread.currentThread().getName() + " 进入屏障");
            Thread.sleep(1000);
            // Wait to leave the barrier.
            barrier.leave();
            System.out.println(Thread.currentThread().getName() + " 离开屏障");
        }
    }
}

测试:

1
2
3
4
5
6
7
8
9
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

/**
 * Starts five simulated distributed nodes and waits until each of them has finished.
 *
 * <p>The tasks are submitted rather than executed so that the test can block on their
 * futures: it no longer returns before the nodes are done, and an exception thrown inside a
 * node fails the test through {@code Future.get}.
 *
 * @throws Exception if a node fails, the wait is interrupted, or a node does not finish
 *                   within 60 seconds
 */
@Test
public void barrier() throws Exception {
    java.util.concurrent.Future<?>[] nodes = new java.util.concurrent.Future<?>[5];
    // Distributed nodes doing their work.
    for (int i = 0; i < nodes.length; i++) {
        nodes[i] = EXECUTOR_SERVICE.submit(new DistributedDoubleBarrierRunnable());
    }
    for (java.util.concurrent.Future<?> node : nodes) {
        // Bounded wait: each node sleeps at most 20 s before entering the barrier.
        node.get(60, java.util.concurrent.TimeUnit.SECONDS);
    }
}

结果:

模拟5个分布式节点进入与离开分布式屏障,输出如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
pool-1-thread-3 等待进入屏障
pool-1-thread-5 等待进入屏障
pool-1-thread-2 等待进入屏障
pool-1-thread-1 等待进入屏障
pool-1-thread-4 等待进入屏障
pool-1-thread-4 等待了 0 s
pool-1-thread-4 进入屏障
pool-1-thread-3 等待了 9 s
pool-1-thread-3 进入屏障
pool-1-thread-2 等待了 6 s
pool-1-thread-2 进入屏障
pool-1-thread-1 等待了 5 s
pool-1-thread-1 进入屏障
pool-1-thread-5 等待了 8 s
pool-1-thread-5 进入屏障
pool-1-thread-1 离开屏障
pool-1-thread-3 离开屏障
pool-1-thread-5 离开屏障
pool-1-thread-2 离开屏障
pool-1-thread-4 离开屏障

This is copyright.

...

...

00:00
00:00