Socket、Http、WebSocket?强大的Netty几行语句就帮你实现

news/2024/5/17 19:51:18 标签: Netty, tcp, http, websocket, socket
http://www.w3.org/2000/svg" style="display: none;">

Netty_0">Socket、Http、WebSocket?强大的Netty几行语句就帮你实现!

一、概述

Netty是目前最流行的由JBOSS提供的一个Java开源框架NIO框架,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

相比JDK原生NIO,Netty提供了相对十分简单易用的API,非常适合网络编程。Netty是完全基于NIO实现的,所以Netty是异步的。

Mina同样也是一款优秀的NIO框架,而且跟Netty是出自同一个人之手,但是Netty要晚一点,优点更多一些,想了解更多可以直接搜索mina和netty比较。

使用Netty,我们可以作为Socket服务器,也可以用来做Http服务器,同时也可以支持WebSocket,这里,我们将这三种方式都详细介绍一下。

首发地址:
品茗IT-同步发布

如果大家正在寻找一个java的学习环境,或者在开发中遇到困难,可以加入我们的java学习圈,点击即可加入,共同学习,节约学习时间,减少很多在学习中遇到的难题。

二、依赖Jar包

只需要引入Netty的jar包,为了方便json转换,我这里引入fastjson。

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.17.Final</version>
</dependency>
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
	<version>1.2.40</version>
</dependency>

三、通用的监听配置

不论是Socket、http还是socket>websocket,都是基于tcp的,因此,netty需要配置EventLoopGroup做监听。

package cn.pomit.springwork.nettynew.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

public abstract class NettyServiceTemplate {
	static private EventLoopGroup bossGroup = new NioEventLoopGroup();
	static private EventLoopGroup workerGroup = new NioEventLoopGroup();

	abstract protected ChannelHandler[] createHandlers();

	abstract public int getPort();

	abstract public String getName();

	@PostConstruct
	public void start() throws Exception {
		ServerBootstrap b = new ServerBootstrap();
		b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) throws Exception {
						ChannelHandler[] handlers = createHandlers();
						for (ChannelHandler handler : handlers) {
							ch.pipeline().addLast(handler);
						}
					}
				}).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_REUSEADDR, true)
				.childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_REUSEADDR, true);

		ChannelFuture cf = b.bind(getPort()).await();
		// cf.channel().closeFuture().await();
		if (!cf.isSuccess()) {
			System.out.println("无法绑定端口:" + getPort());
			throw new Exception("无法绑定端口:" + getPort());
		}

		System.out.println("服务[{" + getName() + "}]启动完毕,监听端口[{" + getPort() + "}]");
	}

	@PreDestroy
	public void stop() {
		bossGroup.shutdownGracefully().syncUninterruptibly();
		workerGroup.shutdownGracefully().syncUninterruptibly();
		System.out.println("服务[{" + getName() + "}]关闭。");
	}
}

这里的配置,都是netty的常用配置:

  • option和childOption是配置连接属性的。
  • ChannelHandler是必须的,这里通过抽象方法,由子类负责配置。

启动的时候,new一个子类,调用start方法即可。

NettySocket_107">四、Netty的Socket监听

有了上面的NettyServiceTemplate,我们可以用几行代码构建一个tcp服务器。实现父类的抽象方法createHandlers,传递ChannelHandler数组。

下面的ChannelHandler数组包含:

  1. 换行分割解码器
  2. 字符串解码器
  3. 字符串编码器
  4. 自定义处理器,写自己逻辑用。

StringTcpServer :

package cn.pomit.springwork.nettynew.server.tcp;

import cn.pomit.springwork.nettynew.handler.tcp.StringTcpServerHandler;
import cn.pomit.springwork.nettynew.server.NettyServiceTemplate;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class StringTcpServer extends NettyServiceTemplate {
	private int port = 8088;
	private String name = "String Server";

	public StringTcpServer(int port) {
		this.port = port;
	}

	@Override
	protected ChannelHandler[] createHandlers() {
		return new ChannelHandler[] { 
				new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()),
				new StringDecoder(), 
				new StringEncoder(),
				new StringTcpServerHandler() };
	}

	@Override
	public int getPort() {
		return port;
	}

	@Override
	public String getName() {
		return name;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public void setName(String name) {
		this.name = name;
	}

}

