例示:用Zookeeper搭建服务注册中心

用Zookeeper搭建一个服务注册中心:不断有ServiceProvider上来注册自己的进程ID或者与注册中心断开,ServiceConsumer则捕捉这些变化。

1.创建一个根结点,所有注册信息都将挂这个结点的下面

 
public class RegistryStartup {

    public static void main(String[] args) throws Exception {

        ZooKeeper zk = new ZooKeeper("localhost", 1000, null);
        while (zk.getState() != States.CONNECTED) {
            ;//wait
        }

        Stat rootExists = zk.exists("/serviceProviderRoot", false);
        if (rootExists == null) {
            zk.create("/serviceProviderRoot", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        System.out.println("The node '/serviceProviderRoot' has been created");
    }
}

2.ServiceProvider注册自己的进程号

<!--pom.xml, 引入一个库,以帮助获取当前进程的ID-->

<dependency>
  <groupId>net.java.dev.jna</groupId>
  <artifactId>jna</artifactId>
  <version>3.5.1</version>
</dependency>
public class ServiceProvider {

    public static void main(String[] args) throws Exception {

        String pid = String.valueOf(CLibrary.INSTANCE.getpid()); //自己的进程号
        ZooKeeper zk = new ZooKeeper("localhost", 1000, null);
        while (zk.getState() != States.CONNECTED) {
            ;//wait
        }
        System.out.println("Process " + pid
                + " connected to the Config Server as Service Provider ");

        byte[] pidData = pid.getBytes();

        //注册自己的进程号,注意要使用EPHEMERAL类型的结点,这样当自己下线时相应的注册信息也会自动消失
        //把自己当作某个根结点的子结点来注册
        String path = zk.create("/serviceProviderRoot/child", pidData, Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Process " + pid + " registered as znoed in path: " + path);

        //永远睡眠,以保持本进程在线
        Thread.sleep(Long.MAX_VALUE);

    }

    private static interface CLibrary extends Library {
        CLibrary INSTANCE = (CLibrary) Native.loadLibrary("c", CLibrary.class);

        int getpid();
    }
}

3.ServiceConsumer监听Provider的变化

public class ServiceConsumer implements Watcher {//要实现Watcher接口

    public static void main(String[] args) throws Exception {

        //连一下
        ZooKeeper zk = new ZooKeeper("localhost", 1000, null);
        while (zk.getState() != States.CONNECTED) {
            ;//wait
        }
        System.out.println("Connected to the Config Server as Service Consumer ");

        //创建本类的一个对象,用作一个Watcher
        ServiceConsumer consumer = new ServiceConsumer(zk);

        //先处理一下线上现有的provider
        consumer.dealWithAllProviders();

        //防止主线程终止;剩下的活都由Watcher的监视线程来干
        Thread.sleep(Long.MAX_VALUE);
    }

    private final ZooKeeper zk;

    ServiceConsumer(ZooKeeper zk) {
        super();
        this.zk = zk;
    }

    //处理当前现有的provider
    private void dealWithAllProviders() {
        try {
            //找出当前所有的provider结点(znode)
            //另外注意这里的第二个参数"this", 它的意思是找出所有结点后又立即注册一个watch行为,以监视新的变化
            List<String> pathList = zk.getChildren("/serviceProviderRoot", this);
            if (pathList.isEmpty()) {
                return;
            }

            //打印所有的provider进程号
            List<String> processList = new ArrayList<String>();
            for (String path : pathList) {
                byte[] data = zk.getData("/serviceProviderRoot/" + path, false, null);
                processList.add(new String(data));
            }
            System.out.println("Providers are: " + processList);

            //跟其中一个provider交互一下:通过kill -3 ”恐吓“ 它一下
            Runtime.getRuntime().exec("kill -3 " + processList.get(processList.size() / 2));

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    //本watcher的监视方法
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == EventType.NodeChildrenChanged) {//遇子结点(provider)变化,则处理一下所有子结点
            dealWithAllProviders();
        }
    }

}

4.运行

1. RegistryStartup要先跑一下

2. ServiceProvider和ServiceConumer随便谁先运行

3. 不时地启动新的ServiceProvider或终止运行中的ServiceProvider,观察一下ServiceConumer的控制台输出和运行中的所有的ServiceProvider的输出(当provider列表变动时,总有一个进程会打印线程栈)

Leave a Comment

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.