Learning Man's Blog

Python 多线程、多进程、协程(未完结)

字数统计: 2.1k阅读时长: 7 min
2019/01/31 Share

现在是不是该又重新投入py3的怀抱了(=@__@=)?

阻塞/非阻塞、同步/异步

http://www.cnblogs.com/Anker/p/3254269.html

例子:

老张爱喝茶,废话不说,煮开水.
出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶).
1 老张把水壶放到火上,立等水开.(同步阻塞)
老张觉得自己有点傻
2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有.(同步非阻塞)
老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶.水开之后,能大声发出嘀~~~~的噪音.
3 老张把响水壶放到火上,立等水开.(异步阻塞)
老张觉得这样傻等意义不大
4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶.(异步非阻塞)
老张觉得自己聪明了.

所谓同步异步,只是对于水壶而言.
普通水壶,同步;响水壶,异步.
虽然都能干活,但响水壶可以在自己完工之后,提示老张水开了.这是普通水壶所不能及的.
同步只能让调用者去轮询自己(情况2中),造成老张效率的低下.

所谓阻塞非阻塞,仅仅对于老张而言.
立等的老张,阻塞;看电视的老张,非阻塞.
情况1和情况3中老张就是阻塞的,媳妇喊他都不知道.虽然3中响水壶是异步的,可对于立等的老张没有太大的意义.所以一般
异步是配合非阻塞使用的,这样才能发挥异步的效用.

——来源网络,作者不明.

并发/并行

Erlang 之父 Joe Armstrong 用一张5岁小孩都能看懂的图解释了并发与并行的区别

并发是两个队列交替使用一台咖啡机;并行是两个队列同时使用两台咖啡机;如果串行,一个队列使用一台咖啡机。

并发和并行都可以是很多个线程,就看这些线程能不能同时被(多个)cpu执行,如果可以就说明是并行,而并发是多个线程被(一个)cpu 轮流切换着执行。

异步

twisted

asyncio

协程

又名 微线程 纤程

协程,与线程的抢占式调度不同,它是协作式调度。协程也是单线程,但是它能让原来要使用异步+回调方式写的非人类代码,可以用看似同步的方式写出来。

常见的库

greenlet

eventlet

stackless

多线程

由于在python中,存在了GIL,也就是全局解释器锁,从而在每次进行获得cpu的时候,同时只有一个线程获得了cpu的运行,在这个方面可以认为是线程安全的,但是在线程运行的时候,是共享内存的,共享相同的数据信息,从而这个时候python的线程就不那么安全了。在python中,要保证数据的正确性,并且自己对数据进行控制,对数据进行加锁并且自己释放锁。

python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

GIL

Python的Thread是真实操作系统的Thread,两者没有差别。在Linux下是由pthreads实现的,而在windows下是由Windows threads实现的,并通过操作系统调度算法进行调度。为了充分利用CPU,python计算当前已执行了多少数量的指令达到阈值就会立即(100 ticks)来释放GIL。



锁类型

在python的threading模块中,提供了三种锁,在进行锁的操作的时候,必须在每个线程中,自己获取锁,然后自己释放锁,否则会造成一直在等待,也可以称之为死锁。

  • threading.Lock
  • threading.RLock
  • threading.Semaphore
  • threading.Condition
  • threading.Event
  • threading.Timer
  • threading.Barrier

Lock 互斥锁、同步锁

如果分出一个线程并上锁后再调用其他函数,被调用的函数中也再使用Lock()示例就会造成死锁(例如在多个互斥锁存在时候,A上锁并调用B,B需要上锁但是需要A释放,B不能结束导致A也无法结束,锁死)

Rlock 递归锁

它与Lock的区别在于RLock在同一个线程内可以重复申请,所以它可以被用于递归操作中,如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁

import threading


n = 2
max_n = 10
x = 0
# 修改为threading.Lock()即会造成死锁阻塞
lock = threading.RLock()


def countup(m):
    global x
    if m > 0:
        lock.acquire()
        x += 1
        countup(m - 1)
        lock.release()
        print '%s: %s\r\n' % (threading.currentThread().getName(), x)
    else:
        return