这样就是一个tcp服务器了,StringTcpServerHandler特别简单,打印并返回:

package cn.pomit.springwork.nettynew.handler.tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class StringTcpServerHandler extends SimpleChannelInboundHandler<String> {
	String charset = "UTF-8";

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
		System.out.println("内容:" + msg);
		ctx.writeAndFlush("返回内容:" + msg);
	}
}

NettyHttp_187">五、Netty的Http监听

Nettyhttp,稍微复杂点,复杂只是相对于返回值的控制,cookie的控制,session的控制,因为netty只负责帮你解析了http的所有信息,并没有像sevlet容器那样还给你控制session了啥的。

我这里只用netty做json数据转换,用来提供rest服务,如果想了解更多,可以看下我用netty实现的一个类似Spring容器的工具:https://gitee.com/ffch/teaboot, 这个工具解析html、rest等,提供了简单的session控制、security控制。代码属于早期内容,尚待维护。

5.1 Http服务器

这个服务器是专门面向rest服务的。有了上面的NettyServiceTemplate,我们可以用几行代码构建一个http服务器。实现父类的抽象方法createHandlers,传递ChannelHandler数组。

ChannelHandler数组中:

  1. HttpResponseEncoder是netty自己的响应编码器,报文级别
  2. HttpRequestDecoder是netty自己的请求解码器,报文级别
  3. HttpObjectAggregator是完全的解析Http POST请求用的。
  4. HttpMsgResponseEncoder是自定义的响应编码器,为的是对响应做简单的控制。应用级别
  5. HttpMsgRequestDecoder是自定义的请求解码器,是对http请求转换为json。应用级别
  6. JsonHttpServerHandler是自定义逻辑处理器,写自己业务用。

JsonHttpServer :

package cn.pomit.springwork.nettynew.server.http;

import cn.pomit.springwork.nettynew.coder.http.HttpMsgRequestDecoder;
import cn.pomit.springwork.nettynew.coder.http.HttpMsgResponseEncoder;
import cn.pomit.springwork.nettynew.handler.http.JsonHttpServerHandler;
import cn.pomit.springwork.nettynew.server.NettyServiceTemplate;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

public class JsonHttpServer extends NettyServiceTemplate {
	int port = 8888;
	String name = "Json Server";
	private String charset = "UTF-8";
	private int timeout = 60;
	
	public JsonHttpServer(int port) {
		this.port = port;
	}

	@Override
	protected ChannelHandler[] createHandlers() {
		return new ChannelHandler[] { 
				new HttpResponseEncoder(), 
				new HttpRequestDecoder(),
				new HttpObjectAggregator(1048576), 
				new HttpMsgResponseEncoder(charset, timeout),
				new HttpMsgRequestDecoder(charset), 
				new JsonHttpServerHandler() };
	}

	@Override
	public int getPort() {
		return port;
	}

	@Override
	public String getName() {
		return name;
	}

}


5.2 自定义的请求解码器

请求解码器就是转换下数据,变成字符串。

HttpMsgRequestDecoder:

package cn.pomit.springwork.nettynew.coder.http;

import java.nio.charset.Charset;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObject;

public class HttpMsgRequestDecoder extends MessageToMessageDecoder<HttpObject>{
	private String charset;

	public HttpMsgRequestDecoder(String charset) {
		super();
		this.charset = charset;
	}

	@Override
	protected void decode(ChannelHandlerContext ctx, HttpObject in,
			List<Object> out) throws Exception {
		FullHttpRequest request = (FullHttpRequest) in;

		ByteBuf buf = request.content();
		String jsonStr = buf.toString(Charset.forName(charset));
		out.add(jsonStr);
	}
}

5.3 自定义的响应编码器

响应编码器中,定义了连接的配置信息,http头信息、内容类型等。

HttpMsgResponseEncoder:

package cn.pomit.springwork.nettynew.coder.http;

