
    Python multiprocessing Pool: comparing map and apply

    Tags: python

    Pool.map

    map(func, iterable[, chunksize])

    import multiprocessing
    import time
    import signal
    import random
    
    def one_param_worker(i):
        t = random.random()
        print 'worker:%s,proccess:%s,sleep:%f' % (
            i, multiprocessing.current_process(), t)
        time.sleep(t)
        return 'return %s' % (i, )
    
    class Test(object):
        @classmethod
        def process(cls):
            start = time.time()
            result=process_pool.map(one_param_worker, range(5))
            print 'finish result:%s,cost:%f' % (result, time.time() - start)
    
            t = random.random()
            time.sleep(t)
            print 'do other thing,sleep:', t
    
    def stop_callback(signum, frame):
        print "received signum %d, process stopping" % (signum,)
        global is_alive
        global process_pool
        is_alive = False
        process_pool.close()
        print "process stopped"
    
    if __name__ == '__main__':
        is_alive = True
        process_pool = multiprocessing.Pool(4)
        signal.signal(signal.SIGTERM, stop_callback)
    
        while is_alive:
            Test.process()
            time.sleep(5)
    
    worker:0,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.833124
    worker:1,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.634145
    worker:2,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.170667
    worker:3,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.238150
    worker:4,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.914616
    finish result:['return 0', 'return 1', 'return 2', 'return 3', 'return 4'],cost:1.088779
    do other thing,sleep: 0.36407446494
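
    The stop_callback handler in the script above calls close() on the pool but never waits for the workers. Pool.close() stops the pool from accepting new tasks, and Pool.join() (which must be called after close() or terminate()) waits for the worker processes to exit. A minimal sketch of that shutdown order follows, written for Python 3. The names state, request_stop, run and max_rounds are hypothetical and exist only to keep the example short and terminating.

```python
import multiprocessing
import signal
import time

# Hypothetical shared flag; a dict avoids the `global` statement.
state = {'alive': True}


def square(x):
    return x * x


def request_stop(signum, frame):
    # Only flip the flag here; the main loop performs the actual cleanup.
    state['alive'] = False


def run(pool, max_rounds=3):
    # max_rounds is an assumption that keeps the demo from looping forever.
    rounds = 0
    while state['alive'] and rounds < max_rounds:
        pool.map(square, range(5))
        rounds += 1
        time.sleep(0.1)
    pool.close()  # no new tasks are accepted from here on
    pool.join()   # wait for the worker processes to exit
    return rounds


if __name__ == '__main__':
    signal.signal(signal.SIGTERM, request_stop)
    print(run(multiprocessing.Pool(4)))
```

    Keeping the handler down to a flag change and doing close() and join() in the main loop means the pool is torn down at a point where no map call is in flight.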
    
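    • Here func must be a plain module-level function. Under Python 2, which this post uses, a class method or instance method cannot be pickled and sent to the workers, so a call like the following fails (Python 3's pickle generally handles bound methods, so this restriction is specific to Python 2):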
    result=process_pool.map(cls.one_param_worker, range(5))
    
    • func receives exactly one argument: one element of the iterable passed as the second parameter of map(func, iterable[, chunksize])

    This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks

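    • map blocks the code that follows until every result is ready
    • Although each worker takes a different amount of time, the returned results are in the same order as the input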
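
    The two restrictions above (func must be a plain function, and it receives a single argument) can both be worked around with a module-level wrapper that unpacks a tuple. The sketch below is one way to do it, not the only one. Greeter, greet_star and run_demo are hypothetical names introduced for illustration.

```python
import multiprocessing


class Greeter(object):
    @classmethod
    def greet(cls, i, greeting):
        return '%s from task %s' % (greeting, i)


def greet_star(args):
    # Pool.map hands over exactly one item of the iterable per call,
    # so the tuple is unpacked here, in a picklable module-level function.
    return Greeter.greet(*args)


def run_demo():
    pool = multiprocessing.Pool(2)
    try:
        # Each element of the iterable is an (i, greeting) tuple.
        return pool.map(greet_star, [(i, 'hello') for i in range(3)])
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    print(run_demo())
```

    On Python 3.3 and later, Pool.starmap performs the same tuple unpacking without a hand-written wrapper.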

    Pool.map_async

    map_async(func, iterable[, chunksize[, callback]])

    def one_param_worker(i):
        t = random.random()
        print 'worker:%s,proccess:%s,sleep:%f' % (
            i, multiprocessing.current_process(), t)
        time.sleep(t)
        return 'return %s' % (i, )
    
    def cb(result):
        print 'call cb ', result
    
    
    class Test(object):
        @classmethod
        def cb(cls, result):
            print 'call cb ', result
    
        @classmethod
        def process(cls):
            start = time.time()
            result=process_pool.map_async(one_param_worker,range(5),callback=cb)
            print 'finish result:%s,cost:%f' % (result,time.time()-start)
    
            t = random.random()
            time.sleep(t)
            print 'do other thing,sleep:', t
    
    finish result:<multiprocessing.pool.MapResult object at 0x10ff13150>,cost:0.000092
    worker:1,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.290402
    worker:0,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.561862
    worker:2,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.160119
    worker:3,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.716796
    worker:4,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.778553
    do other thing,sleep: 0.632692802837
    call cb  ['return 0', 'return 1', 'return 2', 'return 3', 'return 4']
    
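    • As with map above:

      1. func must be a plain function, not a class method or instance method
      2. func takes exactly one argument, an element of the iterable passed as the second parameter of map_async(func, iterable[, chunksize[, callback]])
      3. the results are returned in input order
    • map_async does not block the code that follows; the callback runs once all the tasks have finished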

    • If you call result.get() on the object returned by map_async:
    result=process_pool.map_async(one_param_worker,range(5),callback=cb)
    print 'finish result:%s,cost:%f' % (result.get(),time.time()-start)

    the call blocks the code that follows until all the tasks have finished

    worker:0,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.074355
    worker:2,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.883103
    worker:1,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.676604
    worker:3,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.379367
    worker:4,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.757308
    call cb  ['return 0', 'return 1', 'return 2', 'return 3', 'return 4']
    finish result:['return 0', 'return 1', 'return 2', 'return 3', 'return 4'],cost:0.884893
    do other thing,sleep: 0.848403632886
    
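    • The callback argument of map_async can be a class method, because the callback runs in the parent process and is never sent to a worker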
    result=process_pool.map_async(one_param_worker,range(5),callback=cls.cb)
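
    The object returned by map_async is an AsyncResult, so the non-blocking and blocking behaviours described above can be combined: poll with ready(), then fetch with get(). A minimal sketch for Python 3 follows. slow_double and run_demo are hypothetical names, and the 10 second timeout is an arbitrary choice.

```python
import multiprocessing
import time


def slow_double(x):
    time.sleep(0.2)
    return x * 2


def run_demo():
    pool = multiprocessing.Pool(2)
    try:
        async_result = pool.map_async(slow_double, range(4))
        # map_async returns at once; ready() reports whether all tasks are done.
        ready_right_away = async_result.ready()
        # get() blocks until the results arrive, or raises
        # multiprocessing.TimeoutError once the timeout expires.
        values = async_result.get(timeout=10)
        return ready_right_away, values
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    print(run_demo())
```

    Passing a timeout to get() keeps a stuck worker from hanging the parent indefinitely.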
    

    Pool.apply_async

    apply_async(func[, args[, kwds[, callback]]])

    def multi_params_worker(i, msg1, msg2, msg3):
        t = random.random()
        print 'worker:%s,proccess:%s,sleep:%f' % (
            i, multiprocessing.current_process(), t)
        time.sleep(t)
        return 'return %s' % (i, )
    
    def cb(result):
        print 'call cb ', result
    
    class Test(object):
        @classmethod
        def process(cls):
            start = time.time()
            for i in range(5):
                result = process_pool.apply_async(
                    multi_params_worker,
                    args=(
                        i,
                        'hello',
                        'hehe',
                        'haha',
                    ),
                    callback=cb)
    
            t = random.random()
            time.sleep(t)
            print 'do other thing,sleep:', t
    
    worker:0,proccess:<Process(PoolWorker-1, started daemon)>,sleep:0.346032
    worker:1,proccess:<Process(PoolWorker-2, started daemon)>,sleep:0.771462
    worker:2,proccess:<Process(PoolWorker-3, started daemon)>,sleep:0.800825
    worker:3,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.333540
    do other thing,sleep: 0.317560466602
    worker:4,proccess:<Process(PoolWorker-4, started daemon)>,sleep:0.821762
    call cb  return 3
    call cb  return 0
    call cb  return 1
    call cb  return 2
    call cb  return 4
    
    • The func passed to apply_async can take multiple arguments
    • As before, func must be a plain function, not a class method or instance method; for example, this does not work:
                result = process_pool.apply_async(
                    cls.multi_params_worker,
                    args=(
                        i,
                        'hello',
                        'hehe',
                        'haha',
                    ),
                    callback=cb)
    
    • The callback argument of apply_async can be a class method
                result = process_pool.apply_async(
                    multi_params_worker,
                    args=(
                        i,
                        'hello',
                        'hehe',
                        'haha',
                    ),
                    callback=cls.cb)
    
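    • apply_async does not block the code that follows
    • Because each callback fires as soon as its own task finishes, the callbacks receive results in completion order, not submission order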
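
    If ordered results are needed from apply_async, one option is to keep the returned AsyncResult objects in submission order and call get() on each, instead of relying on the callbacks. A minimal sketch for Python 3 follows. label, run_demo and the delay values are hypothetical and chosen so that later tasks finish first.

```python
import multiprocessing
import time


def label(i, prefix, delay):
    time.sleep(delay)
    return '%s %s' % (prefix, i)


def run_demo():
    completion_order = []  # filled by callbacks, in completion order
    delays = [0.3, 0.2, 0.1]
    pool = multiprocessing.Pool(3)
    try:
        handles = [
            pool.apply_async(label, args=(i, 'return', delays[i]),
                             callback=completion_order.append)
            for i in range(3)
        ]
        # Reading the handles in submission order restores input order.
        in_order = [h.get(timeout=10) for h in handles]
    finally:
        pool.close()
        pool.join()
    return in_order, completion_order


if __name__ == '__main__':
    print(run_demo())
```

    The callbacks still see results as they complete, while the list built from the handles follows the order in which the tasks were submitted.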

    Summary

                  Multi-args   Blocking   Ordered-results
    map           no           yes        yes
    map_async     no           no         yes
    apply_async   yes          no         no

    References

    Python multiprocessing.Pool: Difference between map, apply, map_async, apply_async

