Scalable cohort sampler

Use Dask to parallelize Python. Dask natively scales Python.

I have some binary cohorts and, for each observation where the target label is 1, need to sample a matching observation from a dataframe where the target label is 0.

sample data

First, I will generate some sample data.

from tqdm import tqdm
import numpy as np
import pandas as pd

tqdm.pandas()

seed = 47
np.random.seed(seed)

size = 10000
df = pd.DataFrame({i: np.random.randint(1,100,size=size) for i in ['metric']})
df['label'] =  np.random.randint(0,2, size=size)
df['group_1'] =  pd.Series(np.random.randint(1,12, size=size)).astype(object)
df['group_2'] =  pd.Series(np.random.randint(1,10, size=size)).astype(object)

group_0 = df[df['label'] == 0]
group_0 = group_0.reset_index(drop=True)
group_0 = group_0.rename(index=str, columns={"metric": "metric_group_0"})

join_columns_enrich = ['group_1', 'group_2']
join_real = ['metric_group_0']
join_real.extend(join_columns_enrich)
group_0 = group_0[join_real]
group_1 = df[df['label'] == 1]
group_1 = group_1.reset_index(drop=True)
# group_1['metric_group_0'] = np.nan

This gives two dataframes:

   metric_group_0 group_1 group_2
0              87      10       4
1              25       7       9
2              67       3       5
3              37       9       1
4               9       6       7

   metric  label group_1 group_2
0      92      1       4       6
1      86      1       4       8
2      95      1       6       3
3      97      1       1       3
4      90      1       2       9

In Python, a naive and slow implementation to sample the data might look like:

k = 3
resulting_df = None

def knnJoinSingle(original_element, group_0, join_columns, random_state):
    # Build a query such as "group_1 == 4 & group_2 == 6" from the row's group values
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{col} == {val}" for col, val in limits_dict.items()])
    candidates = group_0.query(query)
    if len(candidates) > 0:
        # Pick one matching label-0 observation at random
        to_join = candidates.sample(n=1, random_state=random_state)['metric_group_0'].iloc[0]
    else:
        to_join = np.nan
    original_element['metric_group_0'] = to_join
    return original_element

for i in range(1, k+1):
    print(i)
    # WARNING: not setting random state, otherwise always the same record is picked
    # in case of same values from group selection variables. Is there a better way?
    rand_knn1_enriched = group_1.progress_apply(lambda x: knnJoinSingle(x, group_0, join_columns_enrich, random_state=None), axis=1)

    rand_knn1_enriched['run'] = i

    if resulting_df is None:
        resulting_df = rand_knn1_enriched
    else:
        resulting_df = pd.concat([resulting_df, rand_knn1_enriched])

resulting_df['difference'] = resulting_df['metric'] - resulting_df['metric_group_0']
resulting_df['differenceAbs'] = np.abs(resulting_df['difference'])

print(len(resulting_df))
print(resulting_df.difference.isnull().sum())
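
To make the matching step inside knnJoinSingle more concrete, here is a minimal sketch on a toy dataframe. The names group_0_toy and element, as well as all values, are made up for illustration; only the query construction mirrors the function above.

```python
import pandas as pd

# Toy stand-in for group_0 (label == 0); values are made up for illustration.
group_0_toy = pd.DataFrame({
    "metric_group_0": [10, 20, 30, 40],
    "group_1": [1, 1, 2, 2],
    "group_2": [5, 5, 5, 6],
})

# One hypothetical label == 1 observation that needs a match.
element = pd.Series({"metric": 99, "label": 1, "group_1": 1, "group_2": 5})

join_columns = ["group_1", "group_2"]

# Turn the row's group values into a pandas query string.
limits = element[join_columns].to_dict()
query = " & ".join(f"{col} == {val}" for col, val in limits.items())
print(query)

# All label-0 rows that share both group values are candidates.
candidates = group_0_toy.query(query)
print(candidates)

# Draw one candidate at random; random_state is fixed here only for reproducibility.
sampled = candidates.sample(n=1, random_state=0)["metric_group_0"].iloc[0]
print(sampled)
```

Because the query is rebuilt and evaluated for every single row, the cost grows with the number of label-1 observations, which is why the loop above is slow.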

It takes about one minute to sample the records. Can the performance be improved?

parallelize the computation

Dask is a native Python library that makes it simple to scale computations.

a minimal and contrived example

Here is a minimal example which outlines how Dask neatly parallelizes pandas:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

df_first_scattered = client.scatter(df_first, broadcast=True)
display(df_first)

def foo(row, lookup):
    # some computation which relies on the lookup
    # (for illustration, simply the first value of the lookup column)
    return lookup['foo'].iloc[0]

df_second_dask = dd.from_pandas(df_second, npartitions=2)
df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))

There are not many new elements. First, the data is broadcast to the worker nodes: client.scatter(df_first, broadcast=True). Dask does not trigger the computation at this point. It is lazy and only builds a task graph, which can be visualized with df_second_dask.visualize(). The computation only starts when df_second_dask.compute() is finally called.

rewriting the sampler in dask

import dask.dataframe as dd
from dask.distributed import Client

client = Client()
display(client)
client.cluster

def knnJoinSingle_series(original_element, group_0_scattered, join_columns, random_state):
    # Same matching logic as before, but only the sampled value is returned
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{col} == {val}" for col, val in limits_dict.items()])
    candidates = group_0_scattered.query(query)
    if len(candidates) > 0:
        return candidates.sample(n=1, random_state=random_state)['metric_group_0'].iloc[0]
    else:
        return np.nan

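def compute_dfs(gr_0, gr_1, k):
    resulting_df = None
    # Broadcast the lookup dataframe once to all workers
    group_0_scattered = client.scatter(gr_0, broadcast=True)

    group_1_dask = dd.from_pandas(gr_1, npartitions=16)
    # Keep the partitions on the workers, as they are reused in every run
    group_1_dask = client.persist(group_1_dask)

    for i in range(1, k+1):
        print(i)
        # Lazy: this only builds the task graph
        local_result_series = group_1_dask.apply(knnJoinSingle_series,
            args=[group_0_scattered, join_columns_enrich, None],
            axis=1,
            meta=('metric_group_0', 'float64'))

        # Copy so that the results of earlier runs are not overwritten
        group_1 = gr_1.copy()
        # compute() triggers the actual execution on the workers
        group_1['metric_group_0'] = local_result_series.compute()
        group_1['run'] = i

        if resulting_df is None:
            resulting_df = group_1
        else:
            resulting_df = pd.concat([resulting_df, group_1])

    resulting_df['difference'] = resulting_df['metric'] - resulting_df['metric_group_0']
    resulting_df['differenceAbs'] = np.abs(resulting_df['difference'])
    print(len(resulting_df))
    print(resulting_df.difference.isnull().sum())
    return resulting_df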

resulting_df = compute_dfs(group_0, group_1, 10)

So what is the difference? Let's look at the code in a bit more detail:

Instead of a regular pandas.DataFrame, a parallelized dask.DataFrame is created. Then the group_0 dataframe is broadcast to all worker nodes: group_0_scattered = client.scatter(gr_0, broadcast=True). The original Python function needs only a small change: knnJoinSingle_series returns the sampled value instead of the modified row.

So far, nothing has happened. Only a graph of the operations which need to be calculated has been set up. Calling compute(), for example group_1 = group_1_dask.compute(), triggers the actual computation and returns the results.

As the for loop iterates over group_1_dask k times, it makes sense to persist the dataframe on the worker nodes: group_1_dask = client.persist(group_1_dask) ensures that the original pandas dataframe does not need to be converted to a Dask dataframe over and over again.

conclusions

Dask sidesteps the GIL by running the work in several worker processes, and it can parallelize beyond a single node. This nicely speeds up the sampling process. In fact, instead of 3 runs, the Dask version samples 10 runs in about one minute and 16 seconds.

If you have an even better implementation of the sampler, feel free to add a comment with your improvements.

However, you should still consider whether there is an even better option, as nested for loops do not offer the best runtime. Using plain pandas, one could simply sample multiple times from the other dataframe in a single pass:

def randomMatchingCondition(original_element, group_0, join_columns, k, random_state):
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{col} == {val}" for col, val in limits_dict.items()])
    candidates = group_0.query(query)
    if len(candidates) > 0:
        # Draw k values at once, with replacement, instead of looping k times
        return candidates.sample(n=k, random_state=random_state, replace=True)['metric_group_0'].values
    else:
        return np.nan

k = 3
group_1['metric_group_0'] = group_1.progress_apply(randomMatchingCondition,
                                                   args=[group_0, join_columns_enrich, k, None],
                                                   axis=1)
print(group_1.isnull().sum())
group_1 = group_1[~group_1.metric_group_0.isnull()]

# Flatten the arrays of sampled values into one row per sampled value
s = pd.DataFrame({'metric_group_0': np.concatenate(group_1.metric_group_0.values)},
                 index=group_1.index.repeat(group_1.metric_group_0.str.len()))
s = s.join(group_1.drop(columns='metric_group_0'), how='left')
s['pos_in_array'] = s.groupby(s.index).cumcount()
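
The last three statements turn the column of arrays into a long format with one row per sampled value. A minimal sketch on a toy dataframe shows the effect; toy and its values are made up for illustration, and the steps follow the code above.

```python
import numpy as np
import pandas as pd

# Toy stand-in for group_1 after sampling k = 3 values per row.
toy = pd.DataFrame({
    "metric": [92, 86],
    "metric_group_0": [np.array([10, 20, 30]), np.array([40, 50, 60])],
})

# Number of sampled values per row (3 for every row here).
lengths = toy["metric_group_0"].str.len()

# Concatenate all arrays and repeat each original index once per sampled value.
long = pd.DataFrame(
    {"metric_group_0": np.concatenate(toy["metric_group_0"].values)},
    index=toy.index.repeat(lengths),
)

# Bring back the remaining columns of the original rows via the shared index.
long = long.join(toy.drop(columns="metric_group_0"), how="left")

# Position of each sampled value within its original array; plays the role of 'run'.
long["pos_in_array"] = long.groupby(long.index).cumcount()
print(long)
```

Each original row now appears k times, so difference and differenceAbs can be computed on the long dataframe exactly as before.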