用Zookeeper搭建一个服务注册中心:不断有ServiceProvider上来注册自己的进程ID或者与注册中心断开,ServiceConsumer则捕捉这些变化。
1.创建一个根结点,所有注册信息都将挂这个结点的下面
public class RegistryStartup {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost", 1000, null);
while (zk.getState() != States.CONNECTED) {
;//wait
}
Stat rootExists = zk.exists("/serviceProviderRoot", false);
if (rootExists == null) {
zk.create("/serviceProviderRoot", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
System.out.println("The node '/serviceProviderRoot' has been created");
}
}
2.ServiceProvider注册自己的进程号
<!--pom.xml, 引入一个库,以帮助获取当前进程的ID--> <dependency> <groupId>net.java.dev.jna</groupId> <artifactId>jna</artifactId> <version>3.5.1</version> </dependency>
public class ServiceProvider {
public static void main(String[] args) throws Exception {
String pid = String.valueOf(CLibrary.INSTANCE.getpid()); //自己的进程号
ZooKeeper zk = new ZooKeeper("localhost", 1000, null);
while (zk.getState() != States.CONNECTED) {
;//wait
}
System.out.println("Process " + pid
+ " connected to the Config Server as Service Provider ");
byte[] pidData = pid.getBytes();
//注册自己的进程号,注意要使用EPHEMERAL类型的结点,这样当自己下线时相应的注册信息也会自动消失
//把自己当作某个根结点的子结点来注册
String path = zk.create("/serviceProviderRoot/child", pidData, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Process " + pid + " registered as znoed in path: " + path);
//永远睡眠,以保持本进程在线
Thread.sleep(Long.MAX_VALUE);
}
private static interface CLibrary extends Library {
CLibrary INSTANCE = (CLibrary) Native.loadLibrary("c", CLibrary.class);
int getpid();
}
}
3.ServiceConsumer监听Provider的变化
public class ServiceConsumer implements Watcher {//要实现Watcher接口
public static void main(String[] args) throws Exception {
//连一下
ZooKeeper zk = new ZooKeeper("localhost", 1000, null);
while (zk.getState() != States.CONNECTED) {
;//wait
}
System.out.println("Connected to the Config Server as Service Consumer ");
//创建本类的一个对象,用作一个Watcher
ServiceConsumer consumer = new ServiceConsumer(zk);
//先处理一下线上现有的provider
consumer.dealWithAllProviders();
//防止主线程终止;剩下的活都由Watcher的监视线程来干
Thread.sleep(Long.MAX_VALUE);
}
private final ZooKeeper zk;
ServiceConsumer(ZooKeeper zk) {
super();
this.zk = zk;
}
//处理当前现有的provider
private void dealWithAllProviders() {
try {
//找出当前所有的provider结点(znode)
//另外注意这里的第二个参数"this", 它的意思是找出所有结点后又立即注册一个watch行为,以监视新的变化
List<String> pathList = zk.getChildren("/serviceProviderRoot", this);
if (pathList.isEmpty()) {
return;
}
//打印所有的provider进程号
List<String> processList = new ArrayList<String>();
for (String path : pathList) {
byte[] data = zk.getData("/serviceProviderRoot/" + path, false, null);
processList.add(new String(data));
}
System.out.println("Providers are: " + processList);
//跟其中一个provider交互一下:通过kill -3 ”恐吓“ 它一下
Runtime.getRuntime().exec("kill -3 " + processList.get(processList.size() / 2));
} catch (Exception e) {
e.printStackTrace();
}
}
//本watcher的监视方法
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {//遇子结点(provider)变化,则处理一下所有子结点
dealWithAllProviders();
}
}
}
4.运行
1. RegistryStartup要先跑一下
2. ServiceProvider和ServiceConumer随便谁先运行
3. 不时地启动新的ServiceProvider或终止运行中的ServiceProvider,观察一下ServiceConumer的控制台输出和运行中的所有的ServiceProvider的输出(当provider列表变动时,总有一个进程会打印线程栈)