Netty系列四:第一个Netty程序(业务线程异步)

有了之前的基础之后,我们从netty官网的示例(略做修改),来开始netty之旅。我们实现一个支持hello world版的netty程序。

首先我们创建一个主类:侦听 http端口,启动服务

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * An HTTP server that sends back the content of the received HTTP request
 * in a pretty plaintext form.
 */
public final class HttpHelloWorldServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "8080"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup(10);//源码内部默认取的是 2*n,  N=CPU数量 , 此处注意生产应该以压测结果为准。
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.option(ChannelOption.SO_BACKLOG, 1024);//标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new HttpHelloWorldServerInitializer(sslCtx));

            System.out.println("服务启动成功,访问地址:" + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');
            Channel ch = b.bind(PORT).sync().channel();
            ch.closeFuture().sync();
            /* 单个netty是可以侦听多个端口的,一个端口一条线程,如果需要侦听多个端口,如下所示:
            List<Integer> ports = Arrays.asList(8080, 8081);
            Collection<Channel> channels = new ArrayList<>(ports.size());
            for (int port : ports) {
                Channel serverChannel = b.bind(port).sync().channel();
                channels.add(serverChannel);
            }
            for (Channel ch : channels) {
                ch.closeFuture().sync();
            } */
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

HttpHelloWorldServerInitializer类中我们添加work线程的handler

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    //private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
    //p.addLast(group, "handler", new HttpHelloWorldServerHandler2());//业务线程独立的线程池

    public HttpHelloWorldServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        /*注意此处的超时时间不是客户端TCP连接的超时时间,而是服务器处理的时间,如果超时,那么就会触发handler里面的exceptionCaught */
        p.addLast(new ReadTimeoutHandler( 10));//服务器端设置超时时间,单位:秒
        p.addLast(new WriteTimeoutHandler(10));//服务器端设置超时时间,单位:秒
        p.addLast(new HttpServerCodec());//对http通信数据进行编解码
        p.addLast(new HttpHelloWorldServerHandler()); //业务handler
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

HttpHelloWorldServerHandler ,work 线程池里面的handler不能放耗时比较大的业务逻辑,否则会导致netty工作现成阻塞,所以我们再启动一个单例的异步业务线程池,处理业务逻辑。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.WriteTimeoutException;
public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {

    private static final byte[] CONTENT = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'};

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("IO线程处理完毕:" + Thread.currentThread().getThreadGroup()+":"+Thread.currentThread().getName());
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws  Exception{
        BusinessThreadUtil.doBusiness(ctx, msg, CONTENT);//handle中,可以使用异步的线程池,处理业务。防止handler卡住,导致netty并发性能不佳
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if(cause instanceof ReadTimeoutException||cause instanceof WriteTimeoutException) {
            System.out.println("超时了:" + cause.toString());
        }
        ctx.close();//直接关闭channel
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

业务逻辑:BusinessThreadUtil


import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;


public class BusinessThreadUtil {

    private static final ExecutorService executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));//CPU核数4-10倍

    public static void doBusiness(ChannelHandlerContext ctx, Object msg, byte[] content) {
        //异步线程池处理
        executor.submit( () -> {
            if (msg instanceof HttpRequest) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Thread.currentThread().setName("buessness-thread");
                System.out.println(Thread.currentThread().getId());
                HttpRequest req = (HttpRequest) msg;
                boolean keepAlive = HttpUtil.isKeepAlive(req);
                FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(content));
                response.headers().set("Content-Type", "text/plain");
                response.headers().setInt("Content-Length", response.content().readableBytes());
                if (!keepAlive) {
                    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
                } else {
                    response.headers().set("Connection", "keep-alive");
                    ctx.writeAndFlush(response);
                }
            }
        });
    }
}

 

HttpHelloWorldServerInitializer类中我们添加work线程的handler

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    //private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
    //p.addLast(group, "handler", new HttpHelloWorldServerHandler2());//业务线程独立的线程池

    public HttpHelloWorldServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        /*注意此处的超时时间不是客户端TCP连接的超时时间,而是服务器处理的时间,如果超时,那么就会触发handler里面的exceptionCaught */
        p.addLast(new ReadTimeoutHandler( 10));//服务器端设置超时时间,单位:秒
        p.addLast(new WriteTimeoutHandler(10));//服务器端设置超时时间,单位:秒
        p.addLast(new HttpServerCodec());//对http通信数据进行编解码
        p.addLast(new HttpHelloWorldServerHandler()); //业务handler
    }
}

HttpHelloWorldServerHandler ,work 线程池里面的handler不能放耗时比较大的业务逻辑,否则会导致netty工作现成阻塞,所以我们再启动一个单例的异步业务线程池,处理业务逻辑。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.WriteTimeoutException;
public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {

    private static final byte[] CONTENT = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'};

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("IO线程处理完毕:" + Thread.currentThread().getThreadGroup()+":"+Thread.currentThread().getName());
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws  Exception{
        BusinessThreadUtil.doBusiness(ctx, msg, CONTENT);//handle中,可以使用异步的线程池,处理业务。防止handler卡住,导致netty并发性能不佳
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if(cause instanceof ReadTimeoutException||cause instanceof WriteTimeoutException) {
            System.out.println("超时了:" + cause.toString());
        }
        ctx.close();//直接关闭channel
    }
}

业务逻辑:BusinessThreadUtil

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;


public class BusinessThreadUtil {

    private static final ExecutorService executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));//CPU核数4-10倍

    public static void doBusiness(ChannelHandlerContext ctx, Object msg, byte[] content) {
        //异步线程池处理
        executor.submit( () -> {
            if (msg instanceof HttpRequest) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Thread.currentThread().setName("buessness-thread");
                System.out.println(Thread.currentThread().getId());
                HttpRequest req = (HttpRequest) msg;
                boolean keepAlive = HttpUtil.isKeepAlive(req);
                FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(content));
                response.headers().set("Content-Type", "text/plain");
                response.headers().setInt("Content-Length", response.content().readableBytes());
                if (!keepAlive) {
                    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
                } else {
                    response.headers().set("Connection", "keep-alive");
                    ctx.writeAndFlush(response);
                }
            }
        });
    }
}
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享