`
zhao_rock
  • 浏览: 188019 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

zookeeper注册服务与netty的简单结合应用

阅读更多

简单介绍一下zookeeper和netty
Netty:流行的NIO Socket通信框架,很多开源软件(如Hadoop、Tachyon、Spark等)都使用Netty作为底层通信框架
ZooKeeper:开放源码的分布式应用程序协调服务,是Hadoop、HBase等开源分布式系统的重要组件

 

应用场景描述:
利用Zookeeper的服务注册与发现功能,实现Netty通信集群的简单高可用。

 

首先NettyServer端需要将服务注册到zookeeper中,代码如下

package com.zookeeper;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import com.constance.Constant;

public class ServiceRegistry {

    private CountDownLatch latch = new CountDownLatch(1);
    private String registryAddress;

    public ServiceRegistry(String registryAddress) {
        this.registryAddress = registryAddress;
    }

    //注册到zk中,其中data为服务端的 ip:port
    public void register(String data) {
        if (data != null) {
            ZooKeeper zk = connectServer();
            if (zk != null) {
                createNode(zk, data);
            }
        }
    }

    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
           zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }
   
    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes();
            String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("create zookeeper node path:"+path+" data:"+data);
        } catch (Exception e) {
         e.printStackTrace();
        }
    }
}

 

NettyServer端启动及调用注册服务的代码

package com.main;

import com.constance.Constant;
import com.netty.Server;
import com.zookeeper.ServiceRegistry;

public class ServerMain {

 
 public static void main(String[] agrs){
  int port = 9988;
  //register service in zookeeper
  ServiceRegistry zsr = new ServiceRegistry(Constant.registryAddress);
  String serverIp = "123.123.123.123:9988";
  zsr.register(serverIp);
  //netty bind
  new Server().bind(port);
 }
}

 

NettyClient端启动时发现服务的代码

package com.main;

import com.constance.Constant;
import com.netty.Client;
import com.test.TestThread;
import com.zookeeper.ServiceDiscovery;

public class ClientMain {

 /**
  * @param args
  * @throws InterruptedException
  */
 public static void main(String[] args) throws InterruptedException {
  //find service from zookeeper
  ServiceDiscovery sd = new ServiceDiscovery(Constant.registryAddress);
  String serverIp = sd.discover();
  String[] serverArr = serverIp.split(":");
  System.out.println("ServerIP:"+serverArr[0]+"    ServerPort:"+serverArr[1]);
  String hostIP = "123.123.111.111";
  int port = 9988;
  new TestThread().test();
  //Client.connect(hostIP, port);
  Client.connect(serverArr[0], Integer.valueOf(serverArr[1]));
 }
}

 

NettyClient中zk发现服务的方法

package com.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import com.constance.Constant;

public class ServiceDiscovery {

    private CountDownLatch latch = new CountDownLatch(1);
    private volatile List<String> dataList = new ArrayList<String>();
    private String registryAddress;

   
    public ServiceDiscovery(String registryAddress) {
        this.registryAddress = registryAddress;
        ZooKeeper zk = connectServer();
        if (zk != null) {
            watchNode(zk);
        }
    }

    public String discover() {
        String data = null;
        int size = dataList.size();
        if (size > 0) {
            if (size == 1) {
                data = dataList.get(0);
            } else {
             //随机获取其中的一个
                data = dataList.get(ThreadLocalRandom.current().nextInt(size));
            }
        }
        return data;
    }

    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
         //format host1:port1,host2:port2,host3:port3
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                 //zookeeper处于同步连通的状态时
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException e) {
         e.printStackTrace();
        } catch (InterruptedException e) {
   e.printStackTrace();
  }
        return zk;
    }

    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            List<String> dataList = new ArrayList<String>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                dataList.add(new String(bytes));
            }
            this.dataList = dataList;
        } catch (Exception e) {
         e.printStackTrace();
        }
    }
}

 

ps:使用代码框时,生成的代码框总是跑到第一行,只能将代码粘贴在文本中

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics