当前位置: 首页 > news >正文

网站模块数据同步中国万网商城

网站模块数据同步,中国万网商城,网站主页设计模板图片,王烨涛本文主要从提供SSE方式接入DeepSeek#xff0c;并通过fastapi websocket对外提供接入方法。 参考文档#xff1a; 腾讯云大模型#xff1a;https://cloud.tencent.com/document/product/1759/109380 fastAPI官网#xff1a;https://fastapi.tiangolo.com/ WebSocketManager… 本文主要从提供SSE方式接入DeepSeek并通过fastapi websocket对外提供接入方法。 参考文档 腾讯云大模型https://cloud.tencent.com/document/product/1759/109380 fastAPI官网https://fastapi.tiangolo.com/ WebSocketManager 提供WebsocketManager对websocket连接实例进行管理代码片段如下 from fastapi import WebSocketclass WsManager:websocket managerconnectors: dict[str, WebSocket] {}staticmethoddef get_connector(connector_id: str) - WebSocket | None:获取连接实例:param connector_id::return:connector WsManager.connectors.get(connector_id)return connectorstaticmethoddef add_connector(connector_id: str, connector: WebSocket):添加websocket客户端:param connector_id: 客户端ID:param connector: 客户端连接实例:return:WsManager.connectors[connector_id] connectorstaticmethoddef remove_connector(connector_id: str):移除websocket连接:param connector_id: 连接ID:return:try:del WsManager.connectors[connector_id]except Exception:passstaticmethodasync def send_message(connector_id: str, message: str):connector WsManager.connectors.get(connector_id)if connector is None:returntry:await connector.send_text(message)except Exception as e:print(消息发送失败)staticmethodasync def send_message_json(connector_id: str, message: dict):connector WsManager.connectors.get(connector_id)if connector is None:returntry:await connector.send_json(message)except Exception as e:print(消息发送失败)接入DeepSeek import uuid import os import requests import jsonfrom src.core.WsManager import WsManager from .session import get_sessionTCLOUD_URL lke.tencentcloudapi.com TCLOUD_DeepSeek_SSE_URL https://wss.lke.cloud.tencent.com/v1/qbot/chat/sseSECRET_ID XXXXXXXXXXXXXXXXXXXX SECRET_KEY XXXXXXXXXXXXXXXXXXXXX REGION ap-guangzhou TYPE 5def get_session():生成一个 UUID:return session id 字符串new_uuid uuid.uuid4()# 将 UUID 转换为字符串session_id str(new_uuid)return session_iddef _resolve_message(message: str, prev_message: str):处理消息:param message::param prev_message::return:if prev_message :return Noneif prev_message.startswith(event:):# 生成消息message message.replace(data:, ).strip()try:message_json json.loads(message)message_type message_json.get(type)payload message_json.get(payload)if message_type token_stat:# token统计事件return Noneelif message_type thought:# 思考事件return Noneelif message_type error:# 错误事件return Noneelif message_type reference:# 参考来源事件return Nonecontent payload.get(content)record_id payload.get(record_id)request_id payload.get(request_id)session_id payload.get(session_id)trace_id payload.get(trace_id)message_id payload.get(message_id)return {type: message_type,data: {content: content,record_id: record_id,request_id: request_id,session_id: session_id,trace_id: trace_id,message_id: message_id,}}except Exception as e:print(e)else:return Noneasync def get_q_a_result(visitor_id: str, question: str):获取问答结果:param visitor_id: 访客ID:param question: 问题内容:return: 回答内容session_id get_session()req_data {content: f{question},bot_app_key: XXXXXXXX, # 可以获取方式见下图visitor_biz_id: visitor_id,session_id: session_id,streaming_throttle: 100}res requests.post(TCLOUD_DeepSeek_SSE_URL, datajson.dumps(req_data),streamTrue, headers{Accept: text/event-stream})prev_message: str # 上一条消息if res.status_code 200:for line in res.iter_lines():if line:data line.decode(utf-8)message _resolve_message(data, prev_message)if message:await WsManager.send_message_json(visitor_id, message)prev_message dataelse:print(Failed to get data. Status code: {response.status_code})# 关闭websocket连接connector WsManager.get_connector(visitor_id)if connector:await connector.close()WsManager.remove_connector(visitor_id) 进入控制台 2. 创建应用并点击调用 通过fastapi 以websocket方式对外提供接口 这个接口是前台连接websocket后后台直接根据连接参数开始调用问答问答结束后自动关闭websocket连接 import asynciofrom fastapi import APIRouter, WebSocket, WebSocketDisconnect, BackgroundTasks from fastapi.responses import JSONResponsefrom src.core.WsManager import WsManager from src.utils.tcloudLLM import get_example_recommend from src.utils.session import get_visitor_idrouter APIRouter(prefix/api/tcloud, tags[腾信云大模型接口])router.websocket(/ws/{compound_name}) async def get_exercises_ws(compound_name: str, websocket: WebSocket, background_tasks: BackgroundTasks):await websocket.accept()visitor_id get_visitor_id()WsManager.add_connector(visitor_id, websocket)# 本来使用BackgroundTask去做后台任务结果不知道什么原因调用不起来然后换成asyncio处理asyncio.create_task(get_example_recommend(visitor_id, compound_name)) # 创建对应问答任务try:while True:data await websocket.receive_text()await websocket.send_text(fMessage text was: {data})except WebSocketDisconnect:WsManager.remove_connector(visitor_id)except Exception:WsManager.remove_connector(visitor_id)如果需要进行多轮问答提供为多次问答设置相关的 request_id 参数进行消息串联。
http://www.hkea.cn/news/14423429/

相关文章:

  • 网站是否需要备案做网站服务器 自己电脑还是租
  • 合川网站建设公司百度账号登录
  • 垂直行业门户网站有哪些网站内链怎么坐锚文本
  • 小语种网站开发建站公司网站模板论坛
  • 投票网站源码php更知鸟wordpress
  • 如何做好一个购物网站软件开发的收官之战是什么
  • 招聘网站建设公司营销型网站的建设与推广辅导记录
  • 网站 服务 套餐如何设置wordpress不自动更新
  • wordpress开启全站ssl网站开发实践意义
  • 网站开发岗位就业分析公众号开放平台
  • 网站与系统开发什么样的人适合做策划
  • 阿里做网站怎么做上海专业高端网站建
  • 有没有免费推广的appseo整站优化外包
  • 合格的网站设计师需要会什么软件申请摇号广州网站
  • 上海做网站公司哪家好网站制作引擎
  • 合肥专业网站建设公司哪家好自己提供域名做网站
  • 博星卓越电子商务网站建设实训平台站长网站素材网
  • 网站管理和维护旅游网站分析
  • 南宁网站设计组织建设方面
  • 网站开发虚拟电话金属建材企业网站建设方案
  • 建设行业公司网站wordpress云建站教程视频
  • 山西网站建设企业永城网站建设
  • 南宁建设网站哪里好多用户商城系统的优势
  • 个人建设网站论文国产crm系统91
  • 江门建站引擎网站
  • 青岛网站建设网站做景观设施的网站
  • 百度网盟网站有哪些专业网站建设是哪家
  • 深圳html5网站建设价格美团网站建设
  • 网站站点不安全东莞百度seo新网站快速排名
  • 做的很漂亮的网站微网站制作软件