I have dataframe:
data = [{"category": 'A', "bigram": 'delicious spaghetti', "vector": [0.01, -0.02, 0.03], 'all_vector' : 2},
{"category": 'A', "bigram": 'delicious dinner', "vector": [0.04, 0.05, 0.06], 'all_vector' : 2},
{"category": 'B', "bigram": 'new blog', "vector": [-0.14, -0.15, -0.16], 'all_vector' : 2},
{"category": 'B', "bigram": 'bright sun', "vector": [0.071, -0.09, 0.063], 'all_vector' : 2}
]
sdf = spark.createDataFrame(data)
+----------+-------------------+--------+---------------------+
|all_vector|bigram |category|vector |
+----------+-------------------+--------+---------------------+
|2 |delicious spaghetti|A |[0.01, -0.02, 0.03] |
|2 |delicious dinner |A |[0.04, 0.05, 0.06] |
|2 |new blog |B |[-0.14, -0.15, -0.16]|
|2 |bright sun |B |[0.071, -0.09, 0.063]|
+----------+-------------------+--------+---------------------+
I need to element-wise add lists in a vector column and divide by all_vector column ( i need normalize vector). Then group by category column. I wrote an example code but unfortunately it doesn’t work:
@udf_annotator(returnType=ArrayType(FloatType()))
def result_vector(vector, all_vector):
lst = [sum(x) for x in zip(*vector)] / all_vector
return lst
sdf_new = sdf\
.withColumn('norm_vector', result_vector(F.col('vector'), F.col('all_vector')))\
.withColumn('rank', F.row_number().over(Window.partitionBy('category')))\
.where(F.col('rank') == 1)
I want it this way:
+----------+-------------------+--------+-----------------------+---------------------+
|all_vector|bigram |category|norm_vector |vector |
+----------+-------------------+--------+-----------------------+---------------------+
|2 |delicious spaghetti|A |[0.05, 0.03, 0.09] |[0.01, -0.02, 0.03] |
|2 |delicious dinner |A |[0.05, 0.03, 0.09] |[0.04, 0.05, 0.06] |
|2 |new blog |B |[-0.069, -0.24, -0.097]|[-0.14, -0.15, -0.16]|
|2 |bright sun |B |[-0.069, -0.24, -0.097]|[0.071, -0.09, 0.063]|
+----------+-------------------+--------+-----------------------+---------------------+
>Solution :
The zip_with function will help you zip two arrays and apply a function element wise. To use the function, we can create an array collection of the arrays in the vector column, and use the aggregate function. There might also be other simpler ways to do this though.
data_sdf. \
withColumn('vector_collection', func.collect_list('vector').over(wd.partitionBy('cat'))). \
withColumn('ele_wise_sum',
func.expr('''
aggregate(vector_collection,
cast(array() as array<double>),
(x, y) -> zip_with(x, y, (a, b) -> coalesce(a, 0) + coalesce(b, 0))
)
''')
). \
show(truncate=False)
# +---+---------------------+----------------------------------------------+-------------------------------------+
# |cat|vector |vector_collection |ele_wise_sum |
# +---+---------------------+----------------------------------------------+-------------------------------------+
# |B |[-0.14, -0.15, -0.16]|[[-0.14, -0.15, -0.16], [0.071, -0.09, 0.063]]|[-0.06900000000000002, -0.24, -0.097]|
# |B |[0.071, -0.09, 0.063]|[[-0.14, -0.15, -0.16], [0.071, -0.09, 0.063]]|[-0.06900000000000002, -0.24, -0.097]|
# |A |[0.01, -0.02, 0.03] |[[0.01, -0.02, 0.03], [0.04, 0.05, 0.06]] |[0.05, 0.030000000000000002, 0.09] |
# |A |[0.04, 0.05, 0.06] |[[0.01, -0.02, 0.03], [0.04, 0.05, 0.06]] |[0.05, 0.030000000000000002, 0.09] |
# +---+---------------------+----------------------------------------------+-------------------------------------+