Concatenate two dataframes in pyspark by substring search

I have two dataframes with the following structure:

Dataframe A:

Current Accession String
A_1 AAAABBBC
A_2 AAACR

This data frame contains 1 millions strings.

Dataframe B:

Accession String
C_34 RRRQAAAABBBC
C_35 RAAAABBBC
C_36 WWWWAAACR

I want to get a final dataframe by looking with the substring in dataframe A into dataframe B and create a new column with the new accessions found, results should look like:

Current Accession String Mapped Accession
A_1 AAAABBBC [C_34,C_35]
A_2 AAACR [C_36]

I have explored join in pyspark but it needs exact match. Which doesn’t work with sub-string matching.

Answer

Column.contains can be used:

from pyspark.sql import functions as F

dfA = ...
dfB = ...

dfA.join(dfB, on=dfB["String"].contains(dfA["String"])) 
  .groupBy("CurrentAccession").agg(
    F.first(dfA["String"]),
    F.collect_list("Accession")
  ).show()

Output:

+----------------+-------------+-----------------------+
|CurrentAccession|first(String)|collect_list(Accession)|
+----------------+-------------+-----------------------+
|             A_1|     AAAABBBC|           [C_34, C_35]|
|             A_2|        AAACR|                 [C_36]|
+----------------+-------------+-----------------------+

However, there is a downside using contains as join condition: a cross join is executed by Spark:

dfA.join(dfB, on=dfB["String"].contains(dfA["String"])).explain()

shows

== Physical Plan ==
CartesianProduct Contains(String#71, String#67)
:- *(1) Filter isnotnull(String#67)
:  +- *(1) Scan ExistingRDD[CurrentAccession#66,String#67]
+- *(2) Filter isnotnull(String#71)
   +- *(2) Scan ExistingRDD[Accession#70,String#71]