Python Concurrency
Posted on July 15, 2017
Tags: concurrency
- IO-bound, slow IO, many connections
  - Use Async (see the asyncio sketch after this list)
  - example: web client requests/downloads (IO content from server)
- IO-bound, fast IO, limited connections
  - Use Multi-threading
- Non IO-bound, CPU calculations
  - Use Multi-processing
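The async option is not covered in the sections below, so here is a minimal sketch of what it looks like with asyncio, simulating slow IO with asyncio.sleep; fetch and fetch_all are illustrative names introduced here, not a real web client.

import asyncio

async def fetch(url):                       # hypothetical; stands in for a real web request
    await asyncio.sleep(1)                  # simulate slow IO; the event loop runs other tasks meanwhile
    return f"response from {url}"

async def fetch_all(urls):
    # many connections can be "in flight" at once on a single thread
    return await asyncio.gather(*(fetch(u) for u in urls))

loop = asyncio.get_event_loop()
print(loop.run_until_complete(fetch_all(["a", "b", "c"])))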
1 Threading
import threading

def fstThread():                    # parent thread
    sndThread = threading.Thread(target=someFunction, args=(2,))
    sndThread.start()
    someFunction(1)

def someFunction(k):                # child thread
    print(f"{k} from {threading.current_thread().name}\n")

fstThread()
#> 2 from Thread-6
#> 1 from MainThread
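If the parent needs to block until the child thread has finished, Thread.join does that; a small sketch reusing someFunction above (trdThread is a name introduced here):

trdThread = threading.Thread(target=someFunction, args=(3,))
trdThread.start()
trdThread.join()    # block the calling thread until trdThread has finished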
2 Multiprocessing
import multiprocessing

def fstProcess():
    sndProcess = multiprocessing.Process(target=someFunctionA, args=(2,))
    sndProcess.start()
    someFunctionA(1)

def someFunctionA(k):
    print(f"{k} from {multiprocessing.current_process().name}\n")

fstProcess()
## Note: in IDLE or Jupyter the child process shows no output
## Run as a python script, the output looks similar to the threading output: 2 prints
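On platforms that use the "spawn" start method (e.g. Windows), the multiprocessing docs require the process-creating code to be guarded by if __name__ == "__main__"; a sketch of the same example as a standalone script, with a join added so the parent waits for the child:

import multiprocessing

def someFunctionA(k):
    print(f"{k} from {multiprocessing.current_process().name}\n")

if __name__ == "__main__":          # required under the "spawn" start method (e.g. Windows)
    sndProcess = multiprocessing.Process(target=someFunctionA, args=(2,))
    sndProcess.start()
    someFunctionA(1)
    sndProcess.join()               # wait for the child process to finish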
2.0.0.1 Race condition
import threading
from time import sleep

count = [0]

def inc():
    temp = count[0]
    sleep(0)
    count[0] = temp + 1

other = threading.Thread(target=inc, args=())
other.start()
inc()
print(f"count is {count[0]}")  # output is sometimes 2, sometimes 1
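One way to remove the race is the Lock shown in section 2.0.0.4: hold a lock across the whole read-modify-write. A sketch, where count_lock and safe_inc are names introduced here:

import threading
from time import sleep

count = [0]
count_lock = threading.Lock()

def safe_inc():
    with count_lock:                # only one thread at a time runs the read-modify-write
        temp = count[0]
        sleep(0)
        count[0] = temp + 1

other = threading.Thread(target=safe_inc, args=())
other.start()
safe_inc()
other.join()
print(f"count is {count[0]}")       # now reliably 2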
2.0.0.2 Producer Consumer non-synchronized
import threading
from time import sleep

Items = []
Commands = []

def consume():
    while len(Commands) == 0:
        pass  # stuck in this loop waiting for the Items.append loop to end
    print(f"item is {Items}")

def produce():
    consumer = threading.Thread(target=consume, args=())
    consumer.start()
    for i in range(10):
        Items.append(i)
    Commands.append("go")

produce()
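Busy-waiting on Commands burns CPU; besides the Queue in the next section, threading.Event gives the same "go" signal without spinning. A sketch, with done, event_consume and event_produce as names introduced here:

import threading

items = []
done = threading.Event()            # replaces the busy-waited Commands list

def event_consume():
    done.wait()                     # sleeps until the producer calls done.set()
    print(f"item is {items}")

def event_produce():
    consumer = threading.Thread(target=event_consume, args=())
    consumer.start()
    for i in range(10):
        items.append(i)
    done.set()                      # wake the consumer
    consumer.join()

event_produce()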
2.0.0.3 Producer Consumer synchronized
import threading
from queue import Queue

queue = Queue()

def synchronized_consume():
    while True:
        print('got an item:', queue.get())
        queue.task_done()

def synchronized_produce():
    consumer = threading.Thread(target=synchronized_consume, args=())
    consumer.daemon = True  # the parent thread running synchronized_produce() will not wait for the consumer before exiting
    consumer.start()
    for i in range(10):
        queue.put(i)
    queue.join()

synchronized_produce()
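Because the consumer is a daemon stuck in queue.get(), it is simply killed when the program exits; a gentler variant stops it with a None sentinel, the same "done signal" idea used in section 2.0.0.6 (names here are introduced for the sketch):

import threading
from queue import Queue

queue = Queue()

def consume_until_done():
    while True:
        item = queue.get()
        queue.task_done()
        if item is None:            # sentinel: producer is finished
            return
        print('got an item:', item)

consumer = threading.Thread(target=consume_until_done, args=())
consumer.start()
for i in range(10):
    queue.put(i)
queue.put(None)
queue.join()                        # block until every item has been marked task_done
consumer.join()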
2.0.0.4 Lock
import threading

seen = set()                    ## The set that we will synchronize with a lock
seen_lock = threading.Lock()

def already_seen(item):
    with seen_lock:
        if item not in seen:    # only one thread at a time checks membership in the synchronized set
            seen.add(item)      # and only one thread at a time can add to it
            return False
        return True
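A usage sketch: two threads feed overlapping items through already_seen, and the lock guarantees each item is claimed as "new" by exactly one of them (worker is a name introduced here):

def worker(items):
    for item in items:
        if not already_seen(item):
            print(f"{threading.current_thread().name} saw {item} first")

other = threading.Thread(target=worker, args=([1, 2, 3],))
other.start()
worker([2, 3, 4])
other.join()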
2.0.0.5 Barrier
import threading

counters = [0, 0]
barrier = threading.Barrier(2)

def count(thread_num, steps):
    for i in range(steps):
        other = counters[1 - thread_num]
        barrier.wait()  # wait for reads to complete
        counters[thread_num] = other + 1
        barrier.wait()  # wait for writes to complete

def threaded_count(steps):
    other = threading.Thread(target=count, args=(1, steps))
    other.start()
    count(0, steps)
    print('counters:', counters)

threaded_count(10)
2.0.0.6 Message passing
import multiprocessing

def process_consume(in_pipe):
    while True:
        item = in_pipe.recv()
        if item is None:
            return
        print('got an item:', item)

def process_produce():
    pipe = multiprocessing.Pipe(False)
    consumer = multiprocessing.Process(target=process_consume, args=(pipe[0],))
    consumer.start()
    for i in range(10):
        pipe[1].send(i)
    pipe[1].send(None)  # done signal

process_produce()
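multiprocessing.Queue offers the same message passing without handling the pipe ends yourself; a sketch of the example rewritten with it (queue_consume is a name introduced here), guarded for the spawn start method:

import multiprocessing

def queue_consume(q):
    while True:
        item = q.get()
        if item is None:            # done signal, as in the Pipe version
            return
        print('got an item:', item)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    consumer = multiprocessing.Process(target=queue_consume, args=(q,))
    consumer.start()
    for i in range(10):
        q.put(i)
    q.put(None)
    consumer.join()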