if __name__ == '__main__':
    for i in range(n):
        t = threading.Thread(target=countup, args=(max_n,))
        t.start()

BoundedSemaphore 信号量

线程锁锁住一个线程在运行和修改数据,而信号量可以自己控制同一时刻运行几个线程和几个线程修改数据,也就是设置最大同时(并发)运行的线程数

import threading
import time


def run(n):
    # 获取信号量
    semaphore.acquire()
    print('task %s is running' % n)
    # 暂停1s方便看出一次运行几个线程
    time.sleep(1)
    # 释放信号量
    semaphore.release()


if __name__ == '__main__':
    # 生成信号量实例并设置信号量为5
    semaphore = threading.BoundedSemaphore(5)
    # 开启50个线程
    for i in range(50):
        t = threading.Thread(target=run, args=(i,))
        t.start()

Condition 条件变量

除了能提供RLock()或Lock()的方法外,
还提供了 wait()、notify()、notifyAll()方法。

  • acquire([timeout])/release():调用关联的锁的相应方法。
  • wait([timeout]):线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)
    才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。
    调用wait()会释放Lock,直至该线程被Notify()、NotifyAll()或者超时线程又重新获得Lock.
  • notify(n=1):通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。
  • notifyAll():如果wait状态线程比较多,notifyAll的作用就是通知所有线程(这个一般用得少)

Event 事件

在进行多线程的时候,可以判断一个事件发生,然后触发做另外的事情,从而可以使用event

在使用event的时候,clear表示将flag设置为false,set表示设置为true,wait表示在false的时候,一直等待,从而当producter没有数据的时候,consumer一直在等待。

import threading
import time
import Queue


lock = threading.Lock()
queue = Queue.Queue(10)
event = threading.Event()
threads = []
threadsc = []


def producter(name,queue,lock):
    event.clear()
    print '%s start to product...' % name
    queue.put('something')
    time.sleep(3)
    print 'product something'
    event.set()
    event.wait()

def consumer(name,queue,lock):
    print '%s start to consume...' % name
    event.wait()
    queue.get()
    print 'consume something'
    event.set()


if __name__ == '__main__':
    t = threading.Thread(target=producter,args=('kel%s' % i,queue,lock))
    threads.append(t)
    threads[i].start()
    t = threading.Thread(target=consumer,args=('smile%s' % i,queue,lock))
    threadsc.append(t)
    threadsc[i].start()

Barrier 栅栏

Barrier常用来实现这样的线程同步,多个线程运行到某个时间点以后每个线程都需要等着其他线程都准备好以后再同时进行下一步工作。

应用场景:并发初始化 —— 所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据,检查,如果这些工作没完成就不能正常工作运行。

多进程

共享资源

在使用多进程的过程中,最好不要使用共享资源,Multiprocessing类中共享资源可以使用3种方式

  • Queue
  • Array
  • Manager

Queue、Array无法配合Pool一起使用,但Manager可以。

参考资料

  1. http://3xp10it.cc/python/2016/08/30/python多进程多线程/
  2. ※通读:http://python.jobbole.com/88291/
  3. GIL讲解:http://cenalulu.github.io/python/gil-in-python/
  4. Python锁讲解:https://www.cnblogs.com/kellyseeme/p/5525017.html
  5. 多进程资源共享:https://thief.one/2016/11/24/Multiprocessing共享资源/
CATALOG
  1. 1. 阻塞/非阻塞、同步/异步
  2. 2. 并发/并行
  3. 3. 异步
    1. 3.1. twisted
    2. 3.2. asyncio
  4. 4. 协程
    1. 4.1. 常见的库
      1. 4.1.1. greenlet
      2. 4.1.2. eventlet
      3. 4.1.3. stackless
  5. 5. 多线程
    1. 5.1. GIL
    2. 5.2. 锁类型
      1. 5.2.1. Lock 互斥锁、同步锁
      2. 5.2.2. Rlock 递归锁
      3. 5.2.3. BoundedSemaphore 信号量
      4. 5.2.4. Condition 条件变量
      5. 5.2.5. Event 事件
      6. 5.2.6. Barrier 栅栏
  6. 6. 多进程
    1. 6.1. 共享资源
  7. 7. 参考资料