netty-chat聊天室后端增加并发管控(任务排队,预防系统被冲崩溃)

目的:演示如何将超高并发的请求放到任务队列,然后由指定的线程池中的并发线程共同处理这些任务队列。

1、引用依赖

刚写了个taskqueue-module,引用它

		<dependency><groupId>io.github.tiger822</groupId><artifactId>taskqueue-module</artifactId><version>1.0-SNAPSHOT</version></dependency>

2、定义线程池及任务队列

定义了一个有20个固定线程的线程池,还有一组长度为200的任务队列

private final ThreadPool threadPool=new ThreadPool(20,20,1800,4);
private TaskQueue<ChatMessage> queue=new BlockedTaskQueue<>(200);

3、改写channelRead0

将原先的代码移走,单纯将数据对象放到任务队列即可

				if (msg instanceof ChatMessage) {ChatMessageProperties properties=((ChatMessage)msg).getProperties();properties.setCtx(ctx);if (!queue.offer((ChatMessage) msg,100)){ctx.channel().writeAndFlush(JSONData.fromErr(0,-1,"任务队列满,系统正忙请稍后再试"));return;}}

4、定义线程组

同时启动20组线程监听这个任务队列

private void initThreads(Consumer<ChatMessage> consumer){List<CompletableFuture> list=new ArrayList<>();for (int i=0;i<threadPool.getCorePoolSize();i++){list.add(CompletableFuture.runAsync(()->{while (!closed.get()){ChatMessage message=queue.poll(3000);if (message==null){continue;}consumer.accept(message);}}));}tasks=list.toArray(new CompletableFuture[0]);}

5、修改server启动函数

在启动函数中,加入

initThreads(chatMessage->{ChannelHandlerContext ctx= (ChannelHandlerContext) chatMessage.getProperties().getCtx();try {CodeConsts.MessageType messageType = CodeConsts.MessageType.fromValue(chatMessage.getProperties().getSClass());switch (Objects.requireNonNull(messageType)) {case text:case attachment:getSession(ctx);chatService.sendMessage(chatMessage);broadcastOnLineMessage(chatMessage);break;case command:procCommand(ctx, (CommandMessage) chatMessage);break;}System.out.println("当前线程ID:"+Thread.currentThread().getId()+",当前任务队列数:" + queue.count());}catch (Exception e){e.printStackTrace();ctx.channel().writeAndFlush(JSONData.fromErr(0,-1,e.getMessage()));}});

这样处理后,将任务数据从netty线程中分离出来异步处理,给netty减负,也给并发量设置了上限以防止系统崩溃。

本文链接:https://my.lmcjl.com/post/9902.html

展开阅读全文

4 评论

留下您的评论.