当前位置: 首页 > news >正文

一站式服务中心建立网站小程序

一站式服务中心,建立网站小程序,随州有哪些网站建设的公司,网站制作资源1、DBUtils 简介、使用 DBUtils 简介 DBUtils 是一套用于管理 数据库 连接池 的Python包#xff0c;为 高频度、高并发 的数据库访问提供更好的性能#xff0c;可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装… 1、DBUtils 简介、使用 DBUtils 简介 DBUtils 是一套用于管理 数据库 连接池 的Python包为 高频度、高并发  的数据库访问提供更好的性能可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装和连接。该连接可在各种多线程环境中使用。 使用场景如果使用的是流行的对象关系映射器 SQLObject 或 SQLAlchemy 之一则不需要 DBUtils因为它们带有自己的连接池。SQLObject 2 (SQL-API) 实际上是从 DBUtils 中借用了一些代码将池化分离到一个单独的层中。 DBUtils 提供两种外部接口 PersistentDB 提供线程专用的数据库连接并自动管理连接。PooledDB 提供线程间可共享的数据库连接并自动管理连接。 另外实际使用的数据库驱动也有所依赖比如SQLite数据库只能使用PersistentDB作连接池。 下载地址http://www.webwareforpython.org/downloads/DBUtils/ 使用 DBUtils 数据库 连接池 安装pip install DBUtils 示例MySQLdb 模块使用 连接池对象只初始化一次一般可以作为模块级代码来确保。 PersistentDB 的连接例子: import DBUtils.PersistentDB# maxusage 则为一个连接最大使用次数 persist DBUtils.PersistentDB.PersistentDB(dbpaiMySQLdb,maxusage1000,**kwargs)# 获取连接池 conn persist.connection() # 关闭连接池 conn.close() 参数 dbpai 指定使用的数据库模块兼容 DB-API 。下面是支持 DB-API 2 规范的数据库模块 pip install pymysqlmysql pip install pymssqlsqlserver pip install cx_Oracleoracle pip install phoenixdbhbase pip install sqlite3sqlite3 python自带 DBUtils 仅提供给了连接池管理实际的数据库操作依然是由符合 DB-API 2 标准的目标数据库模块完成的。 示例pymysql 模块使用 PooledDB 使用方法同 PersistentDB只是参数有所不同。 dbapi 数据库接口mincached 启动时开启的空连接数量maxcached 连接池最大可用连接数量maxshared 连接池最大可共享连接数量maxconnections 最大允许连接数量blocking 达到最大数量时是否阻塞maxusage 单个连接最大复用次数setsession 用于传递到数据库的准备会话如 [”set name UTF-8″] 。 conn pooled.connection() cur conn.cursor() cur.execute(sql) res cur.fetchone() cur.close()   # 或者 del cur conn.close()  # 或者 del conn import pymysql from dbutils.pooled_db import PooledDB# 定义连接参数 pool PooledDB(creatorpymysql,maxconnections6,mincached2,maxcached5,blockingTrue,hostlocalhost,userroot,passwd123456,dbmydb,port3306,charsetutf8mb4 )def main():# 从连接池获取连接conn pool.connection()cursor conn.cursor()# 执行 SQL 语句sql SELECT * FROM studentscursor.execute(sql)result cursor.fetchall()# 处理查询结果for row in result:print(row)# 关闭游标和连接cursor.close()conn.close()if __name__ __main__:main()示例面向对象 使用 DBUtils 使用DBUtils数据库连接池中的连接操作数据库import json import pymysql import datetime from DBUtils.PooledDB import PooledDB import pymysqlclass MysqlClient(object):__pool None;def __init__(self, mincached10, maxcached20, maxshared10, maxconnections200, blockingTrue,maxusage100, setsessionNone, resetTrue,host127.0.0.1, port3306, dbtest,userroot, passwd123456, charsetutf8mb4)::param mincached:连接池中空闲连接的初始数量:param maxcached:连接池中空闲连接的最大数量:param maxshared:共享连接的最大数量:param maxconnections:创建连接池的最大数量:param blocking:超过最大连接数量时候的表现为True等待连接数量下降为false直接报错处理:param maxusage:单个连接的最大重复使用次数:param setsession:optional list of SQL commands that may serve to preparethe session, e.g. [set datestyle to ..., set time zone ...]:param reset:how connections should be reset when returned to the pool(False or None to rollback transcations started with begin(),True to always issue a rollback for safetys sake):param host:数据库ip地址:param port:数据库端口:param db:库名:param user:用户名:param passwd:密码:param charset:字符编码if not self.__pool:self.__class__.__pool PooledDB(pymysql,mincached, maxcached,maxshared, maxconnections, blocking,maxusage, setsession, reset,hosthost, portport, dbdb,useruser, passwdpasswd,charsetcharset,cursorclasspymysql.cursors.DictCursor)self._conn Noneself._cursor Noneself.__get_conn()def __get_conn(self):self._conn self.__pool.connection();self._cursor self._conn.cursor();def close(self):try:self._cursor.close()self._conn.close()except Exception as e:print(e)def __execute(self, sql, param()):count self._cursor.execute(sql, param)print(count)return countstaticmethoddef __dict_datetime_obj_to_str(result_dict):把字典里面的datatime对象转成字符串使json转换不出错if result_dict:result_replace {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}result_dict.update(result_replace)return result_dictdef select_one(self, sql, param()):查询单个结果count self.__execute(sql, param)result self._cursor.fetchone():type result:dictresult self.__dict_datetime_obj_to_str(result)return count, resultdef select_many(self, sql, param()):查询多个结果:param sql: qsl语句:param param: sql参数:return: 结果数量和查询结果集count self.__execute(sql, param)result self._cursor.fetchall():type result:list[self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]return count, resultdef execute(self, sql, param()):count self.__execute(sql, param)return countdef begin(self):开启事务self._conn.autocommit(0)def end(self, optioncommit):结束事务if option commit:self._conn.autocommit()else:self._conn.rollback()if __name__ __main__:mc MysqlClient()sql1 SELECT * FROM shiji WHERE id 1result1 mc.select_one(sql1)print(json.dumps(result1[1], ensure_asciiFalse))sql2 SELECT * FROM shiji WHERE id IN (%s,%s,%s)param (2, 3, 4)print(json.dumps(mc.select_many(sql2, param)[1], ensure_asciiFalse))不用 连接池 import MySQLdb conn MySQLdb.connect(hostlocalhost,userroot,passwdpwd,dbmyDB,port3306) #import pymysql #conn pymysql.connect(hostlocalhost, port3306, dbgame, userroot, password123456, charsetutf8) curconn.cursor() SQLselect * from table1 rcur.execute(SQL) rcur.fetchall() cur.close() conn.close() 使用 连接池 import MySQLdb from DBUtils.PooledDB import PooledDB #5为连接池里的最少连接数 pool PooledDB(MySQLdb,5,hostlocalhost,userroot,passwdpwd,dbmyDB,port3306) # 以后每次需要数据库连接就是用connection函数获取连接就好了 conn pool.connection() curconn.cursor() SQLselect * from table1 rcur.execute(SQL) rcur.fetchall() cur.close() conn.close() 多线程 使用 连接池 import sys import threading import MySQLdb import DBUtils.PooledDBconnargs { host:localhost, user:user1, passwd:123456, db:test } def test(conn):try:cursor conn.cursor()count cursor.execute(select * from users)rows cursor.fetchall()for r in rows: passfinally:conn.close()def testloop():print (testloop)for i in range(1000):conn MySQLdb.connect(**connargs)test(conn)def testpool():print (testpool)pooled DBUtils.PooledDB.PooledDB(MySQLdb, **connargs)for i in range(1000):conn pooled.connection()test(conn)def main():t testloop if len(sys.argv) 1 else testpoolfor i in range(10):threading.Thread(target t).start()if __name__ __main__:main() 虽然测试方式不是很严谨但从测试结果还是能感受到 DBUtils 带来的性能提升。当然我们我们也可以在 testloop() 中一直重复使用一个不关闭的 Connection但这却不适合实际开发时的情形。 2、Flask 配置蓝图数据库连接池上下文原理 Flask之配置文件蓝图数据库连接池上下文原理https://www.cnblogs.com/yunweixiaoxuesheng/p/8418135.html 配  置 方式一使用字典方式配置 app.config[SESSION_COOKE_NAME] session_liling 方式二引入文件设置 from flask import Flask app Flask(__name__) app.config.from_pyfile(settings.py)    # 引用settings.py中的AAAA print(app.config[AAAA])        # 123 # settings.py AAAA 123 方法三使用环境变量设置推荐使用 from flask import Flask app Flask(__name__) import os os.environ[FLASK-SETTINGS] settings.py app.config.from_envvar(FLASK-SETTINGS) 方式四通过对象方式导入使用,可根据不同环境选择不同的配置推荐使用 from flask import Flask app Flask(__name__) app.config.from_object(settings.BaseConfig) print(app.config[NNNN])  # 123 # settings.py class BaseConfig(object):  # 公用配置     NNNN 123 class TestConfig(object):     DB 127.0.0.1 class DevConfig(object):     DB 192.168.1.1 class ProConfig(object):     DB 47.18.1.1 不同的文件 引用配置 from flask import Flask,current_appapp Flask(__name__)app.secret_key adfadsfhjkhakljsdfhapp.config.from_object(settings.BaseConfig)app.route(/index) def index():print(current_app.config[NNNN])return xxxif __name__ __main__:app.run() instance_path、instance_relative_config from flask import Flask,current_appapp Flask(__name__,instance_pathNone,instance_relative_configFalse) # 默认 instance_relative_config False 那么 instance_relative_config和instance_path都不会生效 # instance_relative_configTrueinstance_path才会生效app.config.from_pyfile(settings.py)将会失效 # 配置文件找的路径按instance_path的值作为配置文件路径 # 默认instance_pathNoneNone会按照当前路径下的instance文件夹为配置文件的路径 # 如果设置路径按照设置的路径查找配置文件。app.config.from_pyfile(settings.py)app.route(/index) def index():print(current_app.config[NNNN])return xxxif __name__ __main__:app.run() 蓝  图 对应用程序的目录结构进行分配一般适用于小中型企业 代码 # crm/__init__.py # 创建flask项目用蓝图注册不同的模块from flask import Flask from .views import account from .views import orderapp Flask(__name__)app.register_blueprint(account.account) app.register_blueprint(order.order)------------------------------------------------------------------------ # manage.py # 启动文件 import crmif __name__ __main__:crm.app.run()------------------------------------------------------------------------ # crm/views/account.py # 视图函数模块,Blueprint将函数引入app from flask import Blueprintaccount Blueprint(account,__name__,url_prefix/xxx)account.route(/login) def login():return Login Flask 数据库 连接池 https://www.cnblogs.com/TheLand/p/9178305.html ORM ( 对象关系映射 ) ORMObject-Relational Mapping对象关系映射是一种编程技术用于在关系型数据库和面向对象编程语言之间建立映射关系。它允许开发人员使用面向对象的方式来操作数据库而无需直接编写或执行 SQL 查询。 ORM 提供了一个抽象层将数据库表格映射为对象并提供了一组方法和工具以便于进行数据库的增删改查操作。开发人员可以通过使用对象和方法来表示和操作数据而不必关心底层的 SQL 语句和数据库细节。 常见的 ORM 框架包括 SQLAlchemy是 python 操作数据库的一个库能够进行 orm 映射是一个功能强大的 Python ORM 框架支持多种数据库后端提供了高级的查询功能和事务管理等特性。是为高效和高性能的数据库访问设计实现了完整的企业级持久模型。SQLAlchemy 的理念是SQL 数据库的量级和性能重要于对象集合而对象集合的抽象又重要于表和行。Flask-SQLAlchemyFlask-SQLAlchemy 是一个与 Flask 框架集成的 SQLAlchemy 扩展它简化了在 Flask 应用程序中使用 SQLAlchemy 进行数据库操作的过程。它提供了一组简单而强大的工具和功能使得与数据库的交互变得更加轻松和高效。Django ORMDjango 框架自带的 ORM提供了简单易用的接口支持多种数据库后端并具有强大的查询和模型关联功能。HibernateJava 领域中最流行的 ORM 框架为 Java 对象和关系型数据库之间提供了映射和管理。 使用 ORM 的好处包括 提高开发效率ORM 提供了面向对象的编程接口使得开发人员能够更快速地进行数据库操作减少了编写和调试 SQL 语句的工作量。跨数据库平台ORM 框架通常支持多种数据库后端使得开发人员能够轻松地切换或同时使用不同的数据库系统。数据库抽象和安全性ORM 隐藏了底层的数据库细节提供了一层抽象有助于维护和管理数据库结构并提供了安全性保护如参数绑定和防止 SQL 注入。更好的可维护性和可测试性使用 ORM 可以提高代码的可读性和可维护性使得进行单元测试和集成测试更加容易。 注意ORM 并不能解决所有数据库问题。在某些情况下复杂的查询和性能要求可能需要直接使用原生 SQL。因此根据具体的需求和场景谨慎选择和使用合适的 ORM 框架。 为什么 使用 数据库 连接池 多连接如果不用连接池时每次操作都要链接数据库链接次数过多数据库会耗费过多资源数量过大的话数据库会过载导致程序运行缓慢。单连接在程序中全局创建连接导致程序会一直使用一个连接避免了反复连接造成的问题。但是多线程时就得加锁。这样就变成串行没法实现并发 解决方法 方式 1为每一个线程创建一个链接是基于本地线程来实现的。thread.local每个线程独立使用自己的数据库链接该线程关闭不是真正的关闭本线程再次调用时还是使用的最开始创建的链接直到线程终止数据库链接才关闭。如果线程比较多还是会创建很多连接方式 2创建一个链接池为所有线程提供连接使用时来进行获取使用完毕后在放回到连接池。假设最大链接数有10个其实也就是一个列表当你pop一个人家会在append一个链接池的所有的链接都是按照排队的这样的方式来链接的。链接池里所有的链接都能重复使用共享的 即实现了并发又防止了链接次数太多 基于 DBUtils 数据库连接池 数据库连接池避免每次操作都要连接数据库一直使用一个连接多线程也会出现问题可加锁但变为串行 import pymysql import threading from threading import RLockLOCK RLock() CONN pymysql.connect(host127.0.0.1,port3306,userroot,password123,databaseok1,charsetutf8 )def task(arg):with LOCK:cursor CONN.cursor()cursor.execute(select * from book)result cursor.fetchall()cursor.close()print(result)for i in range(10):t threading.Thread(targettask, args(i,))t.start()线程之间 的 数据隔离 本地线程 可以实现线程之间的数据隔离。保证每个线程都只有自己的一份数据在操作时不会影响别人的即使是多线程自己的值也是互相隔离的 import threading import time# 本地线程对象 local_values threading.local()def func(num):# 第一个线程进来本地线程对象会为他创建一个# 第二个线程进来本地线程对象会为他创建一个{线程1的唯一标识{name:1},线程2的唯一标识{name:2},}:param num: :return: local_values.name num # 4# 线程停下来了time.sleep(2)# 第二个线程 local_values.name去local_values中根据自己的唯一标识作为key获取value中name对应的值print(local_values.name, threading.current_thread().name)for i in range(5):th threading.Thread(targetfunc, args(i,), name线程%s % i)th.start()模式一每个线程创建一个连接 基于threading.local实现创建每个连接。 每个线程会创建一个连接该线程没有真正关闭。 再次调用该线程时还是使用原有的连接。 线程真正终止的时候连接才会关闭。 from DBUtils.PersistentDB import PersistentDB import pymysqlPOOL PersistentDB(creatorpymysql, # 使用链接数据库的模块maxusageNone, # 一个链接最多被重复使用的次数None表示无限制setsession[], # 开始会话前执行的命令列表。如[set datestyle to ..., set time zone ...]ping0,# ping MySQL服务端检查是否服务可用。# 如0 None never, 1 default whenever it is requested, 2 when a cursor is created, 4 when a query is executed, 7 alwayscloseableFalse,# 如果为False时 conn.close() 实际上被忽略供下次使用再线程关闭时才会自动关闭链接。如果为True时 conn.close()则关闭链接那么再次调用pool.connection时就会报错因为已经真的关闭了连接pool.steady_connection()可以获取一个新的链接threadlocalNone, # 本线程独享值得对象用于保存链接对象如果链接对象被重置host127.0.0.1,port3306,userroot,password123,databasepooldb,charsetutf8 )def func():# conn SteadyDBConnection()conn POOL.connection()cursor conn.cursor()cursor.execute(select * from tb1)result cursor.fetchall()cursor.close()conn.close() # 不是真的关闭而是假的关闭。 conn pymysql.connect() conn.close()conn POOL.connection()cursor conn.cursor()cursor.execute(select * from tb1)result cursor.fetchall()cursor.close()conn.close()import threadingfor i in range(10):t threading.Thread(targetfunc)t.start() 模式二线程复用连接池 (推荐) 创建一个连接池为所有线程提供连接线程使用连接时获取连接使用完毕放回连接池。 线程不断地重用连接池里的连接。 import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnectionPOOL PooledDB(creatorpymysql, # 使用链接数据库的模块maxconnections6, # 连接池允许的最大连接数0和None表示不限制连接数mincached2, # 初始化时链接池中至少创建的空闲的链接0表示不创建maxcached5, # 链接池中最多闲置的链接0和None不限制maxshared3,# 链接池中最多共享的链接数量0和None表示全部共享。# PS: 无用因为pymysql和MySQLdb等模块的 threadsafety都为1所有值无论设置为多少# _maxcached永远为0所以永远是所有链接都共享。blockingTrue, # 连接池中如果没有可用连接后是否阻塞等待。True等待False不等待然后报错maxusageNone, # 一个链接最多被重复使用的次数None表示无限制setsession[], # 开始会话前执行的命令列表。如[set datestyle to ..., set time zone ...]# ping MySQL服务端检查是否服务可用。# 如0 None never, 1 default whenever it is requested, # 2 when a cursor is created, 4 when a query is executed, 7 alwaysping0,host127.0.0.1,port3306,userroot,password123456,databaseflask_test,charsetutf8 )def func():# 检测当前正在运行连接数的是否小于最大链接数如果不小于则等待或报raise TooManyConnections异常# 否则# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。# 如果最开始创建的链接没有链接则去创建一个SteadyDBConnection对象# 再封装到PooledDedicatedDBConnection中并返回。# 一旦关闭链接后连接就返回到连接池让后续线程继续使用。# PooledDedicatedDBConnectionconn POOL.connection()# print(th, 链接被拿走了, conn1._con)# print(th, 池子里目前有, pool._idle_cache, \r\n)cursor conn.cursor()cursor.execute(select * from userinfo)result cursor.fetchall()print(result)conn.close()conn POOL.connection()# print(th, 链接被拿走了, conn1._con)# print(th, 池子里目前有, pool._idle_cache, \r\n)cursor conn.cursor()cursor.execute(select * from userinfo)result cursor.fetchall()conn.close()func() 上下文管理 所谓上下文像考试题目根据上下文回答一下问题。 程序中泛指的外部环境像wsgi来的网络请求而且通常只有上文。 flask中的上下文被使用在 current_appsessionrequest 上。 flask 本地线程 from flask import sessiontry:from greenlet import getcurrent as get_ident # grenlet协程模块 except ImportError:try:from thread import get_identexcept ImportError:from _thread import get_ident # get_ident()获取线程的唯一标识class Local(object): # 引用session中的LocalStack下的Local__slots__ (__storage__, __ident_func__) # __slots__该类在外面调用时只能调用定义的字段其他的不能调用def __init__(self):# object.__setattr__为self设置值等价于self.__storage__ {}# 为父类object中包含的__steattr__方法中的self.__storage__ {}# 由于类内包含__steattr__self.xxx对象.xxx时会自动会触发__steattr__# 当前__steattr__中storage self.__storage__又会像self.xxx要值故会造成递归# 所以在父类中__steattr__方法赋值避免self.xxx调用__setattr__造成的递归object.__setattr__(self, __storage__, {})object.__setattr__(self, __ident_func__, get_ident) # 赋值为协程def __iter__(self):return iter(self.__storage__.items())def __release_local__(self):self.__storage__.pop(self.__ident_func__(), None)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident self.__ident_func__() # 获取单钱线程(协程)的唯一标识storage self.__storage__ # {}try:storage[ident][name] value # { 111 : {stack:[] },222 : {stack:[] } }except KeyError:storage[ident] {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)_local Local() # flask的本地线程功能类似于本地线程如果有人创建Local对象并设置值每个线程里一份 _local.stack [] # _local.stack会调用__setattr__的self.__ident_func__()取唯一标识等特殊栈 from flask import sessiontry:from greenlet import getcurrent as get_ident except ImportError:try:from thread import get_identexcept ImportError:from _thread import get_ident # 获取线程的唯一标识 get_ident()class Local(object):__slots__ (__storage__, __ident_func__)def __init__(self):# self.__storage__ {}# self.__ident_func__ get_identobject.__setattr__(self, __storage__, {})object.__setattr__(self, __ident_func__, get_ident)def __iter__(self):return iter(self.__storage__.items())def __release_local__(self):self.__storage__.pop(self.__ident_func__(), None)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident self.__ident_func__() # 获取当前线程(协程)的唯一标识storage self.__storage__ # {}try:storage[ident][name] value # { 111:{stack:[] },222:{stack:[] } }except KeyError:storage[ident] {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)_local Local() _local.stack []使用 flask 中的 stack 和 local from functools import partial from flask.globals import LocalStack, LocalProxy_request_ctx_stack LocalStack()class RequestContext(object):def __init__(self, environ):self.request environdef _lookup_req_object(name):top _request_ctx_stack.topif top is None:raise RuntimeError(_request_ctx_stack)return getattr(top, name)# 实例化了LocalProxy对象_lookup_req_object参数传递 session LocalProxy(partial(_lookup_req_object, session)) local {“标识”: {stack: [RequestContext(),]} }_request_ctx_stack.push(RequestContext(c1)) # 当请求进来时放入print(session) # 获取 RequestContext(c1) top方法 print(session) # 获取 RequestContext(c1) top方法 _request_ctx_stack.pop() # 请求结束pop示例 from functools import partial from flask.globals import LocalStack, LocalProxyls LocalStack()class RequestContext(object):def __init__(self, environ):self.request environdef _lookup_req_object(name):top ls.topif top is None:raise RuntimeError(ls)return getattr(top, name)session LocalProxy(partial(_lookup_req_object, request))ls.push(RequestContext(c1)) # 当请求进来时放入 print(session) # 视图函数使用 print(session) # 视图函数使用 ls.pop() # 请求结束popls.push(RequestContext(c2)) print(session)ls.push(RequestContext(c3)) print(session)Flask SQLAlchemy 使用 连接池 Flask SQLAlchemy 提供了内置的连接池功能可以方便地配置和使用。 在Flask应用中我们可以通过配置 SQLALCHEMY_POOL_SIZE 参数来设置连接池的大小。连接池的大小决定了同时打开的数据库连接的数量。例如我们可以将连接池的大小设置为10 app.config[SQLALCHEMY_POOL_SIZE] 10 from flask_sqlalchemy import SQLAlchemyapp Flask(__name__) app.config[SQLALCHEMY_DATABASE_URI] mysql://user:passwordlocalhost/db_name db SQLAlchemy(app)# 创建模型类 class User(db.Model):id db.Column(db.Integer, primary_keyTrue)name db.Column(db.String(50))# 添加数据到数据库 user User(nameJohn) db.session.add(user) db.session.commit()# 查询数据 all_users User.query.all()# 更新数据 user User.query.filter_by(nameJohn).first() user.name Jane db.session.commit()# 删除数据 user User.query.filter_by(nameJane).first() db.session.delete(user) db.session.commit()3、flask http 连接池 Flask 本身并不提供内置的 HTTP 连接池功能但可以使用第三方库来实现在 Flask 中使用 HTTP 连接池。其中一个常用的库是 urllib3它提供了高级的连接池管理功能。 示例在 Flask 中使用 urllib3 来创建和管理 HTTP 连接池 安装pip install urllib3 from flask import Flask import urllib3app Flask(__name__) 使用 urllib3.PoolManager() 创建了一个连接池管理器对象 http然后使用 http.request() 方法发送了一个 GET 请求。 您可以根据需要进行配置和自定义例如设置最大连接数、超时时间、重试策略等。以下是一个示例展示了如何进行自定义设置 app.route(/index_1) def index_1():http urllib3.PoolManager()response http.request(GET, http://api.example.com)return response.data 对连接池进行了一些自定义配置包括最大连接数、每个连接的最大数量、连接和读取的超时时间以及重试策略。 使用 urllib3 可以更好地控制和管理 HTTP 连接提高 Flask 应用程序的性能和效率。 app.route(/index_2) def index_2():http urllib3.PoolManager(num_pools10, # 最大连接数maxsize100, # 每个连接的最大数量timeouturllib3.Timeout(connect2.0, read5.0), # 连接和读取的超时时间retriesurllib3.Retry(total3, backoff_factor0.1, status_forcelist[500, 502, 503, 504]) # 重试策略)response http.request(GET, http://api.example.com)return response.data
http://www.hkea.cn/news/14278318/

