线程和进程

1.Threading 包

threading封装了相当多的线程操作

  • enumerate()转为序列字典的时候会将所有线程打印出来,

  • start()方法会让线程开始进行

  • join()方法会进行阻塞

import threading
import time


def func1():
    print("func1")
    time.sleep(1)


def func2():
    print("func2")
    time.sleep(1)


if __name__ == "__main__":
    print("当前线程的信息", threading.enumerate())
    t1 = threading.Thread(target=func1)
    t2 = threading.Thread(target=func2)
    print("当前线程的信息", threading.enumerate())
    t1.start()
    t2.start()
    print("当前线程的信息", threading.enumerate())
    t1.join()
    t2.join()
    print("当前线程的信息", threading.enumerate())



out:
当前线程的信息 [<_MainThread(MainThread, started 12236)>]
当前线程的信息 [<_MainThread(MainThread, started 12236)>]
func1
func2
当前线程的信息[<_MainThread(MainThread, started 12236)>, <Thread(Thread-1, started 14100)>, <Thread(Thread-2, started 2712)>]
当前线程的信息 [<_MainThread(MainThread, started 12236)>]

线程是无序的:

import threading
import time


def func1():
    for i in range(10):
        print("func1")
        time.sleep(1)


def func2():
    for i in range(10):
        print("func2")
        time.sleep(1)


if __name__ == "__main__":
    t1 = threading.Thread(target=func1)
    t2 = threading.Thread(target=func2)
    t1.start()
    t2.start()

输出会有部分内容,func2func1前面

2.互斥锁

对需要线程间共享的数据,使用锁来保证数据不被脏读、重复读之类的问题。

先看一下不加锁的问题所在:

import threading

num = 0


def func1():
    global num
    for i in range(1000000):
        num += 1
    print("func1处理结束:",num)


def func2():
    global num
    for i in range(1000000):
        num += 1
    print("func2处理结束:",num)


