import multiprocessing

import numpy as np
import pandas as pd


def _apply_df(args):
    # Unpack one chunk together with the function and its apply() options.
    df, func, kwargs = args
    return df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, **kwargs):
    # 'workers' sets both the pool size and the number of chunks.
    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)
    result = pool.map(_apply_df, [(d, func, kwargs)
                                  for d in np.array_split(df, workers)])
    pool.close()
    pool.join()
    return pd.concat(list(result))


def square(x):
    return x**2


if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    result = apply_by_multiprocessing(df, square, axis=1, workers=4)
    ## run by 4 processors
@joshlk It's been a while, but could you provide details about your bug? I have been able to use this snippet without any issues.
I used it without problems too.
Thanks!! This is a great example.
Yes, the order of the rows will be lost, because the DataFrame chunks are appended back as and when each sub-process completes. To resolve this, we need to associate a key with each chunk (in ascending order) and sort the results by that key when they are returned.
Refer to the modified version below:
https://gist.github.com/tejaslodaya/562a8f71dc62264a04572770375f4bba
CC: @joshlk @smsaladi
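For anyone who wants to see the key-and-sort idea described above without following the link, here is a minimal sketch. It is not the linked gist's code; `apply_keyed`, `restore_order`, and `row_sum` are hypothetical names. It assumes results can come back out of order, which is what `imap_unordered` does. As far as I know, `pool.map` itself already returns results in input order, so the key only matters when an unordered variant is used.

```python
import multiprocessing

import numpy as np
import pandas as pd


def _apply_keyed(args):
    # Return the chunk's key alongside its result so order can be restored.
    key, chunk, func, kwargs = args
    return key, chunk.apply(func, **kwargs)


def restore_order(keyed_results):
    # Sort (key, result) pairs by key, then concatenate the results.
    ordered = sorted(keyed_results, key=lambda pair: pair[0])
    return pd.concat([result for _, result in ordered])


def apply_keyed(df, func, workers, **kwargs):
    # Split by row position and number the chunks in ascending order.
    positions = np.array_split(np.arange(len(df)), workers)
    chunks = [df.iloc[idx] for idx in positions]
    tasks = [(key, chunk, func, kwargs) for key, chunk in enumerate(chunks)]
    with multiprocessing.Pool(processes=workers) as pool:
        # imap_unordered yields results as workers finish, in no fixed order.
        keyed = list(pool.imap_unordered(_apply_keyed, tasks))
    return restore_order(keyed)


def row_sum(row):
    # Hypothetical row function used only for the demo.
    return row['a'] + row['b']


if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    out = apply_keyed(df, row_sum, workers=4, axis=1)
    print(out.index.tolist())
```

If the original index is unique and already sorted, calling `sort_index()` on the concatenated result would be a simpler alternative to carrying a key.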
Using Dask, you can use parallel apply.
http://dask.pydata.org/en/latest/dataframe-api.html?highlight=apply#dask.dataframe.DataFrame.apply
It's not working when I try to pull data from another big map into a new column.
It seems like it needs shared memory.
Can't you just use pool.join()? It should take care of the index.
Another note worth adding:
I wanted to pass an extra argument to the function being applied. The best way I found to do that with the suggested implementation is functools.partial, which creates picklable objects, so the call would look like:
apply_by_multiprocessing(df, functools.partial(my_func, some_arg), axis=1, workers=4)
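A slightly fuller sketch of the `functools.partial` approach, in case it helps. The helper is repeated here (with a positional split, which is my own choice) so the block runs on its own, and `scale_row` and `factor` are hypothetical. The extra argument is bound by keyword so that the row stays the first positional argument.

```python
import functools
import multiprocessing

import numpy as np
import pandas as pd


def _apply_df(args):
    df, func, kwargs = args
    return df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, **kwargs):
    # Same shape as the gist's helper, repeated so this block is self-contained.
    workers = kwargs.pop('workers')
    positions = np.array_split(np.arange(len(df)), workers)
    chunks = [df.iloc[idx] for idx in positions]
    with multiprocessing.Pool(processes=workers) as pool:
        result = pool.map(_apply_df, [(c, func, kwargs) for c in chunks])
    return pd.concat(result)


def scale_row(row, factor):
    # Hypothetical two-argument function: row sum times a fixed factor.
    return (row['a'] + row['b']) * factor


if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    bound = functools.partial(scale_row, factor=3)
    print(apply_by_multiprocessing(df, bound, axis=1, workers=4))
```

`DataFrame.apply` also accepts `args=(...)`, and since the helper forwards its keyword arguments to `apply`, passing `args=(3,)` should work as well.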
I'm getting this error:
PicklingError: Can't pickle <function _apply_df at 0x7f2c2e876158>: attribute lookup _apply_df on __main__ failed
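Some background on that error, as a hedged sketch rather than a diagnosis: pickle stores functions by name, so the worker side has to be able to look the function up in its module. I believe this message tends to show up when the functions live in an interactive session or notebook, and moving them into a `.py` file and importing them is a common workaround. The helper `can_pickle` below is hypothetical and just shows the by-name behaviour.

```python
import pickle


def can_pickle(obj):
    # True if pickle can serialize obj, False if it raises a pickling error.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def module_level(x):
    # Defined at module top level, so pickle can refer to it by name
    # when the module is importable.
    return x * 2


if __name__ == '__main__':
    print(can_pickle(module_level))
    # A lambda has no importable name, so pickling it fails.
    print(can_pickle(lambda x: x * 2))
```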
Thanks for the code, I got a good idea from this. But I need to send two data frames and one pandas.core.groupby.generic.DataFrameGroupBy object to my_function(). How can I achieve this through multiprocessing? If anyone can share thoughts or expertise, that would be a great help.
Check this: https://github.com/zahrashuaib/parallel-computing. There the dataframe is sent to the function for multiprocessing.
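One possible way to handle the two-dataframes-plus-groupby case, sketched under assumptions: the groupby is turned into plain `(key, DataFrame)` pairs by iterating over it, and the two extra frames are bound with `functools.partial`. This is not taken from the linked repository. The body of `my_function` is a placeholder, and `run_on_groups` is a hypothetical name.

```python
import functools
import multiprocessing

import pandas as pd


def my_function(item, df_a, df_b):
    # item is one (key, group_frame) pair taken from the groupby.
    key, group = item
    # Placeholder computation: combine sizes of the group and both frames.
    return key, len(group) + len(df_a) + len(df_b)


def run_on_groups(grouped, df_a, df_b, workers):
    # Iterating a groupby yields (key, DataFrame) pairs, which pickle cleanly.
    items = list(grouped)
    bound = functools.partial(my_function, df_a=df_a, df_b=df_b)
    with multiprocessing.Pool(processes=workers) as pool:
        return dict(pool.map(bound, items))


if __name__ == '__main__':
    df = pd.DataFrame({'g': ['x', 'x', 'y'], 'v': [1, 2, 3]})
    df_a = pd.DataFrame({'p': [1]})
    df_b = pd.DataFrame({'q': [1, 2, 3]})
    print(run_on_groups(df.groupby('g'), df_a, df_b, workers=2))
```

Note that the two bound frames are pickled and sent along with the tasks, so this may be slow if they are large.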
I wrote a package to use apply methods on Series, DataFrames and GroupByDataFrames on multiple cores. It makes it very easy to do multiprocessing in Pandas.
You can check the documentation at https://github.com/akhtarshahnawaz/multiprocesspandas
You can also install the package directly using pip
pip install multiprocesspandas
Then doing multiprocessing is as simple as importing the package as
from multiprocesspandas import applyparallel
and then using applyparallel instead of apply like
def func(x):
    import pandas as pd
    return pd.Series([x['C'].mean()])

df.groupby(["A","B"]).apply_parallel(func, num_processes=30)
On large dataframes there seems to be a bug whereby the order of the values in each row is lost, i.e. the values are shuffled.