NIO – Netty代码示例

以下代码演示一个简单的加法服务:客户端提交两个加数,服务端回送它们的“和”。

服务端

package player.kent.chen.temp.learnnetty.raw;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

/**
 * 报文格式:request=a+b, response=c
 * 
 * @author 陈坚 2013年6月18日下午2:47:04
 */

public class PlayNettyAddServer {
    //server关闭时会用到这个变量
    static final ChannelGroup allChannels = new DefaultChannelGroup();

    //真正处理报文的
    private static final class AddServerHandler extends SimpleChannelHandler {

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            //拿到request报文
            ChannelBuffer inputBuf = (ChannelBuffer) e.getMessage();
            String requestStr = new String(inputBuf.array());

            //解析request报文并计算
            String[] params = requestStr.split("\\+");
            if (params.length != 2) {
                return;
            }
            int a = Integer.parseInt(params[0].trim());
            int b = Integer.parseInt(params[1].trim());
            String result = String.valueOf(a + b);

            //回送response报文
            Channel channel = e.getChannel();
            ChannelBuffer outputBuf = ChannelBuffers.buffer(result.length());
            outputBuf.writeBytes(result.getBytes());
            channel.write(outputBuf);
        }

        @Override
        //异常处理
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            e.getCause().printStackTrace();
            e.getChannel().close();
        }

    }

    //启动入口
    public static void main(String[] args) throws InterruptedException {
        ExecutorService bossThreadPool = Executors.newCachedThreadPool(); //侦听线程池
        ExecutorService workerThreadPool = Executors.newCachedThreadPool(); //工作线程池        
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(bossThreadPool,
                workerThreadPool); //channelFactory用来产生和管理channel

        ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); //a helper class that sets up a server
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception { //pipeline可以理解为handler的集合,一个接一个地处理
                return Channels.pipeline(new AddServerHandler());
            }
        });

        //设定一些通信参数
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        //在某端口启动
        Channel channel = bootstrap.bind(new InetSocketAddress(18318));
        System.out.println("服务器已启动,并将在20秒后关闭");
        allChannels.add(channel);
        Thread.sleep(20 * 1000);

        //开始释放资源
        System.out.println("服务器开始关闭...");
        System.out.println("关闭所有channel");
        ChannelGroupFuture closeFuture = allChannels.close();
        closeFuture.awaitUninterruptibly();
        System.out.println("释放外部资源");
        channelFactory.releaseExternalResources();
    }
}

客户端

package player.kent.chen.temp.learnnetty.raw;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

/**
 * Client for the addition demo: connects to the server on port 18318, sends one request of
 * the form {@code a+b}, prints the reply, and waits until the connection is closed.
 */
public class PlayNettyClient {

    /** Charset used to encode and decode messages; fixed so it does not depend on the platform default. */
    private static final java.nio.charset.Charset WIRE_CHARSET = java.nio.charset.Charset
            .forName("UTF-8");

    /** Logs connection life-cycle events and prints the sum returned by the server. */
    private static final class AddClientHandler extends SimpleChannelHandler {

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
                throws Exception {
            System.out.println("channel连通");
        }

        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("channel关闭");
        }

        /** Decodes the response and prints it. */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            // Decode only the readable bytes. Going through array() would expose the whole
            // backing array and fails for buffers that have none.
            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
            String result = buf.toString(WIRE_CHARSET);
            System.out.println("The add result is: " + result);
        }

        /** Prints the failure and closes the channel it occurred on. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            e.getCause().printStackTrace();
            e.getChannel().close();
        }

    }

    /**
     * Client entry point. The thread pools are released on every exit path, including a
     * failed connection attempt.
     */
    public static void main(String[] args) throws InterruptedException {

        ExecutorService bossThreadPool = Executors.newCachedThreadPool(); // boss thread pool
        ExecutorService workerThreadPool = Executors.newCachedThreadPool(); // worker thread pool
        ChannelFactory channelFactory = new NioClientSocketChannelFactory(bossThreadPool,
                workerThreadPool);

        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new AddClientHandler());
            }
        });

        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);

        try {
            // Connect to the server on the loopback address. InetSocketAddress(port) would be
            // the wildcard address, which is not a portable connect target.
            ChannelFuture connectFuture = bootstrap
                    .connect(new InetSocketAddress("127.0.0.1", 18318));
            connectFuture.awaitUninterruptibly();
            if (!connectFuture.isSuccess()) {
                // Do not write to a channel that never connected.
                System.err.println("连接服务器失败: " + connectFuture.getCause());
                return;
            }

            // Send a single request.
            Channel channel = connectFuture.getChannel();
            ChannelBuffer requestBuf = ChannelBuffers.dynamicBuffer();
            requestBuf.writeBytes(" 82 + 28 ".getBytes(WIRE_CHARSET));
            ChannelFuture writeFuture = channel.write(requestBuf);
            writeFuture.awaitUninterruptibly();

            // Wait until the connection is closed.
            ChannelFuture closeFuture = channel.getCloseFuture();
            closeFuture.awaitUninterruptibly();
            System.out.println("连接被断开");
        } finally {
            // Release the thread pools.
            channelFactory.releaseExternalResources();
        }

    }

}

Leave a Comment

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.