python多进程pool map与apply比较
标签:python
Pool.map
map(func, iterable[, chunksize])
import multiprocessing
import time
import signal
import random
def one_param_worker(i):
    """Simulate one unit of work for ``Pool.map`` / ``Pool.map_async``.

    Logs which pool process picked up the item, sleeps for a random
    fraction of a second, then returns a string tagged with the input.

    :param i: the single item handed over from the pool's iterable.
    :return: the string ``'return <i>'``.
    """
    t = random.random()
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print('worker:%s,proccess:%s,sleep:%f' % (
        i, multiprocessing.current_process(), t))
    time.sleep(t)
    return 'return %s' % (i, )
class Test(object):
    """Demo driver for the blocking ``Pool.map`` call."""

    @classmethod
    def process(cls):
        """Push five items through the pool with ``map`` and report timing.

        Uses the module-level ``process_pool`` and ``one_param_worker``.
        """
        start = time.time()
        # map() returns only once every item has been processed, so the
        # elapsed time printed below covers the whole batch.
        result = process_pool.map(one_param_worker, range(5))
        print('finish result:%s,cost:%f' % (result, time.time() - start))
        t = random.random()
        time.sleep(t)
        # Formatted as one argument so the output is identical on Python 2
        # and Python 3.
        print('do other thing,sleep: %s' % (t,))
def stop_callback(signum, frame):
    """Signal handler: end the main loop and close the process pool.

    :param signum: number of the signal that was received.
    :param frame: stack frame passed by the signal machinery (unused).
    """
    # Only is_alive is rebound here; process_pool is just read, so it needs
    # no ``global`` declaration.
    global is_alive
    print("received signum %d, process stopping" % (signum,))
    is_alive = False
    process_pool.close()
    print("process stopped")
if __name__ == '__main__':
    # Flag flipped to False by stop_callback when SIGTERM arrives.
    is_alive = True
    process_pool = multiprocessing.Pool(4)
    signal.signal(signal.SIGTERM, stop_callback)
    while is_alive:
        Test.process()
        time.sleep(5)
    # The loop only ends after stop_callback has closed the pool; join it so
    # the worker processes have exited before the script finishes.
    process_pool.join()
worker:0,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.833124
worker:1,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.634145
worker:2,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.170667
worker:3,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.238150
worker:4,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.914616
finish result:['return 0', 'return 1', 'return 2', 'return 3', 'return 4'],cost:1.088779
do other thing,sleep: 0.36407446494
- 这里func只能接受模块级函数,不能接受class method、instance method(Python 2下这类方法无法被pickle序列化后传给子进程),比如下面的写法会失败:
# Counter-example: a classmethod passed as func is not accepted by Pool.map.
result=process_pool.map(cls.one_param_worker, range(5))
- func只能接受一个参数,这个参数是map(func, iterable[, chunksize])方法的第二个参数(iterable)里面的值
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks
- map会阻塞后面代码
- 虽然每个进程执行时间不同,最后返回的结果是有序的
Pool.map_async
map_async(func, iterable[, chunksize[, callback]])
def one_param_worker(i):
    """Simulate one unit of work for ``Pool.map`` / ``Pool.map_async``.

    Logs which pool process picked up the item, sleeps for a random
    fraction of a second, then returns a string tagged with the input.

    :param i: the single item handed over from the pool's iterable.
    :return: the string ``'return <i>'``.
    """
    t = random.random()
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print('worker:%s,proccess:%s,sleep:%f' % (
        i, multiprocessing.current_process(), t))
    time.sleep(t)
    return 'return %s' % (i, )
def cb(result):
    """Completion callback: print whatever the pool delivers.

    :param result: the value passed in by the pool when the work is done.
    """
    # Two spaces on purpose: the original ``print 'call cb ', result`` emits
    # the literal's trailing space plus print's own separator.
    print('call cb  %s' % (result,))
class Test(object):
    """Demo driver for the non-blocking ``Pool.map_async`` call."""

    @classmethod
    def cb(cls, result):
        """Classmethod flavour of the completion callback.

        :param result: the value passed in by the pool when the work is done.
        """
        # Two spaces on purpose: the original ``print 'call cb ', result``
        # emits the literal's trailing space plus print's own separator.
        print('call cb  %s' % (result,))

    @classmethod
    def process(cls):
        """Submit five items with ``map_async`` and carry on immediately.

        Uses the module-level ``process_pool``, ``one_param_worker`` and the
        module-level ``cb`` function (not ``cls.cb``) as the callback.
        """
        start = time.time()
        result = process_pool.map_async(
            one_param_worker, range(5), callback=cb)
        # result is the pool's result object here, not the list of return
        # values; nothing has been waited for yet.
        print('finish result:%s,cost:%f' % (result, time.time() - start))
        t = random.random()
        time.sleep(t)
        # Formatted as one argument so the output is identical on Python 2
        # and Python 3.
        print('do other thing,sleep: %s' % (t,))