if __name__ == "__main__":
    t1 = threading.Thread(target=func1)
    t2 = threading.Thread(target=func2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("结束:", num)

out:
func1处理结束: 1002210
func2处理结束: 1175871
结束: 1175871

这里就发生了数据的重复读。

这里给资源加上锁就不会有这样的问题了。

先获取锁,然后加锁,然后解锁。就三步操作。

锁一定要在线程start()之前获取

import threading


num = 0


def func1():
    global num
    for i in range(100000):
        lock.acquire()
        num += 1
        lock.release()
    print("func1处理结束:",num)


def func2():
    global num
    for i in range(100000):
        lock.acquire()
        num += 1
        lock.release()
    print("func2处理结束:",num)


if __name__ == "__main__":
    lock = threading.Lock()
    t1 = threading.Thread(target=func1)
    t2 = threading.Thread(target=func2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("结束:", num)

out:
func1处理结束: 147648
func2处理结束: 200000
结束: 200000

对于读取同一个数据的线程,所有的线程都需要加锁。

锁会影响执行效率

3.死锁

就是你给我笔我就把我的本子给你,

你说你给我本子我就把我的笔给你。

两个人都无法获取,都无法释放。

代码如下:

import threading
import time


class MyThread1(threading.Thread):
    def run(self):
        mutexA.acquire()
        print(self.name," --- do 1 up ------")
        time.sleep(1)
        mutexB.acquire()
        print(self.name," --- do 1 down ------")
        mutexB.release()
        mutexA.release()


class MyThread2(threading.Thread):
    def run(self):
        mutexB.acquire()
        print(self.name," --- do 2 up ------")
        time.sleep(1)
        mutexA.acquire()
        print(self.name," --- do 2 down ------")
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()


out:
Thread-1  --- do 1 up ------
Thread-2  --- do 2 up ------

输出只有这两行。

因为两个线程都在等待对方释放锁,然后再自己释放锁。但是条件都不成立

产生了==死锁==。

4.GIL 锁

多线程和多进程是不一样的。

多进程是真正的并行,而多线程是伪并行,实际上他只是交替执行。

是什么导致多线程,只能交替执行呢?是一个叫 GIL(Global Interpreter Lock ,全局解释器锁)的东西。

GIL 的概念:

任何 Python 线程执行前,必须先获得 GIL 锁,然后,每执行 100 条字节码,解释器就自动释放 GIL 锁,让别的线程有机会执行。这个 GIL 全局锁实际上把所有线程的执行代码都给上了锁, 所以,多线程在 Python 中只能交替执行,即使 100 个线程跑在 100 核 CPU 上,也只能用到 1 个 核

这个是 Python 的解释器 CPython 引入的概念。还有其他解释器。但是默认的认为 Python == CPython

默许了 Python 有 GIL 锁这个东西。

避免 GIL 锁的方法:

  • 使用多进程代替
  • 不使用 CPython

5.线程之间进行通信

5.1 队列

queue.Queue()是实现的一个队列。

put可以放入,get可以取,qsize可以获取长度

如果队列为空,则会一直阻塞,不会结束。

如果不想阻塞,可以给get添加timeout参数

会抛出异常

队列初始化的时候可以设置maxsize参数定义最大长度,限制put

超出长度的时候一样是会阻塞,也可以添加 timeout 参数。

接下来实现经典的生产者消费者问题:

import random
import threading
import time
from queue import Queue

q = Queue()

def producer():
    # 生产骨头
    count = 0
    while True:
        count += 1
        s = "骨头%s号"%count
        print("生产了", s)
        q.put(s)
        time.sleep(random.random())


def dog():
    # 消费骨头
    num = 0
    while True:
        print(f"小狗{num}吃掉了{q.get()}")
        time.sleep(random.random())
        num += 1


if __name__ == '__main__':
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=dog)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

out:
生产了 骨头1号
小狗0吃掉了骨头1号
生产了 骨头2号
小狗1吃掉了骨头2号
生产了 骨头3号
小狗2吃掉了骨头3号
生产了 骨头4号
小狗3吃掉了骨头4号
生产了 骨头5号
小狗4吃掉了骨头5号
生产了 骨头6号
小狗5吃掉了骨头6号
生产了 骨头7号
小狗6吃掉了骨头7号
生产了 骨头8号
生产了 骨头9号
小狗7吃掉了骨头8号
生产了 骨头10号
生产了 骨头11号
小狗8吃掉了骨头9号
小狗9吃掉了骨头10号
小狗10吃掉了骨头11号
生产了 骨头12号
生产了 骨头13号
生产了 骨头14号
小狗11吃掉了骨头12号
生产了 骨头15号
生产了 骨头16号
小狗12吃掉了骨头13号
小狗13吃掉了骨头14号
生产了 骨头17号
生产了 骨头18号
生产了 骨头19号
小狗14吃掉了骨头15号
小狗15吃掉了骨头16号
生产了 骨头20号

可以看到有时候生产了好几个

有时候一次吃掉好几个

5.2 线程池

在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化 的数量的线程,放到队列中,让过来的任务立刻能够使用,就形成了线程池。

创建线程池通过concurrent.futures库中的ThreadPoolExecutor实现的。

future对象:在未来的某一时刻完成操作的对象。 submit方法可以返回一个future对象。

线程池就是,已创建的线程一直在(初始化的时候就创建了),任务来了就挑一个跑。

是用来限制线程数量的

submit会把任务放入线程池中,返回 future 对象

done会判断 future 是否执行完成

result会获取阻塞主线程,直到获取返回值才结束阻塞。

import threading
from concurrent.futures import ThreadPoolExecutor
import time

# 制定最多运行N个线程
ex = ThreadPoolExecutor(max_workers=3)


def func(num):
    print("线程名:",threading.current_thread(),"我的编号为",num)
    time.sleep(1)
    return "我是返回值"


# 返回future对象
f = ex.submit(func, 1)
print(f.done())
time.sleep(1.5)
print(f.done())
print("done结束")
print(f.result())
print("done结束,result结束")

out:
线程名: <Thread(ThreadPoolExecutor-0_0, started daemon 12200)> 我的编号为 1
False
True
done结束
我是返回值
done结束,result结束

限制的 max_workers,最多只有两个一起执行。可以一起放,但是上面的不结束下面的不执行

使用 map 方法替换 submit 方法

map 会将可迭代的对象进行迭代后传入方法汇总,在将方法放入池子内部

map 会返回一个生成器,在函数的任务结束后生成。返回值是严格按照传入的顺序返回的

生成器的值为 return 后的返回值。

import threading
from concurrent.futures import ThreadPoolExecutor
import time

# 制定最多运行N个线程
ex = ThreadPoolExecutor(max_workers=2)


def func(num):
    print("线程名:",threading.current_thread(),"我的编号为",num)
    time.sleep(1)
    return num


ret = ex.map(func,[i for i in range(10)])

for i in ret :
    print("返回值是:",i)


out:
线程名: <Thread(ThreadPoolExecutor-0_0, started daemon 3372)> 我的编号为 0
线程名: <Thread(ThreadPoolExecutor-0_1, started daemon 11292)> 我的编号为 1
线程名:返回值是:  0
<Thread(ThreadPoolExecutor-0_0, started daemon 3372)> 我的编号为 2
线程名: <Thread(ThreadPoolExecutor-0_1, started daemon 11292)> 我的编号为 3
返回值是: 1
线程名: 返回值是: 2
<Thread(ThreadPoolExecutor-0_0, started daemon 3372)> 我的编号为 4
线程名:返回值是: 3
 <Thread(ThreadPoolExecutor-0_1, started daemon 11292)> 我的编号为 5
线程名:返回值是: 4
 <Thread(ThreadPoolExecutor-0_0, started daemon 3372)> 我的编号为 6
线程名:返回值是: 5
 <Thread(ThreadPoolExecutor-0_1, started daemon 11292)> 我的编号为 7
线程名: 线程名: <Thread(ThreadPoolExecutor-0_0, started daemon 3372)> 我的编号为返回值是: 6
返回值是: 7
<Thread(ThreadPoolExecutor-0_1, started daemon 11292)> 我的编号为 8
 9
返回值是: 8
返回值是: 9

使用as_completed这个函数是为submit而生的

你总想通过一种办法来解决submit后啥时候完成的吧 , 而不是一次次调用future.done或者使用future.result吧。 concurrent.futures.as_completed(fs, timeout=None) 返回一个生成器,在迭代过程中会阻塞。 直到线程完成或者异常时,产生一个Future对象。 同时注意, map方法返回是有序的, as_completed是那个哪个先完成/失败就返回。

wait 是阻塞函数,第一个参数和as_completed一样, 一个可迭代的future序列,返回一个元组 ,包含 2 个set , 一个完成的,一个未完成的

最后说一下回调:add_done_callback(fn),回调函数是在调用线程完成后再调用的,在同一个线程中.

6.进程间的通信

6.1 队列

主进程创建的子进程是无法获取主进程中的数据的。

进程间的资源是不共享的,无法使用之前的普通的队列 Queue 来实现。

要使用新的队列

import multiprocessing
q = multiprocessing.Queue()

放入:put

取出:get

上述两个,如果手动指定方法的 block 参数为 False

那么,当之前需要阻塞的情况会立马抛出异常

1)如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了 timeout,则会等待 timeout 秒, 若还没读取到任何消息,则抛出"Queue.Empty"异常; 2)如果 block 值为 False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常; Queue.get_nowait():相当 Queue.get(False); Queue.put(item,[block[,

timeout]]):将 item 消息写入队列,block 默认值为 True; 1)如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果已经没有空间可写入, 此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了 timeout,则会 等待 timeout 秒,若还没空间,则抛出"Queue.Full"异常; 2)如果 block 值为 False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常; Queue.put_nowait(item):相当 Queue.put(item, False);