相关文章:

  • 太仓智能网站开发深圳市建设工程交易服务
  • 设计外贸网站医院网站设计怎么做
  • 建设厅国网查询网站长沙网
  • 推广型网站建设模板做网站的回扣
  • 外贸企业网站对外贸的重要性wordpress扫码付费可见
  • 深圳企业网站建设推荐公司dw做网站简单吗
  • 上市设计网站泸州市建设厅网站
  • 网站开发维护合同聊城做网站公司聊城博达
  • 网站设计专业有前途吗宁波网站建设招商加盟
  • 深圳宝安高端网站建设公司阿里云服务器部署网站
  • 中国建工网校官网重庆网站seo建设
  • 金华市东阳市建设局网站国外过期域名查询网站
  • 建设部网站如何登录监理工程师建百度网站
  • 兰州建设局网站网站建设衤金手指花总十五
  • 华强北做电子网站赣州快车微信公众号
  • 需要锦州网站建设网站建站服务的公司
  • 上海网站建设过程汕头网站建设工作
  • 怎样做网站优化排名视频网站X站H站搭建建设
  • 山东住房和建设庭网站wordpress主题广告
  • 不用下载就能看的网站的浏览器wordpress 百度
  • 青岛哪里可以建网站绵阳做网站的公司有哪些
  • 银川做淘宝网站的网站流量统计数据库设计
  • 快手流量推广网站成都软件外包开发
  • 创建qq网站吗dw建网站
  • 空间站 对接如何做能切换语言的网站
  • 谷歌广告投放教程推广优化公司网站
  • 怎么套模板 网站nike定制在哪个app
  • 定制网站与模板网站苏州电商系统开发
  • 汕头搭建建站安阳论坛最新消息
  • 在线游戏网站南宁工程造价建设信息网站