Scalable cohort sampler

May 10, 2019 · Dr. Georg Heiler · 5 min read

I have some binary cohorts and need to sample, for each observation in a cohort where the target label is 1, a matching observation from a dataframe where the target is 0.

sample data

First, I will generate some sample data.

from tqdm import tqdm
import numpy as np
import pandas as pd

tqdm.pandas()

seed = 47
np.random.seed(seed)

size = 10000
df = pd.DataFrame({'metric': np.random.randint(1, 100, size=size)})
df['label'] = np.random.randint(0, 2, size=size)
df['group_1'] = pd.Series(np.random.randint(1, 12, size=size)).astype(object)
df['group_2'] = pd.Series(np.random.randint(1, 10, size=size)).astype(object)
display(df.head())

group_0 = df[df['label'] == 0]
group_0 = group_0.reset_index(drop=True)
group_0 = group_0.rename(index=str, columns={"metric": "metric_group_0"})

join_columns_enrich = ['group_1', 'group_2']
join_real = ['metric_group_0']
join_real.extend(join_columns_enrich)
group_0 = group_0[join_real]
display(group_0.head())
group_1 = df[df['label'] == 1]
group_1 = group_1.reset_index(drop=True)
# group_1['metric_group_0'] = np.nan
display(group_1.head())

This gives two dataframes:

   metric_group_0 group_1 group_2
0              87      10       4
1              25       7       9
2              67       3       5
3              37       9       1
4               9       6       7
   metric  label group_1 group_2
0      92      1       4       6
1      86      1       4       8
2      95      1       6       3
3      97      1       1       3
4      90      1       2       9

In Python, a naive and slow implementation to sample the data might look like this:

k = 3
resulting_df = None

def knnJoinSingle(original_element, group_0, join_columns, random_state):
    # build a query restricting the candidates to the same group combination
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{k} == {v}" for k, v in limits_dict.items()])
    candidates = group_0.query(query)
    if len(candidates) > 0:
        # randomly pick one matching record from the label-0 cohort
        to_join = candidates.sample(n=1, random_state=random_state)['metric_group_0'].values[0]
    else:
        to_join = np.nan
    original_element['metric_group_0'] = to_join
    return original_element

for i in range(1, k+1):
    print(i)
    # WARNING: not setting a random state, otherwise the same record is always picked
    # when the group selection variables have identical values. Is there a better way?
    # (one option is sketched below)
    rand_knn1_enriched = group_1.progress_apply(lambda x: knnJoinSingle(x, group_0, join_columns_enrich, random_state=None), axis = 1)

    rand_knn1_enriched['run'] = i
    
    if resulting_df is None:
        resulting_df = rand_knn1_enriched
    else:
        resulting_df = pd.concat([resulting_df, rand_knn1_enriched])
        
resulting_df['difference'] = resulting_df['metric'] - resulting_df['metric_group_0']
resulting_df['differenceAbs'] = np.abs(resulting_df['difference'])

display(resulting_df.head())
print(len(resulting_df))
print(resulting_df.difference.isnull().sum()) 
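
One possible answer to the question in the comment above (a sketch of my own, not part of the original code): pass one np.random.RandomState instance per run. Its internal state advances from row to row, so identical group combinations still draw different candidates, while each run stays reproducible for a given seed.

for i in range(1, k + 1):
    run_state = np.random.RandomState(seed + i)  # one mutable state per run
    rand_knn1_enriched = group_1.progress_apply(
        lambda x: knnJoinSingle(x, group_0, join_columns_enrich, random_state=run_state),
        axis=1)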

It takes about one minute to sample the records. Can the performance be improved?

parallelize the computation

Dask is a native Python library that makes it simple to scale computations.

a minimal and contrived example

Here is a minimal example that outlines how Dask neatly parallelizes pandas:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

df_first_scattered = client.scatter(df_first, broadcast=True)
display(df_first)

def foo(row, lookup):
    # some computation which relies on the broadcasted lookup dataframe
    return lookup['foo'].iloc[2]

df_second_dask = dd.from_pandas(df_second, npartitions=2)
df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis=1, meta=('foo', 'int64'))
display(df_second_dask.visualize())

df_second_dask.compute()

There are not many new elements. Initially, the data is broadcast to the worker nodes with client.scatter(df_first, broadcast=True). Dask does not trigger the computation right away: it is lazy and only builds a task graph, which can be visualized with df_second_dask.visualize(). Only when df_second_dask.compute() is finally called is the computation actually executed.
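
To make the lazy evaluation explicit, a tiny sketch (the exact class name may differ between Dask versions):

# still a lazy dask Series: nothing has been computed yet
lazy_foo = df_second_dask['foo']
print(type(lazy_foo))             # e.g. <class 'dask.dataframe.core.Series'>

# .compute() walks the task graph and returns a concrete pandas Series
print(type(lazy_foo.compute()))   # <class 'pandas.core.series.Series'>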

rewriting the sampler in dask

import dask.dataframe as dd
from dask.distributed import Client
import dask

client = Client()
display(client)
client.cluster

def knnJoinSingle_series(original_element, group_0_scattered, join_columns, random_state):
    # same logic as before, but returning only the sampled value as a series element
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{k} == {v}" for k, v in limits_dict.items()])
    candidates = group_0_scattered.query(query)
    if len(candidates) > 0:
        return candidates.sample(n=1, random_state=random_state)['metric_group_0'].values[0]
    else:
        return np.nan

def compute_dfs(gr_0, gr_1, k):
    resulting_df = None
    # broadcast the label-0 cohort to all worker nodes
    group_0_scattered = client.scatter(gr_0, broadcast=True)

    group_1_dask = dd.from_pandas(gr_1, npartitions=16)
    # keep the dask dataframe in worker memory across all k iterations
    group_1_dask = client.persist(group_1_dask)

    for i in range(1, k+1):
        print(i)
        dask_result_series = group_1_dask.apply(knnJoinSingle_series,
                                                args=[group_0_scattered, join_columns_enrich, None],
                                                axis=1,
                                                meta=('metric_group_0', 'float64'))  # float64, as NaN is possible

        display(dask_result_series.visualize())
        local_result_series = dask_result_series.compute()
        run_df = gr_1.copy()  # copy so later runs do not overwrite earlier ones
        run_df['metric_group_0'] = local_result_series
        run_df['run'] = i

        if resulting_df is None:
            resulting_df = run_df
        else:
            resulting_df = pd.concat([resulting_df, run_df])

    resulting_df['difference'] = resulting_df['metric'] - resulting_df['metric_group_0']
    resulting_df['differenceAbs'] = np.abs(resulting_df['difference'])
    display(resulting_df.head())
    print(len(resulting_df))
    print(resulting_df.difference.isnull().sum())
    return resulting_df

resulting_df = compute_dfs(group_0, group_1, 10)

So what is the difference? Let's look at the code in a bit more detail:

Instead of a regular pandas.DataFrame, a parallelized dask.DataFrame is created. Then the group_0 dataframe is broadcast to all worker nodes with group_0_scattered = client.scatter(gr_0, broadcast=True). The original Python function (knnJoinSingle_series) does not need to be changed.

So far, nothing has happened. Only a graph of the operations that need to be calculated has been set up. Only local_result_series = dask_result_series.compute() finally triggers the computation of the results.

As the for loop iterates over group_1_dask k times, it makes sense to persist the dataframe on the worker nodes: group_1_dask = client.persist(group_1_dask) ensures that the original pandas dataframe does not need to be re-transformed into a dask dataframe over and over again.
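
A minimal sketch of that pattern (the wait call is my own addition and optional, not part of the original code):

from dask.distributed import wait

group_1_dask = dd.from_pandas(gr_1, npartitions=16)
# materialize the partitions in worker memory once, so all k iterations reuse them
group_1_dask = client.persist(group_1_dask)
wait(group_1_dask)  # optionally block until the data actually lives on the workers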

conclusions

Dask easily works around the GIL and can parallelize not only on a single node, but across many. This nicely speeds up the sampling process: instead of the 3 runs sampled before, 10 runs are sampled in Dask in about one minute and 16 seconds.

If you have an even better implementation of the sampler, feel free to add a comment with your improvements.

However, you should still rethink whether there is not an even better option, as loops over loops do not offer the best runtime. Using plain pandas, one could simply sample multiple times from the other dataframe:

def randomMatchingCondition(original_element, group_0, join_columns, k, random_state):
    # restrict candidates to the same group combination, then draw k samples with replacement
    limits_dict = original_element[join_columns].to_dict()
    query = ' & '.join([f"{col} == {val}" for col, val in limits_dict.items()])
    candidates = group_0.query(query)
    if len(candidates) > 0:
        return candidates.sample(n=k, random_state=random_state, replace=True)['metric_group_0'].values
    else:
        return np.nan

k = 3
group_1['metric_group_0'] = group_1.progress_apply(randomMatchingCondition,
                                                   args=[group_0, join_columns_enrich, k, None],
                                                   axis = 1)
print(group_1.isnull().sum())
group_1 = group_1[~group_1.metric_group_0.isnull()]
display(group_1.head())

s = pd.DataFrame({'metric_group_0': np.concatenate(group_1.metric_group_0.values)},
                 index=group_1.index.repeat(group_1.metric_group_0.str.len()))
s = s.join(group_1.drop('metric_group_0', axis=1), how='left')
s['pos_in_array'] = s.groupby(s.index).cumcount()
s.head()
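
Since pandas 0.25 the same unnesting can be expressed more compactly with DataFrame.explode; a sketch assuming the group_1 dataframe from above:

# one row per sampled value (pandas >= 0.25)
s_alt = group_1.explode('metric_group_0')
s_alt['metric_group_0'] = s_alt['metric_group_0'].astype(float)  # explode yields object dtype
s_alt['pos_in_array'] = s_alt.groupby(s_alt.index).cumcount()
s_alt.head()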