Netty01---Netty实现简单通信

这个Demo的功能是客户端向服务端发送一个Hello Netty的消息,然后服务端又把消息返回给客户端

Server端

这些都是一些公共的代码,代码比较简单,服务端负责监听端口,Handler负责处理业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ServerProcessor {
public void process(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootStrap = new ServerBootstrap();
ChannelFuture cf;
bootStrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ServerMessageHandler());//server的handler
}
});

try {
cf = bootStrap.bind(8099).sync();//监听8099端口
System.out.println("8099:binded...");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
ServerProcessor serverProcessor = new ServerProcessor();
serverProcessor.process();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@ChannelHandler.Sharable
public class ServerMessageHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务已启动,等待客户连接");
}


//接收客户端发送的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;/*netty实现的缓冲区*/
System.out.println("Server Accept:"+in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)/*flush掉所有的数据*/
.addListener(ChannelFutureListener.CLOSE);/*当flush完成后,关闭连接*/
}
}

Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ClientProcessor {

public void process() {
EventLoopGroup group =new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ClientMessageHandler());

}
});

ChannelFuture future = b.connect("127.0.0.1", 8099).sync();//连接端口

future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

}

public static void main(String[] args) {
ClientProcessor clientProcessor = new ClientProcessor();
clientProcessor.process();
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ClientMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8));
ctx.fireChannelActive();
}


@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println("NettyClient accetp:" + byteBuf.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}