一、SSE是什么?

SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。

  • 注意:因为EventSource对象是SSE的客户端,可能会有浏览器对其不支持,但谷歌、火狐、360是可以的,IE不可以。
  • 优点:SSE和WebSocket相比,最大的优势是便利,服务端不需要其他的类库,开发难度较低,SSE和轮询相比它不用处理很多请求,不用每次建立新连接,延迟较低。
  • 缺点:如果客户端有很多,那就要保持很多长连接,这会占用服务器大量内存和连接数
  • sse 规范:在 html5 的定义中,服务端 sse,一般需要遵循以下要求:
  Content-Type: text/event-stream;
  charset=UTF-8Cache-Control: no-cache
  Connection: keep-alive

实现一个例子

后端代码

       /**
	* 用于创建连接
	*/
	@GetMapping("/connect/{userId}")
	public SseEmitter connect(@PathVariable String userId) {
		return SseEmitterUtil.connect(userId);
	}

	/**
	 * 推送给所有人
	 * @param message
	 * @return
	 */
	@GetMapping("/push/{message}")
	public ResponseEntity<String> push(@PathVariable(name = "message") String message) {
		// 获取连接人数
		int userCount = SseEmitterUtil.getUserCount();
		// 如果无在线人数,返回
		if (userCount < 1) {
			return ResponseEntity.status(500).body("无人在线!");
		}
		SseEmitterUtil.batchSendMessage(message);
		return ResponseEntity.ok("发送成功!");
	}

SseEmitterUtil代码

public class SseEmitterUtil {

	/**
	 * 当前连接数
	 */
	private static AtomicInteger count = new AtomicInteger(0);

	/**
	 * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
	 */
	private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

	/**
	 * 创建用户连接并返回 SseEmitter
	 * @param userId 用户ID
	 * @return SseEmitter
	 */
	public static SseEmitter connect(String userId) {
		// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
		SseEmitter sseEmitter = new SseEmitter(0L);
		// 注册回调
		sseEmitter.onCompletion(completionCallBack(userId));
		sseEmitter.onError(errorCallBack(userId));
		sseEmitter.onTimeout(timeoutCallBack(userId));
		sseEmitterMap.put(userId, sseEmitter);
		// 数量+1
		count.getAndIncrement();
		log.info("创建新的sse连接,当前用户:{}", userId);
		return sseEmitter;
	}

	/**
	 * 给指定用户发送信息
	 */
	public static void sendMessage(String userId, String message) {
		if (sseEmitterMap.containsKey(userId)) {
			try {
				// sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
				sseEmitterMap.get(userId).send(message);
			}
			catch (IOException e) {
				log.error("用户[{}]推送异常:{}", userId, e.getMessage());
				removeUser(userId);
			}
		}
	}

	/**
	 * 群发消息
	 */
	public static void batchSendMessage(String wsInfo, List<String> ids) {
		ids.forEach(userId -> sendMessage(wsInfo, userId));
	}

	/**
	 * 群发所有人
	 */
	public static void batchSendMessage(String wsInfo) {
		sseEmitterMap.forEach((k, v) -> {
			try {
				v.send(wsInfo, MediaType.APPLICATION_JSON);
			}
			catch (IOException e) {
				log.error("用户[{}]推送异常:{}", k, e.getMessage());
				removeUser(k);
			}
		});
	}

	/**
	 * 移除用户连接
	 */
	public static void removeUser(String userId) {
		sseEmitterMap.remove(userId);
		// 数量-1
		count.getAndDecrement();
		log.info("移除用户:{}", userId);
	}

	/**
	 * 获取当前连接信息
	 */
	public static List<String> getIds() {
		return new ArrayList<>(sseEmitterMap.keySet());
	}

	/**
	 * 获取当前连接数量
	 */
	public static int getUserCount() {
		return count.intValue();
	}

	private static Runnable completionCallBack(String userId) {
		return () -> {
			log.info("结束连接:{}", userId);
			removeUser(userId);
		};
	}

	private static Runnable timeoutCallBack(String userId) {
		return () -> {
			log.info("连接超时:{}", userId);
			removeUser(userId);
		};
	}

	private static Consumer<Throwable> errorCallBack(String userId) {
		return throwable -> {
			log.info("连接异常:{}", userId);
			removeUser(userId);
		};
	}

}

前端代码

<script>
    let source = null;

    // 用时间戳模拟登录用户
    const userId = new Date().getTime();

    if (window.EventSource) {

        // 建立连接
        source = new EventSource('/sse/connect/' + userId);

        /**
         * 连接一旦建立,就会触发open事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });


        /**
         * 如果发生通信错误(比如连接中断),就会触发error事件
         * 或者:
         * 另一种写法:source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            if (e.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("连接关闭");
            } else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }

    // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
    window.onbeforeunload = function () {
        closeSse();
    };

    // 关闭Sse连接
    function closeSse() {
        source.close();
        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', '/sse/close/' + userId, true);
        httpRequest.send();
        console.log("close");
    }

    // 将消息显示在网页上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/lori/p/17371917.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!

相关课程