Introduction
This article demonstrates how to add a column to a DataFrame using a UDF in PySpark.
For processing large datasets, rdd.map or rdd.mapPartitions is recommended instead of a UDF, because UDFs are slow on large datasets.
Refer to this link for rdd.map and rdd.mapPartitions.
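As a rough illustration of the rdd.mapPartitions route mentioned above, here is a minimal sketch. It is not taken from the linked article: the names addNewSalaryPartition and runWithSpark are hypothetical, and the column names are the ones from the sample data used later in this post. The per-partition function is plain Python, so it can be tried on a list of dictionaries before it is handed to Spark.

```python
from typing import Dict, Iterable, Iterator


def addNewSalaryPartition(rows: Iterable[Dict[str, str]]) -> Iterator[Dict[str, object]]:
    """Yield every row of one partition with an extra 'New Salary' field."""
    for row in rows:
        salary = float(row["Salary"])
        # 'Last % Hike' looks like '21%', so drop the sign before converting
        hikePct = int(row["Last % Hike"].replace("%", ""))
        newRow = dict(row)
        newRow["New Salary"] = salary + salary * hikePct / 100
        yield newRow


def runWithSpark(filePath: str) -> None:
    """Hypothetical driver; it needs a working PySpark installation."""
    # Imported here so the function above can be tried without Spark
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("AddColumnUsingMapPartitions").getOrCreate()
    df = spark.read.format("csv").options(header="true").load(filePath)
    columns = df.columns + ["New Salary"]

    # Turn each Row into a dict, process one whole partition per call,
    # then go back to tuples in a fixed column order
    tupleRDD = (
        df.rdd.map(lambda r: r.asDict())
        .mapPartitions(addNewSalaryPartition)
        .map(lambda d: tuple(d[c] for c in columns))
    )
    spark.createDataFrame(tupleRDD, schema=columns).show(5)
```

Calling runWithSpark with the path of the CSV file should print the original columns plus New Salary. The function is called once per partition rather than once per row, which is the main difference from rdd.map.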
Add a new column to a DataFrame using a UDF
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

appName = "AddColumnUsingUDF"

# Spark session
spark = SparkSession.builder.appName(appName).getOrCreate()

filePath = 'C:/tools/data/sampleEmpSalaryData.csv'

# Read the CSV file; the first line holds the column names
sampleSalaryDF = spark.read.format('csv').options(header='true').load(filePath)

# The DataFrame has many columns, so restrict it to a few for better display.
# A Python list is used here to pass the column names; they may also be
# passed directly to the select method.
colList = ['Emp ID', 'First Name', 'Last Name', 'Date of Birth', 'Salary', 'Last % Hike']
sampleSalaryDF = sampleSalaryDF.select(*colList)
sampleSalaryDF.show(5)
Output:
+------+----------+---------+-------------+------+-----------+
|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|
+------+----------+---------+-------------+------+-----------+
|677509|      Lois|   Walker|    3/29/1981|168251|        21%|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%|
+------+----------+---------+-------------+------+-----------+
Using UDF in DataFrame:
# Using UDF in DataFrame - Option 1

# Python function for the UDF
def addRevisedSalaryUDF(salary, lastHike):
    # Strip the '%' sign and convert the hike to an integer percentage
    lastHike = int(lastHike.replace("%", ""))
    revisedSalary = float(salary) + (float(salary) * lastHike / 100)
    return revisedSalary

# Create a UDF object to use in the DataFrame
revisedSalaryUDF = udf(addRevisedSalaryUDF, FloatType())

sampleSalaryDFUpdated = sampleSalaryDF.withColumn('New Salary', revisedSalaryUDF("Salary", "Last % Hike"))
sampleSalaryDFUpdated.show(5)
Output:
+------+----------+---------+-------------+------+-----------+----------+
|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
+------+----------+---------+-------------+------+-----------+----------+
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|
+------+----------+---------+-------------+------+-----------+----------+
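The UDF above assumes that every row has a Salary and a Last % Hike value. Spark passes a null column value to a Python UDF as None, so a missing value would raise an error inside the function. Below is a minimal sketch of a more defensive variant; the name safeRevisedSalary is hypothetical and not part of the original code, and returning None for bad input is an assumption about the desired behaviour.

```python
from typing import Optional


def safeRevisedSalary(salary: Optional[str], lastHike: Optional[str]) -> Optional[float]:
    """Return the revised salary, or None when an input is missing or malformed."""
    if salary is None or lastHike is None:
        return None
    try:
        base = float(salary)
        # Accept values such as '21%' or ' 21% '
        hikePct = float(lastHike.strip().rstrip("%"))
    except ValueError:
        # Not a number: let the new column be null for this row
        return None
    return base + base * hikePct / 100
```

It can be wrapped the same way as the original function, for example udf(safeRevisedSalary, FloatType()). A None returned from the function shows up as null in the new column.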
Using UDF in Spark SQL:
# Using UDF in Spark SQL - Option 2

# Register the function as a UDF. Setting the return type is optional;
# the default return type is StringType.
spark.udf.register("revisedSalary", addRevisedSalaryUDF, FloatType())

sampleSalaryDF.createOrReplaceTempView("SampleSalary")
sampleSalaryDFUpdated = spark.sql("select a.*, revisedSalary(Salary, `Last % Hike`) as `New Salary` from SampleSalary a")
sampleSalaryDFUpdated.show(5)
Output:
+------+----------+---------+-------------+------+-----------+----------+
|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
+------+----------+---------+-------------+------+-----------+----------+
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|
+------+----------+---------+-------------+------+-----------+----------+
Using UDF annotation (decorator) syntax:
# UDF using annotation (decorator) syntax - Option 3

# Pass the return type explicitly; a bare @udf() returns StringType
@udf(returnType=FloatType())
def addRevisedSalaryUDF(salary, lastHike):
    # Strip the '%' sign and convert the hike to an integer percentage
    lastHike = int(lastHike.replace("%", ""))
    revisedSalary = float(salary) + (float(salary) * lastHike / 100)
    return revisedSalary

sampleSalaryDFUpdated = sampleSalaryDF.withColumn('New Salary', addRevisedSalaryUDF("Salary", "Last % Hike"))
sampleSalaryDFUpdated.show(5)
Output:
+------+----------+---------+-------------+------+-----------+----------+
|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
+------+----------+---------+-------------+------+-----------+----------+
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|
+------+----------+---------+-------------+------+-----------+----------+
UDFs are suitable for operations on small datasets, but they are not recommended for large datasets.
Sample data: courtesy of eforexcel.com
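For this particular calculation, one common way to avoid the UDF cost is to write it with Spark's built-in SQL functions, which run inside the JVM and do not call Python for every row. The sketch below only builds the SQL expression as a string; the helper name revisedSalaryExpr is hypothetical, and the result uses DOUBLE rather than the FloatType used in the UDF examples.

```python
def revisedSalaryExpr(salaryCol: str, hikeCol: str) -> str:
    """Build a Spark SQL expression for salary + salary * hike / 100.

    Column names are wrapped in backticks because they may contain
    spaces or '%', as in the sample data.
    """
    salary = f"CAST(`{salaryCol}` AS DOUBLE)"
    # regexp_replace removes the '%' sign before the cast
    hike = f"CAST(regexp_replace(`{hikeCol}`, '%', '') AS DOUBLE)"
    return f"{salary} + {salary} * {hike} / 100"


newSalaryExpr = revisedSalaryExpr("Salary", "Last % Hike")
print(newSalaryExpr)

# With a DataFrame such as sampleSalaryDF (requires PySpark):
# from pyspark.sql.functions import expr
# sampleSalaryDFUpdated = sampleSalaryDF.withColumn('New Salary', expr(newSalaryExpr))
```

The same string can also be placed directly in a spark.sql query in place of the registered UDF call.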
Copyright - There is no copyright on the code. You can copy, change and distribute it freely. Just mentioning this site should be fair.
(C) November 2020, manivelcode