麻城做网站,建设部举报网站,专业开发网站企业,wordpress 主题 打不开背景
自从2022年底chatgpt上线后#xff0c;sse就进入了大众的视野#xff0c;之前是谁知道这玩意是什么#xff1f;但是打字机的效果看起来是真的很不错#xff0c;一度吸引了很多人的趋之若鹜#xff0c;当然了这个东西的确挺好用#xff0c;而且实现很简单#xff0…背景
自从2022年底chatgpt上线后sse就进入了大众的视野之前是谁知道这玩意是什么但是打字机的效果看起来是真的很不错一度吸引了很多人的趋之若鹜当然了这个东西的确挺好用而且实现很简单之前我用python的demo讲了一下SSE的概念看起来有很多人看但是并没有说明白这个原理这次再彻底把这个原理给说明白而且我发现通过node.js 的Express框架来说明这个概念更加简洁所以今天就用Express框架来说明SSE概念这样对前端同学更加友好。
之前的Python SSE的文章
https://blog.csdn.net/wangsenling/article/details/130911465
https://blog.csdn.net/wangsenling/article/details/130490769 回看协议演化历史
学过计算机网络的人都知道socket连接就是全双工的怎么到http协议这里就变成了单向的了且连一次就over了这其实是浏览器编程者故意而为另外服务端根本不保留会话信息处理完成直接就把这次请求的相关信息从内存清理了所以才出现这种单向沟通后来网络资源越来越便宜才逐渐的走向了SSE和Websocket
HTTP 协议设计背景
HTTP 是在 1990 年代早期互联网环境中设计出来的当时网络资源如带宽和连接数非常有限。基于这种限制HTTP 协议被设计为无状态、短连接的模型以尽可能地节省服务器和客户端的资源。这意味着 请求-响应模型HTTP 是典型的请求-响应协议客户端发起请求服务器处理并返回响应随后连接立即关闭。这样设计的目的是为了快速释放资源特别是服务器的连接数。 短连接早期的 HTTP/1.0 协议默认每个请求完成后都立即关闭连接这种方式减少了保持大量长连接对服务器造成的负担但也带来了效率上的问题特别是在需要频繁通信的情况下。 无状态HTTP 的无状态设计使得每个请求都是独立的服务器不必保留任何会话信息从而进一步降低了对服务器资源的需求。这在资源稀缺的互联网早期非常重要。
HTTP 的局限性
随着网络应用的复杂性增加HTTP 的短连接和无状态特性逐渐暴露出一些问题特别是在需要实时更新或双向通信的应用场景中例如 实时聊天、在线协作工具、股票行情更新、通知系统等。 在这些场景下HTTP 的传统请求-响应模型显得过于笨重因为每次更新都需要客户端主动发起新的请求。
为了弥补这些局限出现了一些技术例如 轮询Polling客户端定期向服务器发送请求以获取最新的数据。这种方法虽然可以在一定程度上实现实时更新但效率较低因为大量请求可能只会获得很少或没有新数据。 长轮询Long Polling客户端发送请求后服务器保持连接打开直到有新数据然后返回响应。这减少了一些不必要的请求但仍然需要不断建立新连接并且是非标准化的。
SSE 的诞生
为了解决 HTTP 请求-响应模型中的不足SSE 被引入。SSE 是基于 HTTP 协议之上的一种扩展它允许服务器在一个长时间保持的连接中不断地向客户端推送事件流。与 WebSocket 相比SSE 更加轻量并且完全基于 HTTP 协议这使它具有很好的兼容性。 长连接保持SSE 通过建立一个 HTTP 长连接使得服务器可以在连接保持的状态下推送多个事件。这样一来服务器可以在有新数据时立即推送给客户端而客户端不必频繁发起新的请求。 单向通信SSE 仅允许服务器向客户端发送数据这与 WebSocket 的全双工通信形成对比。虽然通信方式有限但它简化了很多实时更新场景中的开发工作并且避免了 WebSocket 的复杂性。 兼容 HTTP由于 SSE 基于 HTTP 协议因此它能够很好地与现有的 HTTP 基础设施如代理、防火墙等配合工作不容易遇到兼容性问题。
为什么 HTTP 自动关闭连接
你提到的 HTTP 自动关闭连接是基于早期互联网的设计初衷——节省资源。在那个时代保持长连接对于资源有限的服务器来说是一个很大的负担 资源节约每个连接都占用系统的文件描述符、内存和 CPU 资源。如果每个客户端保持长时间连接服务器很容易耗尽这些资源。因此HTTP 通过每次请求结束后立即关闭连接来减少服务器负担。 提高并发能力通过让连接快速关闭服务器可以同时处理更多的客户端请求提高并发能力。
WebSocket 与 HTTP 的不同设计哲学
正如你提到的WebSocket 是长连接全双工的它在设计上更类似于低层的 TCP 套接字通信。这使得 WebSocket 更适合需要实时双向通信的应用比如聊天、在线游戏等。 WebSocket 提供的长连接全双工通信适用于需要双向持续交互的场景但它不是基于 HTTP 的标准模型因此需要在协议层上进行升级。
相比之下HTTP 的设计初衷是基于短连接的请求-响应模型更适合传统的静态内容传输场景。在现代互联网中虽然有了更高效的 WebSocket但 HTTP 的请求-响应模型仍然有其合理性尤其在静态资源加载、API 请求等方面。
总结
SSE 的出现正是为了解决 HTTP 的局限性提供一种简单、基于 HTTP 协议的长连接机制适用于实时更新但无需复杂双向通信的场景。HTTP 早期设计为短连接的原因在于资源有限自动关闭连接是为了节省资源。而像 WebSocket 这样的长连接协议则适用于实时性和双向通信要求较高的场景因此两者都有各自的应用场景。 SSE就是HTTP协议下的一个协议补充
Demo效果 Gitee 源码地址
https://gitee.com/sen2020/express-test
我们来看request和response的格式请求核心就两个返回核心是三个不缓存保持存活事件流类型只要发出去的http请求带着这些header参数那么SSE协议连接就建立了非常的简单。 结合之前的文档你就能理解了fetch只要拼凑出来这样header就可以建立SSE连接不需要什么特殊的处理几乎任何一个Web框架都支持接下来用node.js的Express实现一个打字机的小demo
服务端代码
import express from express;
import fs from fs;
import readline from readline;
import { EventEmitter } from events;
import path from path;
import { fileURLToPath } from url;const app express();
const port 3000;// 获取当前文件的路径
const __filename fileURLToPath(import.meta.url);
const __dirname path.dirname(__filename);// 创建事件管理器
const eventEmitter new EventEmitter();// 设置静态文件目录来提供前端页面和资源
app.use(express.static(path.join(__dirname, public)));// SSE 路由根据 ISBN 请求电子书
app.get(/events, (req, res) {const { isbn } req.query;if (!isbn) {res.status(400).send(Missing ISBN);return;}// 设置 headers 确保保持长连接res.setHeader(Content-Type, text/event-stream);res.setHeader(Cache-Control, no-cache);res.setHeader(Connection, keep-alive);// 确保每个连接只会被注册一次避免重复订阅同一事件const onNewData (data) {res.write(data: ${JSON.stringify(data)}\n\n);};eventEmitter.on(isbn, onNewData);// 当客户端断开时移除事件监听req.on(close, () {eventEmitter.removeListener(isbn, onNewData);res.end();});// 开始读取并发送字符流readFileCharacterByCharacter(isbn);
});// 从文件中逐字符读取数据并触发事件
const readFileCharacterByCharacter async (isbn) {const filePath path.join(__dirname, books, ${isbn}.txt);// 如果文件不存在则返回错误if (!fs.existsSync(filePath)) {eventEmitter.emit(isbn, { message: Error: Book with ISBN ${isbn} not found. });return;}const fileStream fs.createReadStream(filePath);const rl readline.createInterface({input: fileStream,crlfDelay: Infinity,});for await (const line of rl) {for (const char of line) {await new Promise(resolve setTimeout(resolve, 50)); // 模拟打字机效果每50ms发送一个字符eventEmitter.emit(isbn, { message: char });}eventEmitter.emit(isbn, { message: \n });}// 告诉前端书本内容已经结束eventEmitter.emit(isbn, { message: End of book. });
};app.listen(port, () {console.log(SSE server and static files serving at http://localhost:${port});
});
html代码
!DOCTYPE html
html langen
headmeta charsetUTF-8meta nameviewport contentwidthdevice-width, initial-scale1.0titleBook Reader/titlelink relstylesheet hrefstyles.css
/head
body
div classcontainerh1Book Reader/h1div classform-containerinput typetext idisbn value1234567890 placeholderEnter ISBNbutton idstartStart Reading/button/divdiv idcontent/div
/div
script srcscripts.js/script
/body
/html
前端JS代码
document.getElementById(start).addEventListener(click, () {const isbn document.getElementById(isbn).value.trim();if (!isbn) {alert(Please enter a valid ISBN);return;}const contentDiv document.getElementById(content);contentDiv.innerHTML ; // 清空之前的内容// 如果已经有一个 EventSource先关闭它if (window.eventSource) {window.eventSource.close();}// 创建一个新的 EventSource 连接window.eventSource new EventSource(/events?isbn${isbn});let currentLine ; // 用于累积当前行的字符let paragraph document.createElement(p); // 创建一个段落元素contentDiv.appendChild(paragraph); // 初始添加一个段落window.eventSource.onmessage (event) {const data JSON.parse(event.data);const char data.message;// 检查是否接收到 End of book. 消息if (char End of book.) {paragraph document.createElement(p);paragraph.textContent End of book.;contentDiv.appendChild(paragraph);// 关闭 EventSource 连接window.eventSource.close();return; // 停止进一步处理}// 如果是换行符渲染当前行并创建新的段落if (char \n) {paragraph document.createElement(p); // 创建新的段落contentDiv.appendChild(paragraph); // 添加新段落到内容区currentLine ; // 清空当前行} else {// 累积字符到当前行并更新当前段落内容currentLine char;paragraph.textContent currentLine; // 逐字符更新当前段落内容}// 自动滚动到底部contentDiv.scrollTop contentDiv.scrollHeight;};window.eventSource.onerror () {console.log(EventSource connection closed.);window.eventSource.close();};
});
CSS样式表
body {font-family: Arial, sans-serif;background-color: #f4f4f4;margin: 0;padding: 0;display: flex;justify-content: center;align-items: center;height: 100vh;
}.container {text-align: center;background-color: #fff;padding: 20px;border-radius: 8px;box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
}.form-container {margin-bottom: 20px;
}#isbn {padding: 10px;font-size: 16px;width: 200px;margin-right: 10px;
}button {padding: 10px 15px;font-size: 16px;cursor: pointer;
}#content {text-align: left;margin-top: 20px;max-height: 300px;overflow-y: auto;background-color: #f9f9f9;padding: 15px;border: 1px solid #ddd;
}
目录结构 package.json配置 SSE的应用场景注意事项 php这种语言是无法搞这个事情了因为一个连接需要一个进程一台主机4核4G估计也只能建立200个进程就是极限了再长期保持就是扯淡所以你必须用异步IO的Web框架来实现SSE的连接才可以大大提高连接数。
用SSE做推送-负载均衡场景怎么破 首先我们要知道的是一个Web服务一旦启动就是一个独立的进程你想往这个进程中塞东西要么你能拿到这个进程对外的变量app或者server这个大概率不可能因为启动时是不对外提供这种变量的那怎么办 这肯定就是进程间通信的事情了这里不做深入探讨无非就是管道共享内存队列等等还有个套接字套接字是啥说实话我既懂又不懂 但是这个进程可以注册到一个端口上这样外界就可以发消息进来这也合理就像人有个耳朵一样如果一个人聋了你当然是没办法跟他说话的。 可以让这个进程自己提供一个http服务这样大家调用时就通用了通过这个http接口我们就可以将信息发送给这个服务里去 拿来做服务端推送机制那就是要在服务器级别收集client如果是负载均衡方式启动了多台服务器怎么办 刚好1列表就讲到这个问题每个独立服务都将自己的IP:端口号存放在redis中同时也罢自己的收集到的client创建个编码塞到redis里去例如user_idclient-ip-port 这种对应这样如果有一个广播需要广播一批人那么就能从redis中找到这批人然后再提取他们所在的ip:port然后把消息发给这些服务这些服务接收到user_id后再在自己Map表中找到client然后一个个发送数据过去就实现了负载均衡下的消息推送你学废了吗 类似hyperf框架这种开了多个进程来承接不同的client玩法是一样找到user_id:ip:port:process_id就可以实现同样的广播效果不过这玩意自己实现起来有点麻烦我也没玩过看了下调度就是这样实现的。
Express实现单服务的广播机制代码
import express from express;
import fs from fs;
import readline from readline;
import { EventEmitter } from events;
import path from path;
import { fileURLToPath } from url;const app express();
const port 3000;// 获取当前文件的路径
const __filename fileURLToPath(import.meta.url);
const __dirname path.dirname(__filename);// 创建事件管理器
const eventEmitter new EventEmitter();// 全局连接管理器
let clients [];// 设置静态文件目录来提供前端页面和资源
app.use(express.static(path.join(__dirname, public)));// SSE 路由客户端连接到服务器
app.get(/events, (req, res) {const headers {Content-Type: text/event-stream,Cache-Control: no-cache,Connection: keep-alive,};res.writeHead(200, headers);// 新的客户端连接加入全局连接管理器clients.push(res);// 清理客户端断开连接时的处理req.on(close, () {clients clients.filter(client client ! res);res.end();});
});// 广播新书上架通知给所有客户端
const broadcastNewBook (bookTitle) {clients.forEach(client {client.write(data: ${JSON.stringify({ message: New book available: ${bookTitle} })}\n\n);});
};// 上架新书时调用此函数
app.post(/new-book, express.json(), (req, res) {const { title } req.body;if (!title) {res.status(400).send(Book title is required);return;}// 广播新书消息给所有客户端broadcastNewBook(title);res.status(200).send(New book ${title} broadcasted to all clients.);
});// 启动服务器
app.listen(port, () {console.log(SSE server running at http://localhost:${port});
});Express中如何一个启动一个服务同时支持http/websocket两种协议? 这个要想明白一个事情Express本身分为两个模块其他Web框架也是这样的一个是协议处理模块app另外也即当http发送过来请求时浏览器会根据你输入的协议http:// 和 ws:// 在request的header中追加一个upgrade:websocket有了这个标识被Express的http模块识别到之后它调用的app就是ws的模块的app所有的逻辑都走这边刚才我们已经了解到http的关闭是由app模块发的close来控制的只要这边不发close这个连接就会一直保持着因为连接就是socketsocket就是全双工的所以只要ws模块不主动发close过来那么这个连接就可以保持长连接同时ws模块会把这个连接的信息记录下来以方便后续不断地再接收数据再发回数据这就是两个协议可以共用一个端口的机制。 将两种协议整合在一个Express下有什么好处他们连接对象都是在一个进程中因为启动时就启动了一个进程这个进程启动了http模块和ws模块所以两者共用一个一套上下文因此如果你在Express启动前创建一个clients []将连接过来的所有ws client都塞进去那就意味着 意味着你可以通过发送http请求给Express服务来实现广播效果是不是爽歪歪 效果截图 服务端代码
import express from express;
import { WebSocketServer } from ws;
import path from path;
import { fileURLToPath } from url;const app express();
const port 3000;// 使用 JSON 中间件解析 POST 请求体
app.use(express.json());// 获取当前文件的路径
const __filename fileURLToPath(import.meta.url);
const __dirname path.dirname(__filename);// 配置静态文件目录
app.use(express.static(path.join(__dirname, public/ws)));app.use((req, res, next) {res.setHeader(Content-Security-Policy, connect-src self http://localhost:3000);next();
});// 创建 WebSocket 服务器并与 Express 集成
const wss new WebSocketServer({ noServer: true });// 保存所有 WebSocket 客户端
let clients [];// WebSocket 连接处理
wss.on(connection, (ws) {console.log(Client connected);clients.push(ws);// 处理消息ws.on(message, (message) {console.log(Received message: ${message});});// 当客户端断开连接时移除它ws.on(close, () {clients clients.filter(client client ! ws);console.log(Client disconnected);});
});// 广播消息给所有 WebSocket 客户端
const broadcastMessage (message) {clients.forEach(client {if (client.readyState client.OPEN) {client.send(message);}});
};// 上架新书的 HTTP 接口
app.post(/new-book, (req, res) {const { title } req.body;if (!title) {return res.status(400).send(Book title is required);}const message New book available: ${title};broadcastMessage(message);res.status(200).send(New book ${title} has been broadcasted to all WebSocket clients.);
});// 处理升级请求WebSocket 连接时需要的处理逻辑
app.server app.listen(port, () {console.log(Server is running on port ${port});
});app.server.on(upgrade, (request, socket, head) {wss.handleUpgrade(request, socket, head, (ws) {wss.emit(connection, ws, request);});
});
Html 代码
!DOCTYPE html
html langen
headmeta charsetUTF-8meta nameviewport contentwidthdevice-width, initial-scale1.0titleWebSocket Client/title
/head
body
h1WebSocket Book Notification/h1
div idcontent/divscriptconst contentDiv document.getElementById(content);// 创建 WebSocket 连接const ws new WebSocket(ws://localhost:3000);// 处理接收到的消息ws.onmessage (event) {const paragraph document.createElement(p);paragraph.textContent Received: ${event.data};contentDiv.appendChild(paragraph);};// 处理 WebSocket 连接打开ws.onopen () {console.log(WebSocket connected);};// 处理 WebSocket 连接关闭ws.onclose () {console.log(WebSocket disconnected);};
/script
/body
/html
js发送广播的请求fetch
fetch(http://localhost:3000/new-book, {method: POST,headers: {Content-Type: application/json},body: JSON.stringify({title: The Great Adventure WSSSSSSSS})
})
.then(response response.text())
.then(data console.log(data))
.catch(error console.error(Error:, error));