Tuesday, May 4, 2021

Optimizing a "cross join" pyspark Levenshtein distance query

In my pyspark code I have a table with the following columns (added example data):

id  name  sequence
1   a     AGCCAGC
1   b     ACCCAGC
2   a     AAGCAAA
2   c     AGCCAAA

I am trying to construct a graph where each ID forms a completely separate sub-graph, and an edge exists between two rows if their names differ and the Levenshtein distance between their sequences is less than a constant. Sequence length is constant across rows.
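For reference, the edge criterion is the classic Levenshtein edit distance, which Spark exposes as F.levenshtein. A minimal pure-Python sketch of the same computation, checked against the sample rows above (the cutoff of 2 used in the comments is assumed for illustration, not taken from the job config):

def levenshtein(a: str, b: str) -> int:
    # Classic dynamic-programming edit distance, kept to two DP rows.
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            curr.append(min(
                prev[j] + 1,               # deletion from a
                curr[j - 1] + 1,           # insertion into a
                prev[j - 1] + (ca != cb),  # substitution (free on match)
            ))
        prev = curr
    return prev[-1]

# With an assumed cutoff of 2, both sample IDs produce one edge each.
assert levenshtein('AGCCAGC', 'ACCCAGC') == 1  # id 1, names a-b
assert levenshtein('AAGCAAA', 'AGCCAAA') == 2  # id 2, names a-c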

Code example:

from pyspark.sql import functions as F

edge_df = source_df.join(
    end_df,
    [
        F.col('source_id') == F.col('end_id'),
        F.col('source_name') != F.col('end_name'),
    ],
).withColumn(
    'edit_distance',
    F.levenshtein(F.col('source_sequence'), F.col('end_sequence')),
).filter(
    F.col('edit_distance') <= F.lit(job_info['cutoffs']['inclusion']),
).persist()

Where source_df and end_df are the same dataframe with the columns shown above, prefixed with source_ / end_ (the content is identical). I also repartitioned by ID, since edges never need to be computed between rows with different IDs:

orig_df = orig_df.repartition(PARALLELISM, 'id')  
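The skew is visible at the partition level after that repartition; a quick diagnostic sketch using spark_partition_id (nothing here beyond the dataframe above):

from pyspark.sql import functions as F

# Rows per physical partition after repartitioning on 'id'; with skewed
# IDs, a handful of partitions hold most rows while the rest stay small.
(orig_df
    .withColumn('partition', F.spark_partition_id())
    .groupBy('partition')
    .count()
    .orderBy(F.desc('count'))
    .show())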

My problem is that the data is heavily skewed, so some IDs have tons of rows and others very few. Here is an example from real data with 1014 "names" (the aggregation that produces a table like this is sketched after it):

id     row_no
1116   803683
9151   766044
12696  500045
5579   318143
2756   7083
2152   7075
5436   7050
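Such a per-ID count comes out of a plain aggregation (a sketch over the same orig_df):

from pyspark.sql import functions as F

# Rows per ID, largest first -- the same skew the join inherits.
(orig_df
    .groupBy('id')
    .count()
    .withColumnRenamed('count', 'row_no')
    .orderBy(F.desc('row_no'))
    .show())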

Eventually I end up with a few working executors while the rest do nothing, having already finished their part. Repartitioning without 'id' as the key doesn't help, and PARALLELISM is already high.

The sequences should be similar, so a cached Levenshtein computation would help, but I also feel Spark should spread those large IDs across multiple executors and let each one compute a batch of the pairs. It has so many idle executors...
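One way to get that "cached Levenshtein" effect is to compute the distance once per distinct sequence pair and join it back. A sketch, where candidate_df is a hypothetical name for the joined-but-not-yet-filtered pair table from the join above:

from pyspark.sql import functions as F

# 'candidate_df' (hypothetical) is the pair table before the distance
# filter. Compute the distance once per distinct sequence pair, then
# join the result back to every pair that shares those sequences.
pair_distances = (
    candidate_df
    .select('source_sequence', 'end_sequence')
    .distinct()
    .withColumn(
        'edit_distance',
        F.levenshtein(F.col('source_sequence'), F.col('end_sequence')),
    )
)

edge_df = (
    candidate_df
    .join(pair_distances, ['source_sequence', 'end_sequence'])
    .filter(F.col('edit_distance') <= F.lit(job_info['cutoffs']['inclusion']))
)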

How do I optimize this? The data seems to split well, but because it's a cross join the cost is driven by the average number of sequences per name within an ID rather than by stacking multiple IDs that each have a few rows, since the work grows as n² (see the salting sketch below).
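For scale, the largest ID above alone implies on the order of 803,683² ≈ 6.5×10¹¹ candidate pairs, which can never fit in one partition's worth of work. One standard way to spread a single ID's self-join over many executors is salting: keep each source row in one random bucket and replicate the end rows into every bucket, so the effective join key becomes (id, bucket). A sketch under those assumptions (SALT_BUCKETS is a hypothetical tuning knob, not part of the original job):

from pyspark.sql import functions as F

SALT_BUCKETS = 32  # hypothetical knob: how many ways to split each ID

# Each source row lands in one random bucket...
salted_source = source_df.withColumn(
    'source_bucket', (F.rand() * SALT_BUCKETS).cast('int'))

# ...and every end row is replicated into all buckets, so each
# (id, bucket) slice still sees the full set of candidate partners
# and every pair is produced exactly once.
salted_end = end_df.withColumn(
    'end_bucket',
    F.explode(F.array(*[F.lit(i) for i in range(SALT_BUCKETS)])))

edge_df = salted_source.join(
    salted_end,
    [
        F.col('source_id') == F.col('end_id'),
        F.col('source_bucket') == F.col('end_bucket'),
        F.col('source_name') != F.col('end_name'),
    ],
).withColumn(
    'edit_distance',
    F.levenshtein(F.col('source_sequence'), F.col('end_sequence')),
).filter(
    F.col('edit_distance') <= F.lit(job_info['cutoffs']['inclusion']),
)

This shuffles end_df SALT_BUCKETS times over, but the pairs of a hot ID now land in up to SALT_BUCKETS partitions instead of one, which is exactly the "send the large IDs to multiple executors" behavior described above.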

Thanks a lot!

https://stackoverflow.com/questions/67393955/optimizing-a-cross-join-pyspark-levinstein-distance-query May 05, 2021 at 09:07AM
