南山网站建设公司乐云seo,怀化建网站,织梦网站版权,青岛百度快速排名优化一、多线程
threading 模块
threading 模块对象
对象描述Thread表示一个执行线程的对象Lock锁原语对象#xff08;与 thread 模块中的锁一样#xff09;RLock可重入锁对象#xff0c;使单一线程可以#xff08;再次#xff09;获得已持有的锁#xff08;递归锁#x…一、多线程
threading 模块
threading 模块对象
对象描述Thread表示一个执行线程的对象Lock锁原语对象与 thread 模块中的锁一样RLock可重入锁对象使单一线程可以再次获得已持有的锁递归锁Condition条件变量对象使得一个线程等待另一个线程苗族特定的条件比如改变状态或某个数据值Event添加变量的通用版本任意数量的线程等待某个时间的发生在该事件发生后所有线程将被激活Semaphore为线程键共享的有限资源提供一个 计算器信号量)如果没有可用资源时会被阻塞BoundedSemaphore与 Semaphore 相似不过不允许超过初始值Timer与 Thread 相似在运行前要等待一段时间Barrier创建一个障碍必须达到指定数量的线程后才可以继续
守护线程
1、thread 模块不支持守护线程当主线程退出后所有子线程也会退出不管其是否在工作
2、threading 模块支持守护线程等待一个客户端请求服务的服务器如果客户端没有请求守护线程是空闲的如果把一个线程设置为守护线程就表示这个线程不重要的。进程退出时不需要等待这个线程执行完成。
如果主线程退出时不需要等待某些子线程完成可以将子线程设置为 守护线程标记为真时表示该线程不重要。在启动线程前执行 thread.daemonTrue 可以设置守护线程检查线程的守护状态也可以判断它。
Thread 类
threading 模块的 Thread 类是主要的执行对象下面是 Thread 对象的属性和方法列表
属性描述方法描述name线程名start()开始执行该线程ident线程的标识符run()定义线程功能的方法通常在子类中被应用开发者重写daemon布尔值表示这个线程是否是守护线程join(timeoutNone)直至启动的线程终止之前一直挂起除非给出 timeout否则一直阻塞
Thread(groupNone, targetNone, nameNone, agrs(), kwargs{}, verboseNone, daemonNone)# 实例化一个线程对象需要一个可调用的 target一般是函数及其参数 args元组或 kwargs
# 也可以传递 name 或 group 参数daemon 将会设定 thread.daemon 属性/标志创建线程的三种方法
创建 Thread 实例传递给它一个函数
import threading
from time import sleep, ctimeloops [4, 2]
def loop(nloop, nsec):print(loop 函数开始执行 %(nloop)s时间: %(ctime)s % {nloop: nloop, ctime: ctime()})sleep(nsec)print(loop 函数结束执行 %(nloop)s时间: %(ctime)s % {nloop: nloop, ctime: ctime()})def main():print(主函数开始执行, ctime())threads []nloops range(len(loops))for i in nloops:t threading.Thread(targetloop, args(i, loops[i]))threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print(程序结束, ctime())if __name__ __main__:main()上述程序将生成两个线程将其添加到一个列表中循环启动 start()。join() 方法将会程序挂起会等待所有线程结束或超时。因此要比 等待释放的无限循环更加清晰。
运行结果如下
主函数开始执行 Sat Sep 7 17:31:40 2019
loop 函数开始执行 0时间: Sat Sep 7 17:31:40 2019
loop 函数开始执行 1时间: Sat Sep 7 17:31:40 2019
loop 函数结束执行 1时间: Sat Sep 7 17:31:42 2019
loop 函数结束执行 0时间: Sat Sep 7 17:31:44 2019
程序结束 Sat Sep 7 17:31:44 2019创建 Thread 的实例传给它一个可调用的类实例
import threading
from time import sleep, ctimeloops [4, 2]class ThreadFunc:def __init__(self, func, args, name):self.name nameself.func funcself.args argsdef __call__(self, *args, **kwargs):当调用 ThreadFunc() 时会自动执行 fun()self.func(*self.args)def loop(nloop, nsec):print(loop 函数开始执行 %(nloop)s时间: %(ctime)s % {nloop: nloop, ctime: ctime()})sleep(nsec)print(loop 函数结束执行 %(nloop)s时间: %(ctime)s % {nloop: nloop, ctime: ctime()})def main():print(主函数开始执行, ctime())threads []nloops range(len(loops))for i in nloops: # 创建 Thread 的实例传给它一个可调用的类实例t threading.Thread(targetThreadFunc(loop, (i, loops[i]), loop.__name__))threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print(程序结束, ctime())if __name__ __main__:main()派生 Thread 的子类并创建子类的实例
自定义的类要继承 threading.Thread构造函数必须先调用其基类的构造函数__call__() 在子类中必须要写 run()
import threading
from time import sleep, ctimeloops [4, 2]class MyThread(threading.Thread):def __init__(self, func, args, name):threading.Thread.__init__(self)self.name nameself.func funcself.args argsdef run(self, *args, **kwargs):self.func(*self.args)def loop(nloop, nsec):print(loop 函数开始执行 %(nloop)s时间: %(ctime)s % {nloop: nloop, ctime: ctime()})sleep(nsec)print(loop 函数结束执行 %(nloop)s时间: %(ctime)s % {nloop: nloop, ctime: ctime()})def main():print(主函数开始执行, ctime())threads []nloops range(len(loops))for i in nloops:t MyThread(loop, (i, loops[i]), loop.__name__)threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print(程序结束, ctime())if __name__ __main__:main()threading 模块的其他函数
active_count()当前活动的 Thread 对象个数current_thread返回当前的 Thread 对象enumerate()返回当前活动的 Thread 对象列表settrace(func)为所有线程设置一个 trace 函数setprofile(func)为所有线程设置一个 profile 函数stack_size(size0)返回新建线程的栈大小或为后续创建的线程设定栈的大小为 size
启动和停止线程
import time
import threadingdef countdown(n):while n 0:print(T-minus%stime%s % (n, time.ctime()))n - 1time.sleep(1)if t.is_alive():print(Still running...)else:print(Completed)print(主线程...)if __name__ __main__:t threading.Thread(targetcountdown, args(5, ))t.start()t.join()判断一个线程是否存活可以调用 is_alive() 方法解释器会在所有线程都结束后才执行剩余的代码如果需要长时间运行的线程 或者一直运行的后台任务可以使用后台线程
threading.Thread(targetcountdown, args(5, ), daemonTrue)后台线程无法等待这些线程会在主线程终止时自动销毁。但是你也不能对线程做额外的高级操作如发送信号调整它的调度终止线程等。
join() 方法
join() 方法将悬挂当前子线程直至所有子线程结束。
import time, threadingdef loop():print(thread %s is running... % threading.current_thread().name)n 0while n 5:n 1print(thread %s %s % (threading.current_thread().name, n))time.sleep(1)print(thread %s ended. % threading.current_thread().name)if __name__ __main__:print(thread %s is running % threading.current_thread().name)t threading.Thread(targetloop, nameLoopThread)t.start()t.join()print(thread %s ended % threading.current_thread().name)name指定子线程名字不指定默认为 thread-1、thread-2MainThread 为主线程threading.current_thread().name获取当前线程的实例名字
可以看到 join() 它将子线程添加到当前主线程中并等待子线程终止才允许它后面的代码
thread MainThread is running
thread LoopThread is running...
thread LoopThread 1
thread LoopThread 2
thread LoopThread 3
thread LoopThread 4
thread LoopThread 5
thread LoopThread ended.
thread MainThread ended停止线程
如果线程执行一些如 I/O 这样的阻塞操作通过轮询来终止线程将使得线程间的协调变得非常棘手。如如果一个线程一直阻塞在一个 I/O 操作上就永远无法返回检查自己是否已经被结束了。要正确处理这些问题需要利用 超时循环 来小心操作线程
class IOTask:def terminate(self):self._running Falsedef run(self, sock):# sock is a socketsock.settimeout(5) # set timeout periodwhile self._running:try:data sock.recv(8192)breakexcept socket.timeout:continue# continued processing...# Terminatedreturn判断线程是否启动
启动了一个线程但是你想知道它是否真的已经开始了由于线程是独立运行的且状态不可预测的。如果程序中其他线程 要通过判断某个线程的状态来确定自己的下一步操作这就会显得很棘手。
我们可以使用 Event 对象来解决这个问题Event 对象包含一个可由线程设置信号的标志允许线程等待某事的发生。
初始时标志为假标志未假时这个线程会被一直阻塞直至标志为真
from threading import Thread, Event
import timedef countdown(n, started_evt):print(countdown 开始...)started_evt.set() # 设置标志为真while n 0:print(T-minus, n)n - 1time.sleep(1)started_evt Event() # 创建 Event 对象print(启动 countdown 函数)
t Thread(targetcountdown, args(5, started_evt))
t.start()# 等待线程开始
started_evt.wait()
print(countdown is running...)可以看到 countdown is running... 一直在 countdown 开始... 输出之后才打印这是因为 event 在协调线程。使得主线程要等 countdown() 函数输出启动信息后才继续执行。
启动 countdown 函数
countdown 开始...
T-minus 5
countdown is running...
T-minus 4
T-minus 3
T-minus 2
T-minus 1如果你将 started_evt.set() 注销掉再运行程序会发现程序一直被阻塞…这是因为标志为假线程被阻塞。
三、协程
Python 协程只能运行在时间循环中但是一旦事件循环运行又会阻塞当前任务。因此 动态添加任务/协程
需要再开一个线程这个线程主要任务时运行事件循环因为是无限循环会阻塞当前线程
import asyncio
from threading import Threadasync def production_task():i 0while True:# 将 consumption 这个协程每秒注册一个运行到线程中的循环thread_loop 每秒会获取一个一直打印 i 的无限循环任务# run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用asyncio.run_coroutine_threadsafe(consumption(i), thread_loop)await asyncio.sleep(1) # 必须加 awaiti 1async def consumption(i):while True:print(我是第{}任务.format(i))await asyncio.sleep(1)def start_loop(loop):# 运行事件循环loop 以参数的形式传递进来运行asyncio.set_event_loop(loop)loop.run_forever()thread_loop asyncio.new_event_loop() # 获取一个事件循环
run_loop_thread Thread(targetstart_loop, args(thread_loop,)) # 将每次事件循环运行在一个线程中防止阻塞当前主线程
run_loop_thread.start() # 运行线程同时协程事件循环也会运行advocate_loop asyncio.get_event_loop() # 将生产任务的协程注册到这个循环中
advocate_loop.run_until_complete(production_task()) # 运行次循环我是第0任务
我是第1任务
我是第0任务
我是第1任务
我是第0任务
我是第2任务
我是第3任务
我是第1任务
我是第0任务四、五种 unix IO 模型
epoll 并不代表一定比 select 好效率高
在并发高的情况下连接活跃度不是很高的情况下epoll 比 select 好比如Web 连接用户连接可能随时断开在并发性不高同时连接活跃select 比 epoll 好比如游戏连接一般要保持持续连接
select、poll、epoll 本质还是同步阻塞的之所以能支持高并发是因为一个进程能同时监听多个文件描述符
非阻塞 IO 不一定比阻塞好因为它要一直循环检测服务器是否有数据返回如果后续的程序不依赖前面的连接其效率要高要是依赖前面的程序效率不一定要好。