finish result:<multiprocessing.pool.MapResult object at 0x10ff13150>,cost:0.000092
worker:1,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.290402
worker:0,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.561862
worker:2,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.160119
worker:3,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.716796
worker:4,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.778553
do other thing,sleep: 0.632692802837
call cb ['return 0', 'return 1', 'return 2', 'return 3', 'return 4']
和上面map一样,这里func**只能**:
- 接受函数,不能接受class method、instance method
- 接受一个参数,这个参数是map_async(func, iterable[, chunksize[, callback]])方法的第二个参数(iterable)里面的值
- 返回的结果是有序的
map_async没有阻塞后面代码,多进程执行完进入回调
- 如果在map_async之后调用`result.get()`获取结果,例如:
# get() waits until every task has finished, so the print below only runs
# after all workers have returned.
result = process_pool.map_async(one_param_worker, range(5), callback=cb)
print('finish result:%s,cost:%f' % (result.get(), time.time() - start))
会阻塞后面代码,直到多进程执行完
worker:0,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.074355
worker:2,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.883103
worker:1,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.676604
worker:3,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.379367
worker:4,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.757308
call cb ['return 0', 'return 1', 'return 2', 'return 3', 'return 4']
finish result:['return 0', 'return 1', 'return 2', 'return 3', 'return 4'],cost:0.884893
do other thing,sleep: 0.848403632886
- map_async的callback参数可以是类方法
# Here the callback is the classmethod cls.cb; func is still a plain function.
result=process_pool.map_async(one_param_worker,range(5),callback=cls.cb)
Pool.apply_async
apply_async(func[, args[, kwds[, callback]]])
def multi_params_worker(i, msg1, msg2, msg3):
    """Simulate one unit of work for ``Pool.apply_async``.

    Logs which pool process picked up the task, sleeps for a random
    fraction of a second, then returns a string tagged with ``i``.

    :param i: identifier of the task.
    :param msg1: extra argument; accepted but not used by the body.
    :param msg2: extra argument; accepted but not used by the body.
    :param msg3: extra argument; accepted but not used by the body.
    :return: the string ``'return <i>'``.
    """
    t = random.random()
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print('worker:%s,proccess:%s,sleep:%f' % (
        i, multiprocessing.current_process(), t))
    time.sleep(t)
    return 'return %s' % (i, )
def cb(result):
    """Completion callback: print whatever the pool delivers.

    :param result: the value passed in by the pool when a task is done.
    """
    # Two spaces on purpose: the original ``print 'call cb ', result`` emits
    # the literal's trailing space plus print's own separator.
    print('call cb  %s' % (result,))
class Test(object):
    """Demo driver for the non-blocking ``Pool.apply_async`` call."""

    @classmethod
    def process(cls):
        """Submit five multi-argument tasks one by one, then carry on.

        Uses the module-level ``process_pool``, ``multi_params_worker`` and
        ``cb``.
        """
        start = time.time()
        for i in range(5):
            # Each call submits a single task; cb receives that task's
            # return value.
            result = process_pool.apply_async(
                multi_params_worker,
                args=(
                    i,
                    'hello',
                    'hehe',
                    'haha',
                ),
                callback=cb)
        # Runs once, after all five submissions (the sample output shows a
        # single "do other thing" line per round).
        t = random.random()
        time.sleep(t)
        # Formatted as one argument so the output is identical on Python 2
        # and Python 3.
        print('do other thing,sleep: %s' % (t,))
worker:0,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.346032
worker:1,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.771462
worker:2,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.800825
worker:3,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.333540
do other thing,sleep: 0.317560466602
worker:4,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.821762
call cb return 3
call cb return 0
call cb return 1
call cb return 2
call cb return 4
- apply_async方法的func可以是多参数函数
- 这里func只能接受函数,不能接受class method、instance method,比如下面的写法会失败:
# Counter-example: a classmethod passed as func is not accepted by apply_async.
result = process_pool.apply_async(
cls.multi_params_worker,
args=(
i,
'hello',
'hehe',
'haha',
),
callback=cb)
- apply_async的callback参数可以是类方法
# Only the callback is a classmethod here; func stays a plain module-level
# function, because a classmethod is not accepted as func.
result = process_pool.apply_async(
    multi_params_worker,
    args=(
        i,
        'hello',
        'hehe',
        'haha',
    ),
    callback=cls.cb)
- 不会阻塞后面代码
- 由于单个进程执行完,就进入回调,回调获得的结果是无序的
summary
| | Multi-args | Blocking | Ordered-results |
|---|---|---|---|
| map | no | yes | yes |
| map_async | no | no | yes |
| apply_async | yes | no | no |
参考
Python multiprocessing.Pool: Difference between map, apply, map_async, apply_async
不定期更新