目的:演示如何将超高并发的请求放到任务队列,然后由指定的线程池中的并发线程共同处理这些任务队列。
1、引用依赖
刚写了个taskqueue-module,引用它
<dependency><groupId>io.github.tiger822</groupId><artifactId>taskqueue-module</artifactId><version>1.0-SNAPSHOT</version></dependency>
2、定义线程池及任务队列
定义了一个有20个固定线程的线程池,还有一组长度为200的任务队列
private final ThreadPool threadPool=new ThreadPool(20,20,1800,4);
private TaskQueue<ChatMessage> queue=new BlockedTaskQueue<>(200);
3、改写channelRead0
将原先的代码移走,单纯将数据对象放到任务队列即可
if (msg instanceof ChatMessage) {ChatMessageProperties properties=((ChatMessage)msg).getProperties();properties.setCtx(ctx);if (!queue.offer((ChatMessage) msg,100)){ctx.channel().writeAndFlush(JSONData.fromErr(0,-1,"任务队列满,系统正忙请稍后再试"));return;}}
4、定义线程组
同时启动20组线程监听这个任务队列
private void initThreads(Consumer<ChatMessage> consumer){List<CompletableFuture> list=new ArrayList<>();for (int i=0;i<threadPool.getCorePoolSize();i++){list.add(CompletableFuture.runAsync(()->{while (!closed.get()){ChatMessage message=queue.poll(3000);if (message==null){continue;}consumer.accept(message);}}));}tasks=list.toArray(new CompletableFuture[0]);}
5、修改server启动函数
在启动函数中,加入
initThreads(chatMessage->{ChannelHandlerContext ctx= (ChannelHandlerContext) chatMessage.getProperties().getCtx();try {CodeConsts.MessageType messageType = CodeConsts.MessageType.fromValue(chatMessage.getProperties().getSClass());switch (Objects.requireNonNull(messageType)) {case text:case attachment:getSession(ctx);chatService.sendMessage(chatMessage);broadcastOnLineMessage(chatMessage);break;case command:procCommand(ctx, (CommandMessage) chatMessage);break;}System.out.println("当前线程ID:"+Thread.currentThread().getId()+",当前任务队列数:" + queue.count());}catch (Exception e){e.printStackTrace();ctx.channel().writeAndFlush(JSONData.fromErr(0,-1,e.getMessage()));}});
这样处理后,将任务数据从netty线程中分离出来异步处理,给netty减负,也给并发量设置了上限以防止系统崩溃。
本文链接:https://my.lmcjl.com/post/9902.html
展开阅读全文
4 评论