首页 前端 正文
  • 本文约4993字,阅读需25分钟
  • 13
  • 0

FastAPI实战:WebSocket长连接保持与心跳机制,从入门到填坑

摘要

本文通过一个真实的上线案例,详细讲解FastAPI与JavaScript实现WebSocket长连接保持的心跳机制。你会了解为什么连接会断、心跳原理是什么、前后端代码怎么写,以及那些文档里没写的调优陷阱。照着做,让你的实时通信稳如老狗。

你是不是也遇到过——WebSocket连接动不动就断开,尤其是在移动端,用户切换个Wi-Fi或者电梯里信号晃一下,消息就收不到了?用户投诉说“APP消息延迟”,你一查日志,满屏都是WebSocket disconnected,然后疯狂重连,服务器压力山大,用户体验稀碎。

有些项目图省事,觉得WebSocket连上就行了,结果线上跑了半天,运维小哥就发来报警:连接数忽高忽低,很多连接存活不到2分钟。查日志,好家伙,Nginx默认proxy_read_timeout 60秒,加上移动网络运营商会掐掉长时间无流量的连接,双向夹击,连接全断了。

核心结论:WebSocket长连接保持,不能靠“连上就不管”,必须引入心跳机制——就像两个人打电话,每隔一会儿问一句“喂,还在吗?”。今天我就把FastAPI后端 + JavaScript前端的完整心跳实现,掰开了揉碎了讲给你听,顺便把我踩过的坑标红。

本文路线图

  • 为什么WebSocket会断?—— 中间件超时、网络状态变化
  • 心跳原理:ping-pong 还是 pong-ping?
  • FastAPI后端:接收心跳消息 + 超时管理
  • JavaScript前端:定时发送心跳 + 断线重连
  • 完整可运行代码示例
  • 那些年我踩过的坑(间隔设置、重复定时器、服务端主动断开)

🧠 第一部分:连接为什么会断?

把WebSocket想象成一条水管,数据就是水。如果水管一直流水,它就不会堵。但要是你半天不放水,中间的路由器、防火墙就觉得“嘿,这管子是不是废弃了?”——咔嚓一刀给你掐了。尤其是在移动网络下,运营商的NAT网关空闲超时可能只有30秒到几分钟。还有我们常用的Nginx,默认proxy_read_timeout是60秒,一旦60秒内没有数据从后端发到客户端,Nginx就会自作主张断开连接。

所以,要想让连接长存,唯一的方法就是定期发送一些“无用”的数据,告诉中间件:“我还活着,别砍我!”——这就是心跳。

💓 第二部分:心跳机制的两种姿势

心跳本质是一种ping/pong模式。WebSocket协议本身有控制帧PingPong,但浏览器原生JS的WebSocket API并没有直接暴露发送Ping帧的方法,所以我们一般用普通消息模拟:

  • 方案A:客户端定时发送ping消息,服务器收到后立即回复pong
  • 方案B:服务器定时发送ping,客户端回复pong。但同样,客户端需要能解析并回复。

更常见的做法是客户端主动发心跳,服务器只需响应或记录。为啥?因为客户端更能感知网络变化,且断开后能立即重连。下面我就以客户端发心跳为例,上代码。

⚙️ 第三部分:FastAPI后端实战

