
Restrictions on Dynamically Created Functions with concurrent.futures.ProcessPoolExecutor

I am trying to do some multiprocessing with functions that I dynamically create within other functions. It seems I can run these if the function fed to ProcessPoolExecutor is modul

Solution 1:

As the error messages suggest, the problem has more to do with pickling than with dynamically generated functions as such. From https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

only picklable objects can be executed and returned.

and from https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled, the sorts of functions which can be pickled:

functions defined at the top level of a module (using def, not lambda)

which suggests other sorts of functions can't be pickled. From the code in the question a function that doesn't adhere to this leaps out: dynamic_func from...

def make_func(a):
    def dynamic_func(i):
        return i, i**2 + a
    return dynamic_func

...and you hint that this is the issue....

So I would have thought it was possible to somehow get python to recognise them as "true" functions

You can! You can put dynamic_func on the top level, and use partial rather than a closure...

from functools import partial

def dynamic_func(a, i):
    return i, i**2 + a

def make_func(a):
    return partial(dynamic_func, a)
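To check the difference directly, here is a minimal sketch that tries to pickle both forms. The helper names `make_closure` and `can_pickle` are made up for this illustration and are not part of the question's code; the exact exception raised for the closure varies between Python versions, so the helper catches several.

```python
import pickle
from functools import partial


def dynamic_func(a, i):
    # Top-level function: pickle stores only its module and name.
    return i, i**2 + a


def make_closure(a):
    # Hypothetical name; mirrors the nested definition from the question.
    def inner(i):
        return i, i**2 + a
    return inner


def can_pickle(obj):
    """Return True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


closure_ok = can_pickle(make_closure(1))
partial_ok = can_pickle(partial(dynamic_func, 1))

# The partial carries its bound argument (a=1) through the round trip.
restored = pickle.loads(pickle.dumps(partial(dynamic_func, 1)))

if __name__ == "__main__":
    print("closure picklable:", closure_ok)
    print("partial picklable:", partial_ok)
    print("restored(3) =", restored(3))
```

The partial works because pickle only needs to record a reference to the top-level `dynamic_func` plus the bound argument, whereas the nested function cannot be looked up by name from its module.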

So in full...

import concurrent.futures
from functools import partial

def dynamic_func(a, i):
    return i, i**2 + a

def make_func(a):
    return partial(dynamic_func, a)

f_dyns = [make_func(a) for a in range(10)]
def loopfunc(i):
    return f_dyns[i](i)


class Test:
    def __init__(self, myfunc):
        self.f = myfunc

    def loopfunc(self, i):
        return self.f(i)

    def run(self):
        with concurrent.futures.ProcessPoolExecutor(3) as executor:
            for i, r in executor.map(self.loopfunc, range(10)):
                print(i, ":", r)

# The guard stops worker processes from re-running this block when the
# module is re-imported (needed with the spawn start method).
if __name__ == "__main__":
    o2 = Test(make_func(1))
    o2.run()

But... why the original form without classes worked, I don't know. By my understanding, it would be trying to pickle a non-top level function, and so I think my understanding is flawed.
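One plausible explanation, assuming the original non-class version passed the module-level loopfunc to executor.map (as the f_dyns/loopfunc pair in the listing above suggests): only the callable handed to the executor gets pickled, and a top-level function is pickled by name. The closures in f_dyns are never sent anywhere; each worker finds f_dyns in its own copy of the module, either inherited through fork or rebuilt when the main module is re-imported. A minimal sketch of what actually gets serialized:

```python
import pickle


def make_func(a):
    def dynamic_func(i):
        return i, i**2 + a
    return dynamic_func


# Built at import time, so every process that loads this module has it.
f_dyns = [make_func(a) for a in range(10)]


def loopfunc(i):
    # Looks f_dyns up in module globals at call time, inside the worker.
    return f_dyns[i](i)


# This is what the executor would send to a worker for loopfunc:
# a reference by module and name, not the function body or f_dyns.
payload = pickle.dumps(loopfunc)

results = [loopfunc(i) for i in range(3)]

if __name__ == "__main__":
    print(len(payload), "bytes:", payload)
    print(results)
```

In the class-based version the callable is the bound method self.loopfunc, so pickle has to serialize the instance too, including self.f, and that is where a closure fails.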

Solution 2:

The answer above does not cover every case. For example:

import importlib
from functools import partial

def _picklable_func(func, *args, **kwargs):
    myfunc = partial(func, *args)
    return myfunc

def invoke_func():
    # Note: self, args and kwargs are not defined in the snippet as posted.
    mod_fun_str = "oh.mymodule.myfunc"
    mod_name, func_name = mod_fun_str.rsplit('.', 1)
    mod = importlib.import_module(mod_name)
    func = getattr(mod, func_name)
    future = self.threadpool.submit(func, *args, **kwargs)  # future.result() always returns None
    # future = self.threadpool.submit(_picklable_func(func, *args, **kwargs), **kwargs)  # this doesn't work either; 'myfunc' always returned None

if __name__ == "__main__":
    invoke_func()

The problem is that future.result() in invoke_func still always returns None.

# myfunc in the oh.mymodule module is:
def myfunc():
    return "hello"
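The snippet as posted does not show enough to say why result() comes back as None. As a baseline for comparison, here is a minimal sketch of resolving a function from a dotted path and submitting it to a ProcessPoolExecutor. The helpers `resolve` and `run_in_pool` are hypothetical names, and `math.sqrt` stands in for `oh.mymodule.myfunc`, which is not available here.

```python
import importlib
from concurrent.futures import ProcessPoolExecutor


def resolve(dotted_path):
    """Import 'pkg.module.name' and return the attribute called 'name'."""
    mod_name, func_name = dotted_path.rsplit(".", 1)
    return getattr(importlib.import_module(mod_name), func_name)


def run_in_pool(dotted_path, *args, **kwargs):
    """Resolve the function, run it in a worker process, return its result."""
    func = resolve(dotted_path)
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, *args, **kwargs)
        # result() blocks until the worker finishes and re-raises any
        # exception that occurred in the worker.
        return future.result()


if __name__ == "__main__":
    # math.sqrt is a stand-in for the poster's own module-level function.
    print(run_in_pool("math.sqrt", 9.0))
```

A function obtained this way is an ordinary module-level function, so it pickles by name and its return value comes back through future.result(). If the same pattern returns None with the real module, the cause is probably somewhere outside the code shown.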