判断是否为空:empty

判断是否满:full

数量:qsize

例子:

import multiprocessing
import random
import time

q = multiprocessing.Queue()


def producer(q):
    # 生产骨头
    count = 0
    while True:
        count += 1
        s = "骨头%s号"%count
        print("生产了", s)
        q.put(s)
        time.sleep(random.random())


def dog(q):
    # 消费骨头
    num = 0
    while True:
        print(f"小狗{num}吃掉了{q.get()}")
        num += 1
        time.sleep(random.random())


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=dog,args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

out:
生产了 骨头1号
小狗0吃掉了骨头1号
生产了 骨头2号
小狗1吃掉了骨头2号
生产了 骨头3号
小狗2吃掉了骨头3号
生产了 骨头4号
小狗3吃掉了骨头4号
生产了 骨头5号
小狗4吃掉了骨头5号
生产了 骨头6号
小狗5吃掉了骨头6号
生产了 骨头7号
小狗6吃掉了骨头7号
生产了 骨头8号
小狗7吃掉了骨头8号
生产了 骨头9号
生产了 骨头10号
小狗8吃掉了骨头9号
生产了 骨头11号

==在进程间进行共享数据的时候,必须将数据作为参数传入进程中,不能使用全局变量。==

==这一点是和线程不同的==

6.2 进程池

from concurrent.futures import ProcessPoolExecutor
import time


def func1():
    print("A")
    time.sleep(1)


def func2():
    print("B")
    time.sleep(1)


if __name__ == '__main__':
    p = ProcessPoolExecutor()
    f1 = p.submit(func1)
    f2 = p.submit(func2)
    print(f1)
    print(f2)
    p.shutdown()
    print(f1)
    print(f2)


out:
<Future at 0x245a4dd5708 state=running>
<Future at 0x245a4e1c988 state=pending>
A
B
<Future at 0x245a4dd5708 state=finished returned NoneType>
<Future at 0x245a4e1c988 state=finished returned NoneType>

