[Python 多线程] Lock、阻塞锁、非阻塞锁 (八)
线程同步技术:
解决多个线程争抢同一个资源的情况,线程协作工作。一份数据同一时刻只能有一个线程处理。
解决线程同步的几种方法:
Lock、RLock、Condition、Barrier、semaphore
1)Lock 锁
锁,一旦线程获得锁,其它试图获取锁的线程将被阻塞。
当用阻塞参数设置为 False 时, 不要阻止。如果将阻塞设置为 True 的调用将阻止, 则立即返回 False;否则, 将锁定设置为锁定并返回 True。
Lock的方法:
acquire(blocking=True,timeout=-1) 加锁。默认True阻塞,阻塞可以设置超时时间。非阻塞时成功获取锁返回True,否则返回False。
当blocking设置为False时,不阻塞,同一个锁对象,其它线程可以重用,但最后都必须释放。
如果设置为True(默认True),其它试图调用锁的线程将阻塞,并立即返回False。阻塞可以设置超时时间。
release() 释放锁。可以从任何线程调用释放。已上锁的锁,会被重置为unlocked,对未上锁的锁调用,会抛RuntimeError异常: cannot release un-acquired lock。
不使用Lock的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
#不使用Lock锁的例子 import logging import threading,time logging.basicConfig(level = logging.INFO) # 10 -> 100cups cups = [] lock = threading.Lock() def worker(lock:threading.Lock,task = 100 ): while True : count = len (cups) time.sleep( 0.1 ) if count > = task: break logging.info(count) cups.append( 1 ) logging.info( "{} make 1........ " . format (threading.current_thread().name)) logging.info( "{} ending=======" . format ( len (cups))) for x in range ( 10 ): threading.Thread(target = worker,args = (lock, 100 )).start() 运行结果: INFO:root:Thread - 7 make 1. ....... INFO:root: 93 INFO:root:Thread - 5 make 1. ....... INFO:root: 95 INFO:root:Thread - 6 make 1. ....... INFO:root: 92 INFO:root:Thread - 2 make 1. ....... INFO:root: 94 INFO:root:Thread - 8 make 1. ....... INFO:root: 97 INFO:root:Thread - 10 make 1. ....... INFO:root: 96 INFO:root:Thread - 4 make 1. ....... INFO:root: 98 INFO:root:Thread - 1 make 1. ....... INFO:root: 99 INFO:root:Thread - 9 make 1. ....... INFO:root: 109 ending = = = = = = = INFO:root: 109 ending = = = = = = = INFO:root: 109 ending = = = = = = = |
还是使用前面的10个工人生产100杯子的例子, 当做到99个杯子时,10个工人都发现还少一个,都去做了一个,一共做了109个,超出了100个,就发生了不可预期的结果。
临界线判断失误,多生产了杯子。
解决方法就可以用锁,来解决资源争抢。当一个人看杯子数量时,就上锁,其它人只能等着,看完杯子后发现少一个就把这最后一个做出来,然后数量加一,解锁,其他人再看到已经有100个杯子时,就可以停止工作。
加锁的时机非常重要:看杯子数量时加锁,增加数量后释放锁。
使用Lock的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
#Lock import logging import threading import time logging.basicConfig(level = logging.INFO) # 10 -> 100cups cups = [] lock = threading.Lock() def worker(lock:threading.Lock,task = 100 ): while True : if lock.acquire( False ): count = len (cups) time.sleep( 0.1 ) if count > = task: lock.release() break logging.info(count) cups.append( 1 ) lock.release() logging.info( "{} make 1........ " . format (threading.current_thread().name)) logging.info( "{} ending=======" . format ( len (cups))) for x in range ( 10 ): threading.Thread(target = worker,args = (lock, 100 )).start() 运行结果: INFO:root: 0 INFO:root:Thread - 1 make 1. ....... INFO:root: 1 INFO:root:Thread - 5 make 1. ....... INFO:root: 2 INFO:root:Thread - 6 make 1. ....... .... INFO:root:Thread - 3 make 1. ....... INFO:root: 97 INFO:root:Thread - 3 make 1. ....... INFO:root: 98 INFO:root:Thread - 4 make 1. ....... INFO:root: 99 INFO:root:Thread - 3 make 1. ....... INFO:root: 100 ending = = = = = = = INFO:root: 100 ending = = = = = = = INFO:root: 100 ending = = = = = = = ..... |
在使用了锁以后,虽然保证了结果的准确性,但是性能下降了很多。
一般来说加锁以后还要有一些功能实现,在释放之前还有可能抛异常,一旦抛出异常,锁是无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。
死锁解决办法:
1、使用 try..except..finally 语句处理异常、保证锁的释放
2、with 语句上下文管理,锁对象支持上下文管理。只要实现了__enter__和__exit__魔术方法的对象都支持上下文管理。
锁的应用场景:
独占锁: 锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
共享锁: 如果共享资源是不可变的值时,所有线程每一次读取它都是同一样的值,这样的情况就不需要锁。
使用锁的注意事项:
- 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就变成了串行,要么排队执行,要么争抢执行。
- 加锁时间越短越好,不需要就立即释放锁。
- 一定要避免死锁。
不使用锁时,有了效率,但是结果是错的。
使用了锁,变成了串行,效率地下,但是结果是对的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import threading import time lock = threading.Lock() def work(): print ( 'working..' ) time.sleep( 0.2 ) lock.release() # 1解锁 lock.acquire() # 1上锁 print ( "get locker 1" ) threading.Thread(target = work).start() time.sleep( 1 ) lock.acquire() # 2上锁 print ( "get locker 2" ) threading.Thread(target = work).start() print ( "release locker" ) 运行结果: get locker 1 working.. get locker 2 working.. release locker |
同一个锁对象在释放后可以再次使用。
但是如果同一把锁加锁后,又被别人拿了,自己就阻塞了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import threading import time lock = threading.Lock() def work(): print ( 'working..' ) time.sleep( 0.2 ) lock.release() # 1解锁 lock.acquire() # 1上锁 print ( "get locker 1" ) lock.acquire() # 2上锁 print ( "get locker 2" ) threading.Thread(target = work).start() threading.Thread(target = work).start() print ( "release locker" ) 运行结果: get locker 1 阻塞状态.... |
阻塞锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#阻塞锁 import threading,time lock = threading.Lock() def foo(): ret = lock.acquire() print ( "{} Locked. {}" . format (ret,threading.current_thread())) time.sleep( 10 ) threading.Thread(target = foo).start() threading.Thread(target = foo).start() 运行结果: True Locked. <Thread(Thread - 1 , started 123145559191552 )> |
lock.acquire()默认设置blocking=True,两个线程使用同一个Lock锁对象,只要Thread-1线程不释放,第二个线程就无法获取锁,且会使Thread-1线程阻塞。
如果想让多个线程同时都可以使用一个锁对象,就必须使用非阻塞锁,或者第一个线程使用完锁之后立刻释放,然后第二个线程再使用。
非阻塞锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#非阻塞锁 import threading,time lock = threading.Lock() def foo(): ret = lock.acquire( False ) print ( "{} Locked. {}" . format (ret,threading.current_thread())) time.sleep( 10 ) threading.Thread(target = foo).start() threading.Thread(target = foo).start() 运行结果: True Locked. <Thread(Thread - 1 , started 123145516146688 )> False Locked. <Thread(Thread - 2 , started 123145521401856 )> Process finished with exit code 0 |
lock.acquire(False)设置blocking=False表示不阻塞,使用同一个Lock锁对象时,第二个线程仍可以使用锁,且第一个锁不会被阻塞。
非阻塞锁2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
#非阻塞锁 import threading,logging,time FORMAT = '%(asctime)s\t [%(threadName)s,%(thread)d] %(message)s' logging.basicConfig(level = logging.INFO, format = FORMAT ) def worker(tasks): for task in tasks: time.sleep( 0.01 ) if task.lock.acquire( False ): #False非阻塞 logging.info( '{} {} begin to start' . format (threading.current_thread().name,task.name)) else : logging.info( '{} {} is working' . format (threading.current_thread().name,task.name)) class Task: def __init__( self ,name): self .name = name self .lock = threading.Lock() tasks = [Task( 'task={}' . format (t)) for t in range ( 5 )] for i in range ( 3 ): t = threading.Thread(target = worker,name = 'worker-{}' . format (i),args = (tasks,)) t.start() 运行结果: 2017 - 12 - 19 16 : 37 : 49 , 556 [worker - 2 , 123145390018560 ] worker - 2 task = 0 begin to start 2017 - 12 - 19 16 : 37 : 49 , 556 [worker - 1 , 123145384763392 ] worker - 1 task = 0 is working 2017 - 12 - 19 16 : 37 : 49 , 557 [worker - 0 , 123145379508224 ] worker - 0 task = 0 is working 2017 - 12 - 19 16 : 37 : 49 , 567 [worker - 2 , 123145390018560 ] worker - 2 task = 1 begin to start 2017 - 12 - 19 16 : 37 : 49 , 567 [worker - 1 , 123145384763392 ] worker - 1 task = 1 is working 2017 - 12 - 19 16 : 37 : 49 , 568 [worker - 0 , 123145379508224 ] worker - 0 task = 1 is working 2017 - 12 - 19 16 : 37 : 49 , 580 [worker - 1 , 123145384763392 ] worker - 1 task = 2 begin to start 2017 - 12 - 19 16 : 37 : 49 , 580 [worker - 2 , 123145390018560 ] worker - 2 task = 2 is working 2017 - 12 - 19 16 : 37 : 49 , 580 [worker - 0 , 123145379508224 ] worker - 0 task = 2 is working 2017 - 12 - 19 16 : 37 : 49 , 591 [worker - 1 , 123145384763392 ] worker - 1 task = 3 begin to start 2017 - 12 - 19 16 : 37 : 49 , 592 [worker - 2 , 123145390018560 ] worker - 2 task = 3 is working 2017 - 12 - 19 16 : 37 : 49 , 592 [worker - 0 , 123145379508224 ] worker - 0 task = 3 is working 2017 - 12 - 19 16 : 37 : 49 , 604 [worker - 1 , 123145384763392 ] worker - 1 task = 4 begin to start 2017 - 12 - 19 16 : 37 : 49 , 604 [worker - 2 , 123145390018560 ] worker - 2 task = 4 is working 2017 - 12 - 19 16 : 37 : 49 , 604 [worker - 0 , 123145379508224 ] worker - 0 task = 4 is working |