跳至主要內容

WebSocket

程序员李某某大约 8 分钟

WebSocket

概述

概念

  • WebSocket 是HTML5一种新的协议。它实现了浏览器与服务器全双工通信(full-duplex),一开始的握手需要借助HTTP请求完成

  • WebSocket是真正实现了全双工通信的服务器向客户端推的互联网技术。 它是一种在单个TCP连 接上进行全双工通讯协议

  • Websocket通信协议与2011年倍IETF定为标准RFC 6455,Websocket API被W3C定为标准

    全双工(Full Duplex)是通讯传输的一个术语。通信允许数据在两个方向上同时传输,它在能力上相当于两个单工通信方式的结合。全双工指可以同时(瞬时)进行信号的双向传输(A→B且B→A)。指A→B的同时B→A,是瞬时同步的 单工、半双工(Half Duplex),所谓半双工就是指一个时间段内只有一个动作发生。早期的对讲机、以及早期集线器等设备都是基于半双工的产品。随着技术的不断进步,半双工会逐渐退出历史舞台

  • 浏览器支持情况:https://caniuse.com/#search=websocket

  • 服务器支持情况:Tomcat 7.0.47+以上才支持

http与websocket

  • http协议是短连接,因为请求之后,都会关闭连接,下次重新请求数据,需要再次打开链接
  • WebSocket协议是一种长链接,只需要通过一次请求来初始化链接,然后所有的请求和响应都是通过这个TCP链接进行通讯

SpringBoot整合

依赖

<!--websocket-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置类

@Configuration
public class WebSocketConfig{

    /**
     * EndPoint扫描器,自动扫描@ServerEndpoint
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * websocket 配置信息
     *
     * @return
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean bean = new ServletServerContainerFactoryBean();
        //文本缓冲区大小
        bean.setMaxTextMessageBufferSize(8192);
        //字节缓冲区大小
        bean.setMaxBinaryMessageBufferSize(8192);
        return bean;
    }

}

重要的注解

  • @ServerEndpoint 用于声明websocket响应类,有点像@RequestMapping @ServerEndpoint(“/websocket”)
  • @OnOpen websocket连接时触发 参数有:Session session, EndpointConfig config
  • @OnMessage 有消息时触发 参数很多,一会再说
  • @OnClose 连接关闭时触发 参数有:Session session, CloseReason closeReason
  • @OnError 有异常时触发 参数有:Session session, Throwable throwable

基本使用

@Log4j2
@Controller
@ServerEndpoint("/websocket")
public class BaseWebsocketController
{

	//使用 ConcurrentHashMap, 保证线程安全, static全局共享 session
	
	//这里之所以static,是因为这个类不是单例的!!
	//他虽然有@Controller注解,但是不适用Ioc容器中拿对象,每一次请求过来都是一个新的对象
	
    //存放 session
    private final static Map<String, Session> sessions = new ConcurrentHashMap<>();

    //onopen 在连接创建(用户进入聊天室)时触发
    @OnOpen
    public void openSession(Session session, EndpointConfig config){}

	//响应字符串
    @OnMessage
    public void onMessage(Session session, String message){}

	//响应字节流
    @OnMessage
    public void onMessage(Session session, byte[] message){}

    //onclose 在连接断开(用户离开聊天室)时触发
    @OnClose
    public void closeSession(Session session, CloseReason closeReason){}

    @OnError
    public void sessionError(Session session, Throwable throwable){}
}

这里有两个 @OnMessage, 这是因为websocket能发送三种请求(我知道的三种),一种是字符串,一种是字节流(用于上传文件),一种是ping-pong(乒乓机制)

因为js不好发送ping请求,这里就只有响应字符串和字节流两种方法。

发消息

@ServerEndpoint(value = "/ws/{from_user}")
@Component
@Slf4j
public class TestEndPoint {

    //用于存放消息 <key:发送方id , value:session>
    private static final Map<Long, Session> onlineUsers = new ConcurrentHashMap<>();

    /*
     * @ServerEndpoint 是由WebSocket容器而不是Spring容器进行管理的。
     * 因此,你不能在这个类中直接使用`@Autowired`注入Spring bean。
     * 为了解决这个问题,你可以将`userMessageMapper`设为静态变量,然后在Spring容器启动时管理 testEndPoint 这个 bean 的时候注入 userMessageMapper。
     * */
    private static UserMessageMapper userMessageMapper;

