本地建站工具,百度网页首页,商城类网站如何众筹,广州网站搭建哪家好说明 暂时考虑的场景是单条数据处理特别复杂和耗时的场景。 场景如下#xff1a;
要对一篇文档进行实体处理#xff0c;然后再对实体进行匹配。在这个过程当中#xff0c;涉及到了好几部分服务#xff1a;
1 实体识别服务2 数据库查询服务3 es查询服务
整个处理包成了服…说明 暂时考虑的场景是单条数据处理特别复杂和耗时的场景。 场景如下
要对一篇文档进行实体处理然后再对实体进行匹配。在这个过程当中涉及到了好几部分服务
1 实体识别服务2 数据库查询服务3 es查询服务
整个处理包成了服务在单线程处理增量的时候非常正常但尝试进行并行调用的时候出现了问题。每次报错的时候都是显示Connection Reset By Peer,感觉像是服务端连接的问题。由于每一部分都可能是瓶颈我没(时间)法准确定位问题所在很有可能是同时起了5个实体识别GPU的抢占导致的问题(负载经常在100%)。
所以这个事有两个启示
1 对于One处理的设计是否可以保存中间关键步骤的跟踪2 系统资源瓶颈。服务资源是瓶颈(GPU、网络、数据库IO),如果目标是瓶颈资源(Server模式)很容易出现失败。反过来如果从瓶颈资源出发,尽力而为(Worker)反而会有更高的资源利用率。但是Server模式是面向消费者的而Worker模式则是面向生产者的如果我们要把工作交出去还是应该采用Server模式。 所以要解决的问题是在保证逻辑正确的情况下且有大量miss的情况下如何尽快的完成业务上的任务。 内容
在数据处理架构中,可靠性与效率也是一对钳制指标这类型指标混在一起是不行的必须要分开。所以在机器学习里有
1 精确率与召回率。是分开来研究的当然也有最终融合的指标(F1Score)2 TCP/IP。
TCP/IP 协议栈中的两个核心协议是 TCP传输控制协议提供可靠的、面向连接的通信服务确保数据包按顺序到达且无错误。 IP互联网协议负责将数据包从源地址传输到目的地址但不保证传输的可靠性。 在我的业务场景下TCP可以视为数据库比对而IP则视为队列处理。
(TCP模式)数据库的设计目的就是为了可靠、长期地保存大量的数据我们把任务、结果以及里程碑节点(中间过程)保存在数据库中是合适的。现在假设只有两个节点任务和结果。虽然我发现在有明显瓶颈的地方是实体识别这里应该独立一个里程碑节点出来
(IP模式)队列的设计目的则是缓冲和分发。缓冲是解决生产者的困扰这样不需要知道机器的能力是多少把要做的任务发完就好了。然后还可以解决“众包”问题通过多个worker进行任务分摊尽可能快的执行任务。在这个过程中必然会产生大量的不确定问题导致worker处理、交付失败的。这也是快所要付出的代价。
通过数据库 队列的组合就可以做到既快又好。
1 任务数据入库
简单起见暂时还是采用mysql。
先将原始数据灌到source表。
之前用离线方式跑了一部分数据将这部分数据搬到 result表。
right_files list_file_names_without_extension(right_path)
res_file_list []
get_right_file_names []
for the_file in list(right_files):the_tem_data from_pickle(the_file, ./right/)get_right_file_names.append(the_file)res_file_list.append(the_tem_data)res_file_list1 flatten_list(res_file_list)
# 过滤掉失败的
res_file_list2 [x for x in res_file_list1 if x !detail]
right_res_df pd.DataFrame(res_file_list2)# 引入与数据库表规范对接的数据模型
from pydantic import BaseModel,field_validator
class DocEnt(BaseModel):doc_id : strent_list : list maaped_ent: list propertydef ent_list_str(self):return ,.join(self.ent_list)propertydef mapped_list_str(self):return ,.join(self.maaped_ent)def dict(self):data_dict {}data_dict[doc_id] self.doc_iddata_dict[ent_list_str] self.ent_list_strdata_dict[mapped_list_str] self.mapped_list_strreturn data_dictfrom typing import Listclass DocEnt_list(BaseModel):data_list: List[DocEnt]# 将结果数据转为可被数据库接受的字段模式
docent_list DocEnt_list(data_list right_res_df.to_dict(orientrecords))
docent_list1 [x.dict() for x in docent_list.data_list]将合法的数据结果与ORM对接,先引入数据模型。
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func, Text, Index
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetimem7_24013_url fmysqlpymysql://xxx:xxx172.17.0.1:24013/mydb# from urllib.parse import quote_plus
# the_passed quote_plus(!#*)
# # 创建数据库引擎
m7_engine create_engine(m7_24013_url)# 创建基类
Base declarative_base()# 定义数据模型
class DocEntMap(Base):__tablename__ doc_ent_mapid Column(Integer, primary_keyTrue)# CompileError: (in table users, column name): VARCHAR requires a length on dialect mysqldoc_id Column(String(50))ent_list_str Column(Text)mapped_list_str Column(Text)create_time Column(DateTime, defaultlambda: datetime.now())# 创建索引__table_args__ (Index(idx_doc_id, doc_id),Index(idx_create_time, create_time),)Base.metadata.create_all(m7_engine)
# 创建会话
Session sessionmaker(bindm7_engine)分批次存储数据
ent_map_lb ListBatchIterator(docent_list1, 1000)
import tqdm
with Session() as session:for i,some_list in tqdm.tqdm(enumerate(ent_map_lb)):test_list [DocEntMap(**x) for x in some_list]# 一次性添加到会话中session.add_all(test_list)# 提交会话session.commit()因为是mysql我按照每批1000来操作每秒能存2批这个速度也能接受了。
2 比较差集
方式一mysql不支持
SELECT id FROM table1
EXCEPT DISTINCT
SELECT id FROM table2;方式2left join
-- 查找在 table1 中存在但在 table2 中不存在的 id
SELECT t1.id FROM table1 t1
LEFT JOIN table2 t2 ON t1.id t2.id
WHERE t2.id IS NULL;我使用Sqlalchmey进行比较并获取数据稍慢但方法简单
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func, Text, Index
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetimem7_24013_url fmysqlpymysql://xxx:xxx172.17.0.1:24013/mydb# from urllib.parse import quote_plus
# the_passed quote_plus(!#*)
# # 创建数据库引擎
m7_engine create_engine(m7_24013_url)# 创建基类
Base declarative_base()# 定义数据模型
class DocEntMap(Base):__tablename__ doc_ent_mapid Column(Integer, primary_keyTrue)# CompileError: (in table users, column name): VARCHAR requires a length on dialect mysqldoc_id Column(String(50))ent_list_str Column(Text)mapped_list_str Column(Text)create_time Column(DateTime, defaultlambda: datetime.now())# 创建索引__table_args__ (Index(idx_doc_id, doc_id),Index(idx_create_time, create_time),)# 定义模型类
class SourceData(Base):__tablename__ source_dataid Column(Integer, primary_keyTrue)mid Column(String(50))content Column(Text)created Column(String(50))def dict(self):data_dict {}data_dict[doc_id] self.middata_dict[text] self.contentreturn data_dict# 创建表如果表已经存在这一步将忽略
Base.metadata.create_all(m7_engine)# 创建会话
Session sessionmaker(bindm7_engine)# 分页查询
page 1
page_size 1000while True:offset (page - 1) * page_sizeresult session.query(SourceData).filter(~SourceData.mid.in_(session.query(DocEntMap.doc_id))).offset(offset).limit(page_size).all()if not result:breakif page % 10 0:print(page)resent_task_list [x.dict() for x in result]produces Producer(servers KAFKASERVER,raw_msg_list resent_task_list, topicthe_topic )resp req.post(http://agent:port/send_msg/,json produces.dict()).json()page 1翻页到后面还是慢的不过确实比较简单省事。
如果使用clickhouse,选取列的速度还是非常快的。要做好索引之后再取数应该效率会比较高。后续再看吧我现在都倾向先用lazy版的。
sqlSELECT mid FROM sourceEXCEPTSELECT doc_id FROM target
;datachc._exe_sql(sql)3 kafka 消费
消费也是通过kafka agent来做的但是比我之前在本机跑慢。我猜是因为worker在处理完任务请求下一个时无论多快都要进行序列化。而且因为通过agent进行消费者需要对数据进行两次序列化这个还是会比较耗时的。
我的想法是通过一个专门的数据拉取程序事先将数据从kafka上拉下来以short_uuid命名然后存在本地的left。然后本地的worker从left中取数。 为什么要大费周章从kafka取而不是直接从数据库取 主要目的是为了更好的分发。例如此时又要加另外两台机器协同处理难道我还要在手工分配数据吗
纯粹取数的程序消费速度一定比处理程序快多了这样就避免了每次要处理时才进行序列化。