import java.util.List;

import cn.pomit.springwork.nettynew.model.http.HttpResponseMsg;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

public class HttpMsgResponseEncoder extends MessageToMessageEncoder<HttpResponseMsg> {
	private String charset;
	private int timeout;

	public HttpMsgResponseEncoder(String charset, int timeout) {
		super();
		this.charset = charset;
		this.timeout = timeout;
	}

	@Override
	protected void encode(ChannelHandlerContext ctx, HttpResponseMsg message, List<Object> out) {
		try {
			DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(message.getResCode()),
					Unpooled.wrappedBuffer(message.getMessage().getBytes(charset)));
			response.headers().set(HttpHeaderNames.CONTENT_TYPE, message.getResType()+";charset=" + charset);
			response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

			// 强制keep-alive
			response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
			response.headers().set("Keep-Alive", "timeout=" + timeout);

			out.add(response);
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
}

5.4 业务处理器

这里的业务处理器很简单,就打印返回。

JsonHttpServerHandler:

package cn.pomit.springwork.nettynew.handler.http;

import cn.pomit.springwork.nettynew.model.http.HttpResponseMsg;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class JsonHttpServerHandler extends SimpleChannelInboundHandler<String> {
	String charset = "UTF-8";

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
		System.out.println("post内容:" + msg);
		HttpResponseMsg hrm = new HttpResponseMsg();
    	hrm.setResType(HttpResponseMsg.ResType.JSON.getValue());
    	hrm.setResCode(HttpResponseMsg.ResCode.OK.getValue());
    	hrm.setMessage(msg);
    	ctx.writeAndFlush(hrm);
	}

}

NettyWebSocket_368">六、Netty的WebSocket监听

Netty的WebSocket,相对于http还稍微简单点。

Netty对WebSocket做了完全控制,你需要做的只是对WebSocket的用户进行控制,能根据用户找到相应的通道即可。

6.1 WebSocket服务器

有了上面的NettyServiceTemplate,我们可以用几行代码构建一个WebSocket服务器。实现父类的抽象方法createHandlers,传递ChannelHandler数组。

ChannelHandler数组中:

  1. HttpServerCodec是netty自己的http解码器,报文级别
  2. ChunkedWriteHandler是用于大数据的分区传输。
  3. HttpObjectAggregator是完全的解析Http消息体请求用的。
  4. WebSocketServerProtocolHandler是配置socket>websocket的监听地址。
  5. WebSocketServerHandler是自定义逻辑处理器,写自己业务用。

WebSocketServer :

package cn.pomit.springwork.nettynew.server.socket>websocket;

import cn.pomit.springwork.nettynew.handler.socket>websocket.WebSocketServerHandler;
import cn.pomit.springwork.nettynew.server.NettyServiceTemplate;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.socket>websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketServer extends NettyServiceTemplate {
	int port = 9999;
	String name = "WebSocket";
	
	public WebSocketServer(int port) {
		this.port = port;
	}

	@Override
	protected ChannelHandler[] createHandlers() {
		return new ChannelHandler[] { 
				new HttpServerCodec(), 
				new ChunkedWriteHandler(),
				new HttpObjectAggregator(1048576), 
				new WebSocketServerProtocolHandler("/ws"),
				new WebSocketServerHandler() };
	}

	@Override
	public int getPort() {
		return port;
	}

	@Override
	public String getName() {
		return name;
	}

}


6.2 WebSocket的聊天室逻辑

下面是用socket>websocket做聊天室的逻辑:

  • 使用MessageDTO 做消息的传递实体;
  • WebSocketUser存储了每个连接上来的WebSocket用户,保存对应段分Channel。
  • 前端要求填入用户后,模拟登录,并返回用户列表。前端根据用户列表选择人发送信息。
  • 根据MessageDTO中的发送人和接收人,找到对应的Channel并发送消息。

WebSocketServerHandler:

package cn.pomit.springwork.nettynew.handler.socket>websocket;

import java.util.List;

import com.alibaba.fastjson.JSONObject;