    @Autowired
    public void setUserMessageMapper(UserMessageMapper userMessageMapper) {
        TestEndPoint.userMessageMapper = userMessageMapper;
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("from_user") Long fromUser) {

        System.out.println(userMessageMapper);
        onlineUsers.put(fromUser, session);
        log.info("### 新用户加入,用户名 = {},在线人数 = {}", UserData.USER_MAP.get(fromUser).getUsername(), onlineUsers.size());
        List<String> userList = onlineUsers.keySet().stream().map(id -> UserData.USER_MAP.get(id).getUsername()).collect(Collectors.toList());
        for (Long userId : onlineUsers.keySet()) {
            try {
                onlineUsers.get(userId).getBasicRemote().sendText(JSON.toJSONString(userList));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnMessage
    public void onMessage(String message, @PathParam("from_user") Long fromUser) throws IOException {
        //获取toId、msg
        JSONObject jsonObject = JSON.parseObject(message);
        Long toId = jsonObject.getLong("toId");
        String msg = jsonObject.getString("msg");

        //创建message对象
        Date sendDate = new Date();
        UserMessage textUserMessage = UserMessage.builder()
                .fromUser(fromUser)
                .toUser(toId)
                .msg(msg)
                .status(1)  // 未读
                .sendDate(sendDate)
                .build();
        // 将消息保存到MongoDB
        userMessageMapper.insert(textUserMessage);
        // 判断to用户是否在线
        Session toSession = onlineUsers.get(toId);
        if (toSession != null && toSession.isOpen()) {
            Date readDate = new Date();
            // TODO 具体格式与前端对接
            UserMessageVo userMessageVo = new UserMessageVo();
            BeanUtils.copyProperties(textUserMessage, userMessageVo);
            userMessageVo.setFromUserName(UserData.USER_MAP.get(fromUser).getUsername());
            userMessageVo.setToUserName(UserData.USER_MAP.get(toId).getUsername());
            toSession.getBasicRemote().sendText(JSON.toJSONString(userMessageVo));
            //更新消息状态
            userMessageMapper.updateById(UserMessage.builder()
                    .status(2)
                    .readDate(readDate)
                    .id(textUserMessage.getId())
                    .build());
        }
    }

    @OnClose
    public void onClose(@PathParam("from_user") Long fromUser) {
        onlineUsers.remove(fromUser);
    }

    @OnError
    public void onError(Session session, Throwable e) {
        log.error("## e=", e);
    }
}

注意:TestEndPoint类同时被@ServerEndpoint@Component注解,所以它既是一个WebSocket的端点又是一个Spring的组件。因此,WebSocket容器和Spring容器都会创建并管理TestEndPoint的一个实例。但是,这两个实例是不同的,即它们在内存中的地址不同。

Spring容器支持依赖注入,所以在Spring容器创建并管理的TestEndPoint实例中,可以通过@Autowired注解注入依赖。当Spring容器创建TestEndPoint实例并调用setUserMessageMapper()方法时,userMessageMapper静态变量就会被赋值。

然而,WebSocket容器并不支持Spring的依赖注入,所以在WebSocket容器创建并管理的TestEndPoint实例中,@Autowired注解不会起作用,userMessageMapper静态变量不会被赋值。

但是,由于userMessageMapper是静态变量,它是属于TestEndPoint类的,而不是任何一个TestEndPoint实例。所以,无论是Spring容器的TestEndPoint实例还是WebSocket容器的TestEndPoint实例,它们都可以访问到这个静态变量。这就是为什么需要使用静态变量和静态方法进行注入,这样可以让WebSocket容器的TestEndPoint实例也能够访问到通过Spring容器注入的UserMessageMapper实例。

发文件

@OnMessage
public void onMessage(Session session, byte[] message) throws Exception {
    final UserMessage response = new UserMessage();

    final Long username = (Long) session.getUserProperties().get("fromId");
    final String fileName = (String) session.getUserProperties().get("fileName");
    log.info(fileName);

    ByteBuffer buffer = ByteBuffer.wrap(message);
    try (RandomAccessFile file = new RandomAccessFile("fileName", "rw")) {
        FileChannel channel = file.getChannel();
        channel.write(buffer);
        response.setMsg("[" + username + "]上传了一个文件【" + fileName + "】");
        session.getBasicRemote().sendText(JSONObject.toJSONString(response));
    } catch (IOException e) {
        response.setMsg("文件【" + fileName + "】上传失败");
        session.getBasicRemote().sendText(JSONObject.toJSONString(response));
    }
}

测试

  • 测试http://wstool.js.org/

    ws://127.0.0.1:8082/ws/zhangsan
    
  • 连接不上

    导航到“控制面板” > “程序” > “程序和功能” > “启用或禁用 Windows 功能”(位于屏幕左侧) 打开以下节点:“Internet Information Services” > “万维网服务” > “应用程序开发功能” 选择“WebSocket 协议”功能。 选择“确定”

鉴权

前端

前端不好给websocket加请求头,就在路径上拼接token信息

const token = ""
const url = `ws://localhost:8080/ws/${userId}?token=${token}`
const ws = new WebSocket(url);

后端

在配置类中配置

@Configuration
public class WebSocketConfig  extends ServerEndpointConfig.Configurator{

    /**
     * EndPoint扫描器,自动扫描@ServerEndpoint
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * websocket 配置信息
     *
     * @return
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean bean = new ServletServerContainerFactoryBean();
        //文本缓冲区大小
        bean.setMaxTextMessageBufferSize(8192);
        //字节缓冲区大小
        bean.setMaxBinaryMessageBufferSize(8192);
        return bean;
    }

    /**
     * 建立握手时,连接前的操作
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {

        // 这个userProperties 可以通过 session.getUserProperties()获取,后面会用到的的信息可以都放这里面
        final Map<String, Object> userProperties = sec.getUserProperties();

        // 获取token
        String accessToken = null;
        Map<String, List<String>> headers = request.getHeaders();
        List<String> token = headers.get("token");
        if(CollectionUtils.isEmpty(token)&&StringUtils.isBlank(token.get(0))){
            // 没有请求头就去lu's
            URI uri = request.getRequestURI();
            String query = uri.getQuery();
            String[] queryParams = query.split("&");
            for (String q : queryParams){
                if(q.startsWith("token")){
                    String[] token = q.split("=");
                    accessToken = token[1];
                    break;
                }
            }
        }else{
           	accessToken = token.get(0);
        }
        if(StringUtils.isBlank(accessToken)){
             throw new Exception("没有权限");
        }
        try {
            // jwt校验 返回的信息可以存在 userProperties 中
            JWTTokenUtils.parseJWT(token.get(0));
        } catch (Exception err) {
            throw new Exception("没有权限");
        }
    }
}

注意:

  • endpoint 的 @ServerEndpoint 中要配置 configurator 为配置类
@ServerEndpoint(value = "/ws/{fromId}",configurator = WebSocketConfig.class)
@Controller
public class MyEndpoint{
    
}

心跳

前端心跳

计时器定时发送心跳

在onMessage中

再开一个计时器

function createWebSocket() {
  try {
    ws = new WebSocket(wsUrl);
    init();
  } catch(e) {
    console.log('catch');
    reconnect(wsUrl);
  }
}

// 监听close、error、open、message
function init() {
  ws.onclose = function () {
  };
  ws.onerror = function() {
  };
  ws.onopen = function () {
  };
  ws.onmessage = function (event) {
    // 处理消息
  	// 更新时间
  	time = Date.now()
  }
}

// 计时器判断长时间收不到应答,进行重连
let time = 0;
setInterval(()=>{
    const currentTime = Date.now()
    if(time != 0 && currentTime-time>10000){
        // 重连
        reconnect(wsUrl);	
    }
}, 15000)

// 计时器发送心跳
setInterval(()=>{
   ws.send("xxx");  // 发一条心跳消息,格式自定义
}, 5000)


// 重连
var lockReconnect = false;//避免重复连接
function reconnect(url) {
  if(lockReconnect) {
    return;
  };
  lockReconnect = true;
  //没连接上会一直重连,设置延迟避免请求过多
  tt && clearTimeout(tt);
  tt = setTimeout(function () {
    createWebSocket(url);
    lockReconnect = false;
  }, 4000);
}

后端应答

对约定好的心跳消息进行应答即可

前端全局websocket

单例模式

// 创建一个 WebSocket 单例对象
const WebSocketSingleton = (function () {
  let instance = null;
  let websocket = null;

  function initWebSocket(url) {
    websocket = new WebSocket(url);

    websocket.onopen = function () {
      console.log('WebSocket连接已建立');
    };

    websocket.onclose = function () {
      console.log('WebSocket连接已关闭');
    };

    websocket.onerror = function (err) {
      console.error('WebSocket发生错误:', err);
    };

    websocket.onmessage = function (event) {
      console.log('收到消息:', event.data);
    };
  }

  return {
    // 获取单例实例的方法
    getInstance: function (url) {
      if (!instance) {
        instance = initWebSocket(url);
      }
      return instance;
    },

    // 发送消息的方法
    send: function (message) {
      if (websocket) {
        websocket.send(message);
      } else {
        console.error('WebSocket连接未初始化');
      }
    },

    // 关闭连接的方法
    close: function () {
      if (websocket) {
        websocket.close();
        instance = null;
        websocket = null;
      }
    }
  };
})();

使用


// 使用单例模式创建 WebSocket 连接
const socketInstance = WebSocketSingleton.getInstance('wss://example.com/ws');

// 现在可以全局使用 socketInstance 进行操作,例如发送消息:
socketInstance.send('Hello WebSocket!');

// 当不再需要连接时,可以关闭连接:
// socketInstance.close();

VUE中在app.vue创建连接,在其他地方再次获取到的连接就是同一个

SpringCloud整合

gateway

实际测试结果加不加ws都可

spring:
  cloud:
    gateway:
      routes:
      - id: user-service 
        uri: lb:ws://userservice   ## 加上ws:即可
        predicates: 
        - Path=/user/**

定时任务

直接在存Session的Map中取出对应session发消息即可

分布式解决方案

redis存用户所在服务器,MQ发送通知,在对应机器上获取session进行处理

上次编辑于:
贡献者: ext.liyuanhao3,李元昊