shutdown方法会调用joinclose,不用一个个执行

submit方法一样是返回的一个future,获取返回值用future.result()方法来获取

concurrent.futures模块的基础是ExectuorExecutor是一个抽象类,它不能被直接使用。

但是它提供的两个子类ThreadPoolExecutorProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。

我们可以将相应的tasks直接放入线程池/进程池,不需要维 护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

Future这个概念,你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

Executor中定义了 submit() 方法,这个方法的作用是提交一个可执行的回调task ,并返回一个future实例。

future对象代表的就是给定的调用。 submit() 方法实现进程池/线程池

p.submit(task,i)默认为异步执行, p.submit(task,i).result()即同步执行

进程池之间的的通信

import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import random
import time


def producer(q):
    # 生产骨头
    count = 0
    while True:
        count += 1
        s = "骨头%s号" % count
        print("生产了", s)
        q.put(s)
        time.sleep(random.random())


def dog(q):
    # 消费骨头
    num = 0
    while True:
        print(f"小狗{num}吃掉了{q.get()}")
        num += 1
        time.sleep(random.random())


if __name__ == '__main__':
    # 进程池之间的通信不能使用下面这个Queue
    # q = multiprocessing.Queue()
    # 要使用这个队列
    q = multiprocessing.Manager().Queue()
    p = ProcessPoolExecutor()
    p1 = p.submit(producer, q)
    p2 = p.submit(dog, q)
    p.shutdown()


out:
生产了 骨头1号
小狗0吃掉了骨头1号
生产了 骨头2号
小狗1吃掉了骨头2号
生产了 骨头3号
小狗2吃掉了骨头3号
生产了 骨头4号
小狗3吃掉了骨头4号
生产了 骨头5号
生产了 骨头6号
小狗4吃掉了骨头5号
生产了 骨头7号
小狗5吃掉了骨头6号
生产了 骨头8号
生产了 骨头9号
小狗6吃掉了骨头7号

multiprocessing.Manager().Queue(),进程池之间的通信必须使用这个队列。

7.协程

协程不会出现资源互抢的问题

协程实际上就是线程内部多个函数的不停切换执行(避免耗时操作)

协程是依赖生成器的

生成器可以暂停函数,同时切换到其他函数去执行

关键字yield

之前的生成器是为了产生数据使用的(惰性查询,惰性生成数据)

生成器是为了接受数据存在的,那么它就变成了协程

协程中最重要的是send方法,用来传输数据

7.1 实现简单的协程

import time

def func1():
    print("A")
    yield
    time.sleep(1)

def func2():
    print("B")
    yield
    time.sleep(1)

# 产生生成器
g1 = func1()
g2 = func2()

next(g1)
next(g2)
next(g1)

out:

A
B
Traceback (most recent call last):
  File "D:/PyCharm Workspace/Python进阶/test.py", line 19, in <module>
    next(g1)
StopIteration

A 和 B 同时打印,然后报错。这个不是协程,是生成器,因为没有接受数据。

那么给生成器加一个装饰器,在调用的时候预激活就行了。

预激活必须使用send(None)next()

7.2 结合装饰器

import time
from functools import wraps

def coroutine(func):
    @wraps(func)
    def primer(*args,**kwargs):
        gen = func(*args,**kwargs)
        next(gen)
        return gen
    return primer


@coroutine
def func1():
    print("A   1")
    yield
    time.sleep(1)
    print("A   2")
    return "A完毕"

@coroutine
def func2():
    print("B   1")
    yield
    time.sleep(1)
    print("B   2")
    return "B完毕"


# 产生生成器
g1 = func1()
g2 = func2()

try:
    g1.send(None)
except StopIteration as e:
    print(e)

try:
    g2.send(None)
except StopIteration as e:
    print(e)

out:
A   1
B   1
A   2
A完毕
B   2
B完毕

submit是默认异步执行,后面可以添加.result()来指定立即执行

7.3 使用 greenlet

​ 需要使用switch()手动切,

​ 缺点

  • 如果方法很多要切,那么就很麻烦。

  • 没有时间并发

7.4 使用 gevent

基于greenlet封装。

里面有gevent.sleep(1)来休眠。不支持time模块。

join()就执行了,但是所有都执行了,因为所有方法都在一个线程里。

spawn会切换。

尽量不用join,用gevent.joinall([gevent.spawn(func1),……])

会一直阻塞,除非所有的全跑完。

为了支持time

from gevent import monkey

monkey.patch_all()