import cn.pomit.springwork.nettynew.model.socket>websocket.MessageDTO;
import cn.pomit.springwork.nettynew.model.socket>websocket.WebSocketUser;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.socket>websocketx.TextWebSocketFrame;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
		MessageDTO messageDTO = JSONObject.parseObject(msg.text(), MessageDTO.class);
		if (messageDTO.getMessageType().equals(MessageDTO.Type.TYPE_NEW.getMessageType())) {
			WebSocketUser.add(messageDTO.getFromUserName(), ctx.channel());
			messageDTO.setTargetUserName(messageDTO.getFromUserName());
			messageDTO.setFromUserName("SYSTEM");
			messageDTO.setMessage(JSONObject.toJSONString(WebSocketUser.getUserList()));
			ctx.channel().writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageDTO)));
		} else {
			List<Channel> webUsers = WebSocketUser.getSessionByUserName(messageDTO.getTargetUserName());
			if (webUsers == null || webUsers.size() == 0) {
				System.out.print("发送给" + messageDTO.getTargetUserName() + ",当前无session");
				MessageDTO messageDTOError = new MessageDTO();
				messageDTOError.setFromUserName("SYSTEM");
				messageDTOError.setTargetUserName(messageDTO.getFromUserName());
				messageDTOError.setMessageType(MessageDTO.Type.TYPE_ERROR.getMessageType());
				messageDTOError.setMessage("发送失败!");
				ctx.channel().writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageDTOError)));
				return;
			}
			System.out.print("发送给" + messageDTO.getTargetUserName() + ",当前session个数为:" + webUsers.size());

			for (int i = 0; i < webUsers.size(); i++) {
				Channel session = webUsers.get(i);
				if (!session.isOpen()) {
					WebSocketUser.removeWebSocketSession(messageDTO.getTargetUserName(), session);
				}

				session.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageDTO)));
			}
		}

	}

	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		System.out.println("用户:" + ctx.channel().id().asLongText() + "上线");
	}

	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		System.out.println("用户下线: " + ctx.channel().id().asLongText());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.channel().close();
	}
}

6.3 用户信息保存

用一个并发map保存所有用户和对应的Channel。

WebSocketUser:

package cn.pomit.springwork.nettynew.model.socket>websocket;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.Channel;

public class WebSocketUser {
	private static Map<String, List<Channel>> userNameWebsession = new ConcurrentHashMap<>();

	public static void add(String userName, Channel webSocketSession) {
		userNameWebsession.computeIfAbsent(userName, v -> new ArrayList<Channel>()).add(webSocketSession);
	}

	/**
	 * 根据昵称拿WebSocketSession
	 * 
	 * @param nickName
	 * @return
	 */
	public static List<Channel> getSessionByUserName(String userName) {
		return userNameWebsession.get(userName);
	}

	/**
	 * 移除失效的WebSocketSession
	 * 
	 * @param webSocketSession
	 */
	public static void removeWebSocketSession(String userName, Channel webSocketSession) {
		if (webSocketSession == null)
			return;
		List<Channel> webSessoin = userNameWebsession.get(userName);
		if (webSessoin == null || webSessoin.isEmpty())
			return;
		webSessoin.remove(webSocketSession);
	}

	public static Set<String> getUserList() {
		return userNameWebsession.keySet();
	}
}

七、过程中用到的其他实体、启动类及页面

7.1 启动类

TestApp:

package cn.pomit.springwork.nettynew;

import cn.pomit.springwork.nettynew.server.http.JsonHttpServer;
import cn.pomit.springwork.nettynew.server.tcp.StringTcpServer;
import cn.pomit.springwork.nettynew.server.socket>websocket.WebSocketServer;

public class TestApp {
	
