Friday, June 19, 2020

PySpark: Add column in dataframe using UDF

Introduction


This article demonstrates how to add a column to a DataFrame using a UDF (user-defined function) in PySpark.
For processing large datasets, rdd.map or rdd.mapPartitions is recommended instead of a UDF, because UDFs are slow on large datasets.
You can refer to this link for rdd.map and rdd.mapPartitions.
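The rdd.mapPartitions approach mentioned above can be sketched as follows. This is a minimal sketch, not code from the linked article: it assumes the same "Salary" and "Last % Hike" string columns used later in this post, and the helper names revised_salary, add_revised_salary and with_revised_salary_rdd are hypothetical. The generator handles one partition at a time and appends the computed value to each row.

```python
# Sketch: add a "New Salary" value to every row with rdd.mapPartitions.
# Assumes string columns "Salary" (e.g. "168251") and "Last % Hike" (e.g. "21%").


def revised_salary(salary, last_hike):
    """Hypothetical helper: salary raised by last_hike percent, or None if input is missing."""
    if salary is None or last_hike is None:
        return None
    hike = int(last_hike.replace("%", ""))
    return float(salary) + float(salary) * hike / 100


def add_revised_salary(rows, salary_idx, hike_idx):
    """Hypothetical mapPartitions function: yields each row as a tuple plus one new value."""
    for row in rows:
        values = tuple(row)
        yield values + (revised_salary(values[salary_idx], values[hike_idx]),)


def with_revised_salary_rdd(df):
    """Hypothetical helper: append "New Salary" via df.rdd.mapPartitions and rebuild a DataFrame.

    Needs an active SparkSession so that RDD.toDF is available.
    """
    salary_idx = df.columns.index("Salary")
    hike_idx = df.columns.index("Last % Hike")
    new_rdd = df.rdd.mapPartitions(
        lambda rows: add_revised_salary(rows, salary_idx, hike_idx)
    )
    return new_rdd.toDF(df.columns + ["New Salary"])
```

With the DataFrame built in the next section, with_revised_salary_rdd(sampleSalaryDF).show(5) would add the column; because Python floats are inferred as doubles, "New Salary" comes back as a double column. The rdd.map variant is the same idea applied per row: df.rdd.map(lambda row: tuple(row) + (revised_salary(row[salary_idx], row[hike_idx]),)).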

Add a new column to a DataFrame using a UDF


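from pyspark.sql import SparkSession
# Import only what is used; "import *" from pyspark.sql.functions shadows Python built-ins such as sum and max
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType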

appName = "AddColumnUsingUDF"

# Create (or reuse) the Spark session
spark = SparkSession.builder.appName(appName).getOrCreate()

filePath = 'C:/tools/data/sampleEmpSalaryData.csv'

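# Read the CSV file; header='true' takes the column names from the first row, and every column is read as a string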
sampleSalaryDF = spark.read.format('csv').options(header='true').load(filePath)

# The DataFrame has many columns, so select only a few for a readable display.
# The column names are passed as a Python list here; you could also pass them directly to select().
colList = ['Emp ID', 'First Name', 'Last Name', 'Date of Birth', 'Salary', 'Last % Hike']
sampleSalaryDF = sampleSalaryDF.select(*colList)
sampleSalaryDF.show(5)

Output:

+------+----------+---------+-------------+------+-----------+
|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|
+------+----------+---------+-------------+------+-----------+
|677509|      Lois|   Walker|    3/29/1981|168251|        21%|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%|
+------+----------+---------+-------------+------+-----------+

Using a UDF in the DataFrame API:

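# Using a UDF in a DataFrame - Option 1
# Plain Python function to wrap as a UDF; both inputs arrive as strings, e.g. "168251" and "21%"
def addRevisedSalaryUDF(salary, lastHike):
    if salary is None or lastHike is None:
        return None  # return null instead of failing the job on missing values
    lastHike = int(lastHike.replace("%", ""))
    revisedSalary = float(salary) + (float(salary) * lastHike / 100)
    return revisedSalary

# Wrap the Python function as a UDF that returns FloatType
revisedSalaryUDF = udf(addRevisedSalaryUDF, FloatType())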

sampleSalaryDFUpdated = sampleSalaryDF.withColumn('New Salary', revisedSalaryUDF("Salary", "Last % Hike"))
sampleSalaryDFUpdated.show(5)

Output:

+------+----------+---------+-------------+------+-----------+----------+
|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
+------+----------+---------+-------------+------+-----------+----------+
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|
+------+----------+---------+-------------+------+-----------+----------+
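Each value in New Salary is salary × (1 + hike / 100). For the first row, 168251 + 168251 × 21 / 100 = 168251 + 35332.71 = 203583.71. Because the UDF wraps an ordinary Python function, this arithmetic can be checked in plain Python without starting Spark. The sketch below copies the function body from Option 1 and feeds it the same string inputs the CSV reader produces; the name sample_rows is only for illustration.

```python
# Copy of the Option 1 function body, checked in plain Python (no Spark needed).
def addRevisedSalaryUDF(salary, lastHike):
    lastHike = int(lastHike.replace("%", ""))
    revisedSalary = float(salary) + (float(salary) * lastHike / 100)
    return revisedSalary


# Inputs are strings, as the CSV reader returns them without inferSchema.
sample_rows = [("168251", "21%"), ("51063", "27%"), ("50155", "16%")]
results = [round(addRevisedSalaryUDF(s, h), 2) for s, h in sample_rows]
print(results)
```

Note that New Salary is a 32-bit float in the Spark output because the UDF declares FloatType(); DoubleType() would keep the full precision of the Python float.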

Using a UDF in Spark SQL:

# Using a UDF in Spark SQL - Option 2
# Register the Python function as a UDF under the name used in SQL; the return type is optional
# and defaults to StringType
spark.udf.register("revisedSalary", addRevisedSalaryUDF, FloatType())

sampleSalaryDF.createOrReplaceTempView("SampleSalary")
sampleSalaryDFUpdated = spark.sql("select a.*, revisedSalary(Salary, `Last % Hike`) as `New Salary` from SampleSalary a")

sampleSalaryDFUpdated.show(5)

Output:

+------+----------+---------+-------------+------+-----------+----------+
|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
+------+----------+---------+-------------+------+-----------+----------+
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|
+------+----------+---------+-------------+------+-----------+----------+
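Once registered, the function can be called by name in any SQL expression in the same SparkSession, not only inside spark.sql queries. A minimal sketch using DataFrame.selectExpr, assuming revisedSalary has been registered as above; the helper name add_new_salary_with_expr is hypothetical. Backticks are required because the column names contain spaces and %.

```python
# Assumes spark.udf.register("revisedSalary", ...) has already run in this SparkSession.
# selectExpr accepts SQL expressions, so the registered UDF can be used without a temp view.
NEW_SALARY_EXPR = "revisedSalary(Salary, `Last % Hike`) AS `New Salary`"


def add_new_salary_with_expr(df):
    """Hypothetical helper: keep every column ("*") and append New Salary."""
    return df.selectExpr("*", NEW_SALARY_EXPR)
```

For example, add_new_salary_with_expr(sampleSalaryDF).show(5) keeps all existing columns and appends New Salary, like the Spark SQL query above.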


Using UDF decorator syntax


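# UDF using decorator syntax - Option 3
# Without a returnType, @udf() defaults to StringType; FloatType matches Options 1 and 2
@udf(returnType=FloatType())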
def addRevisedSalaryUDF(salary, lastHike):
    lastHike = int(lastHike.replace("%", ""))
    revisedSalary = float(salary) + (float(salary) * lastHike / 100)
    return revisedSalary

sampleSalaryDFUpdated = sampleSalaryDF.withColumn('New Salary', addRevisedSalaryUDF("Salary", "Last % Hike"))
sampleSalaryDFUpdated.show(5)

Output:

+------+----------+---------+-------------+------+-----------+----------+
|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
+------+----------+---------+-------------+------+-----------+----------+
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|
+------+----------+---------+-------------+------+-----------+----------+



UDFs are fine for operations on small datasets but are not recommended for large datasets.
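When the logic can be expressed with Spark's built-in column functions, no UDF is needed: the computation stays inside the JVM and rows are not shipped to Python worker processes. The sketch below is an alternative not covered in this article. It assumes the same string columns, strips the % with regexp_replace and casts both columns to double; the helper name add_new_salary_builtin is hypothetical.

```python
def add_new_salary_builtin(df):
    """Hypothetical helper: New Salary = Salary + Salary * hike / 100 using built-in functions only."""
    from pyspark.sql import functions as F  # imported here so the sketch loads without Spark

    # "21%" -> "21" -> 21.0; "Salary" is a string column, so cast it as well
    hike = F.regexp_replace(F.col("Last % Hike"), "%", "").cast("double")
    salary = F.col("Salary").cast("double")
    return df.withColumn("New Salary", salary + salary * hike / 100)
```

The resulting column is DoubleType rather than FloatType, so the displayed values may differ slightly in the last digits from the UDF output.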

Sample data: courtesy of eforexcel.com


Copyright: there is no copyright on the code. You can copy, change, and distribute it freely; mentioning this site would be fair.
(C) November 2020, manivelcode
