WebSocket
WebSocket
概述
概念
WebSocket 是HTML5一种新的协议。它实现了浏览器与服务器全双工通信(full-duplex),一开始的握手需要借助HTTP请求完成
WebSocket是真正实现了全双工通信的服务器向客户端推的互联网技术。 它是一种在单个TCP连 接上进行全双工通讯协议
Websocket通信协议与2011年倍IETF定为标准RFC 6455,Websocket API被W3C定为标准
全双工(Full Duplex)是通讯传输的一个术语。通信允许数据在两个方向上同时传输,它在能力上相当于两个单工通信方式的结合。全双工指可以同时(瞬时)进行信号的双向传输(A→B且B→A)。指A→B的同时B→A,是瞬时同步的 单工、半双工(Half Duplex),所谓半双工就是指一个时间段内只有一个动作发生。早期的对讲机、以及早期集线器等设备都是基于半双工的产品。随着技术的不断进步,半双工会逐渐退出历史舞台
浏览器支持情况:https://caniuse.com/#search=websocket
服务器支持情况:Tomcat 7.0.47+以上才支持
http与websocket
- http协议是短连接,因为请求之后,都会关闭连接,下次重新请求数据,需要再次打开链接
- WebSocket协议是一种长链接,只需要通过一次请求来初始化链接,然后所有的请求和响应都是通过这个TCP链接进行通讯
SpringBoot整合
依赖
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置类
@Configuration
public class WebSocketConfig{
/**
* EndPoint扫描器,自动扫描@ServerEndpoint
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
* websocket 配置信息
*
* @return
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean bean = new ServletServerContainerFactoryBean();
//文本缓冲区大小
bean.setMaxTextMessageBufferSize(8192);
//字节缓冲区大小
bean.setMaxBinaryMessageBufferSize(8192);
return bean;
}
}
重要的注解
- @ServerEndpoint 用于声明websocket响应类,有点像@RequestMapping @ServerEndpoint(“/websocket”)
- @OnOpen websocket连接时触发 参数有:Session session, EndpointConfig config
- @OnMessage 有消息时触发 参数很多,一会再说
- @OnClose 连接关闭时触发 参数有:Session session, CloseReason closeReason
- @OnError 有异常时触发 参数有:Session session, Throwable throwable
基本使用
@Log4j2
@Controller
@ServerEndpoint("/websocket")
public class BaseWebsocketController
{
//使用 ConcurrentHashMap, 保证线程安全, static全局共享 session
//这里之所以static,是因为这个类不是单例的!!
//他虽然有@Controller注解,但是不适用Ioc容器中拿对象,每一次请求过来都是一个新的对象
//存放 session
private final static Map<String, Session> sessions = new ConcurrentHashMap<>();
//onopen 在连接创建(用户进入聊天室)时触发
@OnOpen
public void openSession(Session session, EndpointConfig config){}
//响应字符串
@OnMessage
public void onMessage(Session session, String message){}
//响应字节流
@OnMessage
public void onMessage(Session session, byte[] message){}
//onclose 在连接断开(用户离开聊天室)时触发
@OnClose
public void closeSession(Session session, CloseReason closeReason){}
@OnError
public void sessionError(Session session, Throwable throwable){}
}
这里有两个 @OnMessage, 这是因为websocket能发送三种请求(我知道的三种),一种是字符串,一种是字节流(用于上传文件),一种是ping-pong(乒乓机制)
因为js不好发送ping请求,这里就只有响应字符串和字节流两种方法。
发消息
@ServerEndpoint(value = "/ws/{from_user}")
@Component
@Slf4j
public class TestEndPoint {
//用于存放消息 <key:发送方id , value:session>
private static final Map<Long, Session> onlineUsers = new ConcurrentHashMap<>();
/*
* @ServerEndpoint 是由WebSocket容器而不是Spring容器进行管理的。
* 因此,你不能在这个类中直接使用`@Autowired`注入Spring bean。
* 为了解决这个问题,你可以将`userMessageMapper`设为静态变量,然后在Spring容器启动时管理 testEndPoint 这个 bean 的时候注入 userMessageMapper。
* */
private static UserMessageMapper userMessageMapper;
@Autowired
public void setUserMessageMapper(UserMessageMapper userMessageMapper) {
TestEndPoint.userMessageMapper = userMessageMapper;
}
@OnOpen
public void onOpen(Session session, @PathParam("from_user") Long fromUser) {
System.out.println(userMessageMapper);
onlineUsers.put(fromUser, session);
log.info("### 新用户加入,用户名 = {},在线人数 = {}", UserData.USER_MAP.get(fromUser).getUsername(), onlineUsers.size());
List<String> userList = onlineUsers.keySet().stream().map(id -> UserData.USER_MAP.get(id).getUsername()).collect(Collectors.toList());
for (Long userId : onlineUsers.keySet()) {
try {
onlineUsers.get(userId).getBasicRemote().sendText(JSON.toJSONString(userList));
} catch (IOException e) {
e.printStackTrace();
}
}
}
@OnMessage
public void onMessage(String message, @PathParam("from_user") Long fromUser) throws IOException {
//获取toId、msg
JSONObject jsonObject = JSON.parseObject(message);
Long toId = jsonObject.getLong("toId");
String msg = jsonObject.getString("msg");
//创建message对象
Date sendDate = new Date();
UserMessage textUserMessage = UserMessage.builder()
.fromUser(fromUser)
.toUser(toId)
.msg(msg)
.status(1) // 未读
.sendDate(sendDate)
.build();
// 将消息保存到MongoDB
userMessageMapper.insert(textUserMessage);
// 判断to用户是否在线
Session toSession = onlineUsers.get(toId);
if (toSession != null && toSession.isOpen()) {
Date readDate = new Date();
// TODO 具体格式与前端对接
UserMessageVo userMessageVo = new UserMessageVo();
BeanUtils.copyProperties(textUserMessage, userMessageVo);
userMessageVo.setFromUserName(UserData.USER_MAP.get(fromUser).getUsername());
userMessageVo.setToUserName(UserData.USER_MAP.get(toId).getUsername());
toSession.getBasicRemote().sendText(JSON.toJSONString(userMessageVo));
//更新消息状态
userMessageMapper.updateById(UserMessage.builder()
.status(2)
.readDate(readDate)
.id(textUserMessage.getId())
.build());
}
}
@OnClose
public void onClose(@PathParam("from_user") Long fromUser) {
onlineUsers.remove(fromUser);
}
@OnError
public void onError(Session session, Throwable e) {
log.error("## e=", e);
}
}
注意:
TestEndPoint类同时被@ServerEndpoint和@Component注解,所以它既是一个WebSocket的端点又是一个Spring的组件。因此,WebSocket容器和Spring容器都会创建并管理TestEndPoint的一个实例。但是,这两个实例是不同的,即它们在内存中的地址不同。Spring容器支持依赖注入,所以在Spring容器创建并管理的
TestEndPoint实例中,可以通过@Autowired注解注入依赖。当Spring容器创建TestEndPoint实例并调用setUserMessageMapper()方法时,userMessageMapper静态变量就会被赋值。然而,WebSocket容器并不支持Spring的依赖注入,所以在WebSocket容器创建并管理的
TestEndPoint实例中,@Autowired注解不会起作用,userMessageMapper静态变量不会被赋值。但是,由于
userMessageMapper是静态变量,它是属于TestEndPoint类的,而不是任何一个TestEndPoint实例。所以,无论是Spring容器的TestEndPoint实例还是WebSocket容器的TestEndPoint实例,它们都可以访问到这个静态变量。这就是为什么需要使用静态变量和静态方法进行注入,这样可以让WebSocket容器的TestEndPoint实例也能够访问到通过Spring容器注入的UserMessageMapper实例。
发文件
@OnMessage
public void onMessage(Session session, byte[] message) throws Exception {
final UserMessage response = new UserMessage();
final Long username = (Long) session.getUserProperties().get("fromId");
final String fileName = (String) session.getUserProperties().get("fileName");
log.info(fileName);
ByteBuffer buffer = ByteBuffer.wrap(message);
try (RandomAccessFile file = new RandomAccessFile("fileName", "rw")) {
FileChannel channel = file.getChannel();
channel.write(buffer);
response.setMsg("[" + username + "]上传了一个文件【" + fileName + "】");
session.getBasicRemote().sendText(JSONObject.toJSONString(response));
} catch (IOException e) {
response.setMsg("文件【" + fileName + "】上传失败");
session.getBasicRemote().sendText(JSONObject.toJSONString(response));
}
}
测试
测试http://wstool.js.org/
ws://127.0.0.1:8082/ws/zhangsan连接不上
导航到“控制面板” > “程序” > “程序和功能” > “启用或禁用 Windows 功能”(位于屏幕左侧) 打开以下节点:“Internet Information Services” > “万维网服务” > “应用程序开发功能” 选择“WebSocket 协议”功能。 选择“确定”
鉴权
前端
前端不好给websocket加请求头,就在路径上拼接token信息
const token = ""
const url = `ws://localhost:8080/ws/${userId}?token=${token}`
const ws = new WebSocket(url);
后端
在配置类中配置
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator{
/**
* EndPoint扫描器,自动扫描@ServerEndpoint
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
* websocket 配置信息
*
* @return
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean bean = new ServletServerContainerFactoryBean();
//文本缓冲区大小
bean.setMaxTextMessageBufferSize(8192);
//字节缓冲区大小
bean.setMaxBinaryMessageBufferSize(8192);
return bean;
}
/**
* 建立握手时,连接前的操作
*/
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
// 这个userProperties 可以通过 session.getUserProperties()获取,后面会用到的的信息可以都放这里面
final Map<String, Object> userProperties = sec.getUserProperties();
// 获取token
String accessToken = null;
Map<String, List<String>> headers = request.getHeaders();
List<String> token = headers.get("token");
if(CollectionUtils.isEmpty(token)&&StringUtils.isBlank(token.get(0))){
// 没有请求头就去lu's
URI uri = request.getRequestURI();
String query = uri.getQuery();
String[] queryParams = query.split("&");
for (String q : queryParams){
if(q.startsWith("token")){
String[] token = q.split("=");
accessToken = token[1];
break;
}
}
}else{
accessToken = token.get(0);
}
if(StringUtils.isBlank(accessToken)){
throw new Exception("没有权限");
}
try {
// jwt校验 返回的信息可以存在 userProperties 中
JWTTokenUtils.parseJWT(token.get(0));
} catch (Exception err) {
throw new Exception("没有权限");
}
}
}
注意:
- endpoint 的 @ServerEndpoint 中要配置 configurator 为配置类
@ServerEndpoint(value = "/ws/{fromId}",configurator = WebSocketConfig.class) @Controller public class MyEndpoint{ }
心跳
前端心跳
计时器定时发送心跳
在onMessage中
再开一个计时器
function createWebSocket() {
try {
ws = new WebSocket(wsUrl);
init();
} catch(e) {
console.log('catch');
reconnect(wsUrl);
}
}
// 监听close、error、open、message
function init() {
ws.onclose = function () {
};
ws.onerror = function() {
};
ws.onopen = function () {
};
ws.onmessage = function (event) {
// 处理消息
// 更新时间
time = Date.now()
}
}
// 计时器判断长时间收不到应答,进行重连
let time = 0;
setInterval(()=>{
const currentTime = Date.now()
if(time != 0 && currentTime-time>10000){
// 重连
reconnect(wsUrl);
}
}, 15000)
// 计时器发送心跳
setInterval(()=>{
ws.send("xxx"); // 发一条心跳消息,格式自定义
}, 5000)
// 重连
var lockReconnect = false;//避免重复连接
function reconnect(url) {
if(lockReconnect) {
return;
};
lockReconnect = true;
//没连接上会一直重连,设置延迟避免请求过多
tt && clearTimeout(tt);
tt = setTimeout(function () {
createWebSocket(url);
lockReconnect = false;
}, 4000);
}
后端应答
对约定好的心跳消息进行应答即可
前端全局websocket
单例模式
// 创建一个 WebSocket 单例对象
const WebSocketSingleton = (function () {
let instance = null;
let websocket = null;
function initWebSocket(url) {
websocket = new WebSocket(url);
websocket.onopen = function () {
console.log('WebSocket连接已建立');
};
websocket.onclose = function () {
console.log('WebSocket连接已关闭');
};
websocket.onerror = function (err) {
console.error('WebSocket发生错误:', err);
};
websocket.onmessage = function (event) {
console.log('收到消息:', event.data);
};
}
return {
// 获取单例实例的方法
getInstance: function (url) {
if (!instance) {
instance = initWebSocket(url);
}
return instance;
},
// 发送消息的方法
send: function (message) {
if (websocket) {
websocket.send(message);
} else {
console.error('WebSocket连接未初始化');
}
},
// 关闭连接的方法
close: function () {
if (websocket) {
websocket.close();
instance = null;
websocket = null;
}
}
};
})();
使用
// 使用单例模式创建 WebSocket 连接
const socketInstance = WebSocketSingleton.getInstance('wss://example.com/ws');
// 现在可以全局使用 socketInstance 进行操作,例如发送消息:
socketInstance.send('Hello WebSocket!');
// 当不再需要连接时,可以关闭连接:
// socketInstance.close();
VUE中在app.vue创建连接,在其他地方再次获取到的连接就是同一个
SpringCloud整合
gateway
实际测试结果加不加ws都可
spring:
cloud:
gateway:
routes:
- id: user-service
uri: lb:ws://userservice ## 加上ws:即可
predicates:
- Path=/user/**
定时任务
直接在存Session的Map中取出对应session发消息即可
分布式解决方案
redis存用户所在服务器,MQ发送通知,在对应机器上获取session进行处理