先搭一个最简单的FastAPI WebSocket端点。这里我用了/ws路径,接收心跳消息(约定JSON格式{"type": "ping"}),并回复{"type": "pong"}。同时,为了及时清理死连接,我会记录每个连接的最后心跳时间,启动一个后台任务检查超时(比如60秒没收到心跳就主动close)。

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json
from datetime import datetime, timedelta

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: dict[WebSocket, datetime] = {}
        self._heartbeat_check_interval = 30   # 每30秒检查一次
        asyncio.create_task(self.heartbeat_checker())

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections[websocket] = datetime.utcnow()
        print(f"新连接加入,当前连接数:{len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            del self.active_connections[websocket]
            print(f"连接断开,当前连接数:{len(self.active_connections)}")

    async def handle_messages(self, websocket: WebSocket):
        try:
            while True:
                data = await websocket.receive_text()
                try:
                    msg = json.loads(data)
                except:
                    continue
                # 如果是心跳ping,更新最后心跳时间并回复pong
                if msg.get("type") == "ping":
                    self.active_connections[websocket] = datetime.utcnow()
                    await websocket.send_text(json.dumps({"type": "pong"}))
                else:
                    # 其他业务消息,按需处理
                    await websocket.send_text(json.dumps({"type": "echo", "data": msg}))
        except WebSocketDisconnect:
            self.disconnect(websocket)

    async def heartbeat_checker(self):
        while True:
            await asyncio.sleep(self._heartbeat_check_interval)
            now = datetime.utcnow()
            timeout = timedelta(seconds=70)  # 超过70秒没心跳就断开
            dead_conns = []
            for ws, last_ping in self.active_connections.items():
                if now - last_ping > timeout:
                    dead_conns.append(ws)
            for ws in dead_conns:
                try:
                    await ws.close(code=1000, reason="heartbeat timeout")
                except:
                    pass
                self.disconnect(ws)

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    await manager.handle_messages(websocket)

💻 第四部分:JavaScript前端实现

前端主要做三件事:建立连接、定时发心跳、监听断开自动重连。我习惯把WebSocket封装成一个类,方便复用。直接上代码:

class WebSocketClient {
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.heartbeatInterval = 30000; // 30秒一次心跳
        this.reconnectInterval = 3000;   // 断线后3秒重连
        this.heartbeatTimer = null;
        this.reconnectTimer = null;
        this.connect();
    }

    connect() {
        this.ws = new WebSocket(this.url);
        this.ws.onopen = () => {
            console.log('WebSocket 已连接');
            // 连接成功后,启动心跳
            this.startHeartbeat();
            // 如果之前有重连定时器,清掉
            if (this.reconnectTimer) {
                clearTimeout(this.reconnectTimer);
                this.reconnectTimer = null;
            }
        };

        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            if (data.type === 'pong') {
                console.log('收到心跳pong,连接正常');
                // 可以在这里更新UI显示最后心跳时间,但不必须
            } else {
                // 处理其他业务消息
                console.log('业务消息', data);
            }
        };

        this.ws.onclose = (e) => {
            console.log('WebSocket 关闭', e.reason);
            // 停止心跳
            this.stopHeartbeat();
            // 尝试重连
            this.reconnect();
        };

        this.ws.onerror = (err) => {
            console.error('WebSocket 错误', err);
            this.ws.close();
        };
    }

    startHeartbeat() {
        this.heartbeatTimer = setInterval(() => {
            if (this.ws && this.ws.readyState === WebSocket.OPEN) {
                this.ws.send(JSON.stringify({ type: 'ping' }));
                console.log('发送心跳ping');
            } else {
                console.warn('连接未开启,停止发送心跳');
                this.stopHeartbeat();
            }
        }, this.heartbeatInterval);
    }

    stopHeartbeat() {
        if (this.heartbeatTimer) {
            clearInterval(this.heartbeatTimer);
            this.heartbeatTimer = null;
        }
    }

    reconnect() {
        this.stopHeartbeat();
        if (!this.reconnectTimer) {
            this.reconnectTimer = setTimeout(() => {
                console.log('尝试重连...');
                this.connect();
            }, this.reconnectInterval);
        }
    }

    // 主动关闭连接(比如页面卸载时)
    close() {
        this.stopHeartbeat();
        if (this.ws) {
            this.ws.close();
        }
    }
}

// 使用示例
const client = new WebSocketClient('ws://你的域名/ws');
// 页面关闭前主动清理
window.addEventListener('beforeunload', () => client.close());

🧪 第五部分:跑起来看看效果

启动FastAPI(uvicorn main:app --reload),打开浏览器控制台,你会看到每隔30秒发送一次ping,服务器立即回复pong。即使你断开Wi-Fi再打开,客户端也会自动重连,并且重连后心跳继续。🎯

💣 第六部分:那些年我踩过的坑(必看)

  • 坑1:心跳间隔太短,服务器压力大——1秒一次纯属自残,30秒一次足够,既保活又省资源。
  • 坑2:服务端没做超时主动断开——客户端突然掉线(比如用户强制杀进程),服务端不知道,连接一直占着内存。所以后台心跳检查一定要有,超时就close。
  • 坑3:重连时忘记清理旧定时器——每次重连都新建一个setInterval,导致多个心跳线程并发,消息爆炸。解决方案:重连前先stopHeartbeat()
  • 坑4:前后端心跳格式约定不一致——我的是{"type":"ping",如果你后端用字段heartbeat,一定记得对齐,否则服务器不认,相当于没心跳。
  • 坑5:没考虑SSL/加密连接——生产环境用wss://,证书配置要正确,否则连接直接被拒绝。

扫描二维码,在手机上阅读
评论
博主关闭了评论
友情链接