Scalable cohort sampler

Use Dask to parallelize Python

Dask natively scales Python

I have some binary cohorts. For each observation with target label 1, I need to sample a matching observation from a dataframe where the target label is 0. An observation matches when it has the same values in the grouping columns.
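
To make the matching rule concrete, here is a toy sketch with hand-written data. The names label_0_rows, label_1_rows and matching_candidates are made up for this illustration and are not part of the sampler below; the sketch only shows which label-0 rows qualify as candidates for a given label-1 row.

```python
import pandas as pd

# Hypothetical toy data: the label-0 pool and two label-1 rows.
label_0_rows = pd.DataFrame({
    "metric_group_0": [11, 22, 33],
    "group_1": [4, 4, 6],
    "group_2": [6, 6, 9],
})
label_1_rows = pd.DataFrame({
    "metric": [92, 95],
    "group_1": [4, 6],
    "group_2": [6, 3],
})

def matching_candidates(row, pool, columns):
    # Keep the pool rows whose grouping columns all equal those of `row`.
    mask = (pool[columns] == row[columns]).all(axis=1)
    return pool[mask]

columns = ["group_1", "group_2"]
first = matching_candidates(label_1_rows.iloc[0], label_0_rows, columns)
second = matching_candidates(label_1_rows.iloc[1], label_0_rows, columns)

print(first["metric_group_0"].tolist())  # [11, 22]: one of these gets sampled
print(second.empty)                      # True: no match, the sampler stores NaN
```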

sample data

First, I will generate some sample data.

from tqdm import tqdm  # tqdm.pandas() is a method of the tqdm class, not of the module
import numpy as np
import pandas as pd

tqdm.pandas()

seed = 47
np.random.seed(seed)

size = 10000
df = pd.DataFrame({i: np.random.randint(1,100,size=size) for i in ['metric']})
df['label'] =  np.random.randint(0,2, size=size)
df['group_1'] =  pd.Series(np.random.randint(1,12, size=size)).astype(object)
df['group_2'] =  pd.Series(np.random.randint(1,10, size=size)).astype(object)
display(df.head())

group_0 = df[df['label'] == 0]
group_0 = group_0.reset_index(drop=True)
group_0 = group_0.rename(index=str, columns={"metric": "metric_group_0"})

join_columns_enrich = ['group_1', 'group_2']
join_real = ['metric_group_0']
join_real.extend(join_columns_enrich)
group_0 = group_0[join_real]
display(group_0.head())
group_1 = df[df['label'] == 1]
group_1 = group_1.reset_index(drop=True)
# group_1['metric_group_0'] = np.nan
display(group_1.head())

This gives two dataframes:

   metric_group_0 group_1 group_2
0              87      10       4
1              25       7       9
2              67       3       5
3              37       9       1
4               9       6       7

   metric  label group_1 group_2
0      92      1       4       6
1      86      1       4       8
2      95      1       6       3
3      97      1       1       3
4      90      1       2       9

In Python, a naive and slow implementation of the sampler might look like this:

k = 3
resulting_df = None

def knnJoinSingle(original_element, group_0, join_columns, random_state):
    # Build a query such as "group_1 == 4 & group_2 == 6" from the row's join columns.
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{col} == {val}" for col, val in limits_dict.items()])
    candidates = group_0.query(query)
    if len(candidates) > 0:
        to_join = candidates.sample(n=1, random_state=random_state)['metric_group_0'].values[0]
    else:
        to_join = np.nan
    original_element['metric_group_0'] = to_join
    return original_element

for i in range(1, k+1):
    print(i)
    # WARNING:not setting random state, otherwise always the same record is picked
    # in case of same values from group selection variables. Is there a better way?
    rand_knn1_enriched = group_1.progress_apply(lambda x: knnJoinSingle(x, group_0, join_columns_enrich, random_state=None), axis = 1)

    rand_knn1_enriched['run'] = i
    
    if resulting_df is None:
        resulting_df = rand_knn1_enriched
    else:
        resulting_df = pd.concat([resulting_df, rand_knn1_enriched])
        
resulting_df['difference'] = resulting_df['metric'] - resulting_df['metric_group_0']
resulting_df['differenceAbs'] = np.abs(resulting_df['difference'])

display(resulting_df.head())
print(len(resulting_df))
print(resulting_df.difference.isnull().sum()) 
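
The warning in the loop above asks whether there is a better way than leaving random_state unset. One possible answer, sketched here under the assumption that all sampling happens in a single process (plain pandas, not Dask workers), is to pass one shared np.random.RandomState instance instead of an integer seed. The names candidates and draw_many are hypothetical and only serve this illustration.

```python
import numpy as np
import pandas as pd

# Hypothetical candidate pool standing in for the rows returned by the query.
candidates = pd.DataFrame({"metric_group_0": [10, 20, 30, 40, 50]})

def draw_many(n_draws, seed):
    # One shared generator: every .sample() call advances its state, so
    # repeated draws differ while the whole sequence stays reproducible.
    rng = np.random.RandomState(seed)
    return [
        int(candidates.sample(n=1, random_state=rng)["metric_group_0"].iloc[0])
        for _ in range(n_draws)
    ]

first = draw_many(20, seed=47)
second = draw_many(20, seed=47)

# An integer seed re-seeds on every call and therefore always picks the same record.
fixed = [
    int(candidates.sample(n=1, random_state=47)["metric_group_0"].iloc[0])
    for _ in range(20)
]

print(first == second)  # the shared generator gives a reproducible sequence
print(len(set(fixed)))  # the integer seed yields a single distinct value
```

With Dask, each worker would need its own generator, so this sketch does not carry over unchanged to the distributed version.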

It takes about one minute to sample the records. Can the performance be improved?

parallelize the computation

Dask is a Python-native library that makes it easy to scale computations.

a minimal and contrived example

Here is a minimal example that outlines how Dask parallelizes pandas:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

df_first_scattered = client.scatter(df_first, broadcast=True)
display(df_first)

def foo(row, lookup):
    # some computation which relies on the lookup
    return lookup['foo'].iloc[2]
    
df_second_dask = dd.from_pandas(df_second, npartitions=2)
df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))
display(df_second_dask.visualize())

df_second_dask.compute()

There are not many new elements. First, the lookup data is broadcast to the worker nodes with client.scatter(df_first, broadcast=True). Dask is lazy: the apply call does not trigger any computation, it only builds a task graph, which can be inspected with df_second_dask.visualize(). The computation only starts when df_second_dask.compute() is called.

rewriting the sampler in dask

import dask.dataframe as dd
from dask.distributed import Client
import dask

client = Client()
display(client)
client.cluster

def knnJoinSingle_series(original_element, group_0_scattered, join_columns, random_state):
    # Build a query such as "group_1 == 4 & group_2 == 6" from the row's join columns.
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{col} == {val}" for col, val in limits_dict.items()])
    candidates = group_0_scattered.query(query)
    if len(candidates) > 0:
        return candidates.sample(n=1, random_state=random_state)['metric_group_0'].values[0]
    else:
        return np.nan

def compute_dfs(gr_0, gr_1, k):
    resulting_df = None
    group_0_scattered = client.scatter(gr_0, broadcast=True)

    group_1_dask = dd.from_pandas(gr_1, npartitions=16)
    group_1_dask = client.persist(group_1_dask)

    for i in range(1, k+1):
        print(i)
        dask_result_series = group_1_dask.apply(knnJoinSingle_series,
                                                           args=[group_0_scattered, join_columns_enrich, None],
                                                           axis = 1,
                                                           meta=('metric_group_0', 'int64'))

        display(dask_result_series.visualize())
        local_result_series = dask_result_series.compute()
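        # Build a fresh frame per run. Assigning into the shared group_1
        # would overwrite the results of earlier runs.
        run_df = gr_1.assign(metric_group_0=local_result_series, run=i)

        if resulting_df is None:
            resulting_df = run_df
        else:
            resulting_df = pd.concat([resulting_df, run_df])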
            
    resulting_df['difference'] = resulting_df['metric'] - resulting_df['metric_group_0']
    resulting_df['differenceAbs'] = np.abs(resulting_df['difference'])
    display(resulting_df.head())
    print(len(resulting_df))
    print(resulting_df.difference.isnull().sum())
    return resulting_df

resulting_df = compute_dfs(group_0, group_1, 10)

So what is the difference? Let’s look at the code in a bit more detail:

Instead of a regular pandas.DataFrame, a parallelized dask.DataFrame is created with dd.from_pandas. Then the group_0 dataframe is broadcast to all worker nodes: group_0_scattered = client.scatter(gr_0, broadcast=True). The original Python function barely changes: knnJoinSingle_series returns the sampled value instead of the modified row.

So far, the sampling itself has not run. Only a graph of the operations that need to be calculated has been set up. Calling local_result_series = dask_result_series.compute() then computes the results.

Since the for loop iterates over group_1_dask k times, it makes sense to persist the dataframe on the worker nodes: group_1_dask = client.persist(group_1_dask) ensures that the original pandas dataframe does not have to be converted to a Dask dataframe over and over again.

conclusions

Dask sidesteps the GIL by distributing the work across several worker processes, and it is not limited to a single node. This nicely speeds up the sampling process: instead of the 3 runs of the plain pandas version, the Dask version samples 10 runs in about one minute and 16 seconds.

If you have an even better implementation of the sampler, feel free to add a comment with your improvements.

However, you should still consider whether there is an even better option, since nested for loops do not offer the best runtime. Using plain pandas, one could simply draw all k samples at once from the other dataframe:

def randomMatchingCondition(original_element, group_0, join_columns, k, random_state):
    # Use col/val as loop names so the parameter k (number of samples) is not shadowed.
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{col} == {val}" for col, val in limits_dict.items()])
    candidates = group_0.query(query)
    if len(candidates) > 0:
        return candidates.sample(n=k, random_state=random_state, replace=True)['metric_group_0'].values
    else:
        return np.nan

k = 3
group_1['metric_group_0'] = group_1.progress_apply(randomMatchingCondition,
                                                   args=[group_0, join_columns_enrich, k, None],
                                                   axis = 1)
print(group_1.isnull().sum())
group_1 = group_1[~group_1.metric_group_0.isnull()]
display(group_1.head())

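# Flatten the sampled arrays: one output row per sampled value.
s = pd.DataFrame({'metric_group_0': np.concatenate(group_1.metric_group_0.values)},
                 index=group_1.index.repeat(group_1.metric_group_0.str.len()))
# drop() needs the keyword form in recent pandas versions.
s = s.join(group_1.drop(columns='metric_group_0'), how='left')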
s['pos_in_array'] = s.groupby(s.index).cumcount()
s.head()
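
The flattening step above turns one row holding k sampled values into k rows. As a hedged alternative, the same reshaping can be sketched with pandas' DataFrame.explode on a toy frame. The frame wide and its values are invented for this illustration, and plain lists stand in for the sampled arrays.

```python
import pandas as pd

# Hypothetical frame: each row holds the k=3 values sampled for it.
wide = pd.DataFrame({
    "metric": [92, 86],
    "metric_group_0": [[11, 22, 11], [33, 44, 55]],
})

# explode() repeats the index and the other columns once per list element.
long_df = wide.explode("metric_group_0")
long_df["metric_group_0"] = long_df["metric_group_0"].astype("int64")

# Position of each sampled value within its original list.
long_df["pos_in_array"] = long_df.groupby(level=0).cumcount()

print(long_df)
```
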
Georg Heiler
research & software engineer specialized in data

My research interests include large geo-spatial time and network data analytics.