
How To Parallelize The Row-Wise Pandas DataFrame's apply() Method

I have the following code:

import pandas as pd
import time

def enrich_str(s):
    # build three derived values from the input string; the sleep simulates slow work
    val1 = f'{s}_1'
    val2 = f'{s}_2'
    val3 = f'{s}_3'
    time.sleep(3)
    return val1, val2, val3
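The question is cut off at this point. Judging from the solutions below, it presumably continues with a sequential row-wise apply() that calls enrich_str on every row. A minimal reconstruction of that baseline (my sketch, not the original post's code; the sample DataFrame mirrors the one used in Solution 1, and enrich_row_seq is a name I introduced):

# Reconstructed sequential baseline (not from the original question).
df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5],
                   'colors': ['red', 'white', 'blue', 'orange', 'red']})
df['colName'] = 'colors'

def enrich_row_seq(row):
    # derive the three values from whichever column 'colName' points at
    row['enriched1'], row['enriched2'], row['enriched3'] = enrich_str(str(row[row['colName']]))
    return row

# Runs serially: ~3 s per row, so 5 rows take ~15 s -- hence the need to parallelize.
df = df.apply(enrich_row_seq, axis=1)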

Solution 1:

I recommend you use the pathos fork of multiprocessing, because it uses dill in place of pickle and therefore handles serializing DataFrames and the functions that operate on them more robustly. Note that imap returns an iterator, not a DataFrame, so you have to convert the result back:

def enrich_row(row_tuple):
    # df.iterrows() yields (index, Series) tuples; unpack the Series
    passed_row = row_tuple[1]
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row

df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

from pathos.multiprocessing import Pool

tic = time.perf_counter()
result = Pool(8).imap(enrich_row, df.iterrows(), chunksize=1)
df = pd.DataFrame(result)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
print(df)

Note that I'm using df.iterrows(), which yields (index, row) tuples where row is a pandas Series, so I modified enrich_row to handle this format.
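To make that tuple format concrete, a quick illustration (my addition, not part of the original answer):

# Each item produced by iterrows() is an (index, Series) pair.
first = next(df.iterrows())
print(first[0])  # the index label, e.g. 0
print(first[1])  # the row itself, as a pandas Series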

Solution 2:

I accepted @albert's answer as it works on Linux. Anyway, I found the Dask DataFrame's apply() method really straightforward. As I mentioned in a previous comment, at first the operation was not performed in parallel on a dataset of 120 rows; I later discovered that all 120 rows had landed in a single partition of the Dask DataFrame, so repartitioning was sufficient to obtain the desired parallelism. Here is an example of the code using Dask (which raises some strange warnings...).
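The original snippet is missing from the post at this point. Below is a minimal sketch of what it likely looked like, assuming a fresh copy of the sample df and the enrich_str function from above; the name enrich_row_dask and the meta= argument are my additions (omitting meta is a common source of the Dask warning the answer alludes to, since Dask then has to infer the output schema and warns about it):

import dask.dataframe as dd

def enrich_row_dask(row):
    # here row is a plain pandas Series, not the (index, row) tuple from iterrows()
    row['enriched1'], row['enriched2'], row['enriched3'] = enrich_str(str(row[row['colName']]))
    return row

# A single partition means a single task, i.e. no parallelism: spread the rows out first.
ddf = dd.from_pandas(df, npartitions=8)

# meta describes the output schema so Dask does not have to infer it (and warn about it).
meta = df.iloc[:0].assign(enriched1='', enriched2='', enriched3='')

tic = time.perf_counter()
df = ddf.apply(enrich_row_dask, axis=1, meta=meta).compute(scheduler='processes')
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")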