	public static void main(String args[]) throws ClassNotFoundException, InstantiationException, IllegalAccessException{		
		StringTcpServer stringTcpServerTest = new StringTcpServer(8088);
		JsonHttpServer jsonHttpServer = new JsonHttpServer(8880);
		WebSocketServer webSocketServer = new WebSocketServer(9999);
		try {
			stringTcpServerTest.start();
			jsonHttpServer.start();
			webSocketServer.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}



7.2 Http响应实体

HttpResponseMsg:


7.3 WebSocket传递实体

MessageDTO:


7.4 WebSocket前端页面


东西,不贴了,直接加群找我要吧。

详细完整的代码,可以访问品茗IT-博客《Socket、Http、WebSocket?强大的Netty几行语句就帮你实现!》进行查看

快速构建项目

Spring项目快速开发工具:

一键快速构建Spring项目工具

一键快速构建SpringBoot项目工具

一键快速构建SpringCloud项目工具

一站式Springboot项目生成

Mysql一键生成Mybatis注解Mapper

Spring组件化构建

SpringBoot组件化构建

SpringCloud服务化构建

喜欢这篇文章么,喜欢就加入我们一起讨论JAVA吧!
https://imgconvert.csdnimg.cn/aHR0cHM6Ly91cGxvYWQtaW1hZ2VzLmppYW5zaHUuaW8vdXBsb2FkX2ltYWdlcy8xNTI5NjcwNS1hYTk5Mzc4ZGZhM2I3ZmJkLnBuZw" alt="品茗IT交流群" />


http://www.niftyadmin.cn/n/1050715.html

相关文章

java之的System类

1 public class Demo3_System {2 3 /*4 * System类不能被实例化&#xff0c;所有的方法都是静态的5 * 成员方法&#xff1a;6 * gc() 垃圾回收7 * exit(int status) 退出程序 status 为0时是正常退出&#xff0c;非0位非正常退出8 * pu…

python3.6安装

1. 下载源码Python-3.6.5.tar.xz, https://www.python.org/ftp/python/3.6.5/ 2. 运行./configure --enable-optimizations 3. 修改Makefile中的prefix和Python-3.6.5/Modules/Setup.dist (处理openssl)&#xff0c;make; make install 4. python3.6 -m venv my_env 5. Certbo…

php实现求一个数的质数因子

php实现求一个数的质数因子 一、总结 一句话总结&#xff1a;这么简单的题目&#xff0c;还是把变量定义的位置和自增的位置写错。 1 <?php2 $numtrim(fgets(STDIN)); 3 //如果$num大于1 4 $i2; 5 while($num>1){ 6 while($num%$i0){ 7 echo $i. ; 8 $num$num/$i;…

python控制窗口对角线运动

import win32con import win32gui import time while True:time.sleep(1)notepad win32gui.FindWindow("Notepad","无标题 - 记事本")for size in range(0,1620):time.sleep(0.001)win32gui.SetWindowPos(notepad,win32con.HWND_TOPMOST,size,size*1080//…

Struts2-综合项目

综合项目&#xff1a;视频后台管理系统 开发环境&#xff1a;Tomcat6(服务器)jdk6(windows操作系统) 使用技术&#xff1a;struts2(后台)jsp(前台显示)ajax(信息传递)json(服务器响应前台&#xff0c;发送数据的格式) 课程方向&#xff1a; create table course(id int primary…

DB2自增列数据处理

自增列数据导入&#xff0c;可以先用一int型字段当过渡列&#xff0c;导入原表identity列数据后&#xff0c;将此int列更改属性即可&#xff0c;相关语句如下&#xff1a; db2 > alter table test alter column id set not null DB20000I SQL命令成功完成。 db2 > alter…

Apache Maven(六):存储库

Maven 存储库主要是存放一些第三方依赖jar包等。 严格来说&#xff0c;只有两种存储库&#xff1a;本地和远程&#xff0c;本地存储库是指您远程下载到本地的一个缓存&#xff0c;还包含尚未发布的临时构建文件。远程存储库是指一些可以通过各种协议如file://或http://并下载的…

MYSQL PROCEDURE 测试用例

/*** 查询俱协信息*/ DROP PROCEDURE IF EXISTS get_club_list; DELIMITER $$ CREATE PROCEDURE get_club_list(-- 页码IN i_page_num int unsigned,-- 一页数量IN i_page_size int unsigned ) BEGIN-- 定义变量DECLARE v_num int unsigned DEFAULT 0;SET v_num i_page_num*i_…