PySpark setup in IntelliJ idea


This article demonstrates how to setup PySpark in Intellij Idea.

Download and Setup Intellij IDEA for PySpark

Step 1: Download Intellij Idea

You can download and install Intellij Idea from below location 

Step2: Prerequisite for PySpark

Makesure below softwares are installed and configured properly 
1. Java 2. Python 3. Spark 4. Hadoop Home for winutils.exe

Step3: Install Python plugin in Intellij Idea

Step4: Create new Python project in Intellij Idea

Step5: Setup PySpark required files in Project Structure

Step6: Create sample PySpark program

from pyspark.sql import SparkSession

appName = "AddColumnUsingUDF"
#Spark Session
spark = SparkSession.Builder().appName(appName).getOrCreate()

filePath = 'C:/tools/data/sampleEmpSalaryData.csv'
#Read csv file
sampleSalaryDF ='csv').options(header='true').load(filePath)

#DF has many columns, so restricting to limited columns for better display
#Here I am using Python list to pass column names, you may pass column names as directly in select method.
colList = ['Emp ID', 'First Name', 'Last Name', 'Date of Birth', 'Salary', 'Last % Hike']
sampleSalaryDF =*colList)


Step7: run PySpark program

This program will be executed without any issues if you configured Env property and PySpark files as specified above

Step8: Results Window

If program get executed without any issues, you can find results as like below

Simple Program created to read csv file, and tested its working with Intellij idea.

PySpark: Add column in dataframe using UDF


This article demonstrates how to add column in dataframe using UDF in PySpark.
The recommended or rdd.mapPartitions for large datasets processing instead of UDF because its slow in large datasets.
You can refer this link for and rdd.mapPartitions

Add new column in DataFrame using UDF

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

appName = "AddColumnUsingUDF"

#Spark Session
spark = SparkSession.Builder().appName(appName).getOrCreate()

filePath = 'C:/tools/data/sampleEmpSalaryData.csv'

#Read csv file
sampleSalaryDF ='csv').options(header='true').load(filePath)

#DF has many columns, so restricting to limited columns for better display
#Here I am using Python list to pass column names, you may pass column names as directly in select method.
colList = ['Emp ID', 'First Name', 'Last Name', 'Date of Birth', 'Salary', 'Last % Hike']
sampleSalaryDF =*colList)


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%|

using UDF in DataFrame:

#Using UDF in DataFrame - Option 1
#Python function for UDF
def addRevisedSalaryUDF(salary, lastHike):
    lastHike = int(lastHike.replace("%", ""))
    revisedSalary = float(salary) + (float(salary) * lastHike / 100)
    return revisedSalary

#create UDF function variable to use in DataFrame
revisedSalaryUDF = udf(addRevisedSalaryUDF, FloatType())

sampleSalaryDFUpdated = sampleSalaryDF.withColumn('New Salary', revisedSalaryUDF("Salary", "Last % Hike"))


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|

Using UDF in Spark SQL:

#Using UDF in Spark SQL - Option 2
#Register a function as a UDF, you can optionally set the return type of your UDF
#The default return type is StringType
spark.udf.register("revisedSalary", addRevisedSalaryUDF, FloatType())

sampleSalaryDFUpdated = spark.sql("select a.*, revisedSalary(Salary, `Last % Hike`) as `New Salary` from SampleSalary a")


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|

Using UDF annotation syntax

#UDF using annotation syntax - Option 3
def addRevisedSalaryUDF(salary, lastHike):
    lastHike = int(lastHike.replace("%", ""))
    revisedSalary = float(salary) + (float(salary) * lastHike / 100)
    return revisedSalary

sampleSalaryDFUpdated = sampleSalaryDF.withColumn('New Salary', addRevisedSalaryUDF("Salary", "Last % Hike"))


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|

UDF is recommended for small datasets operations but not for large datasets.

PySpark: Add column in dataframe using Map and Map Partitions


This article demonstrates how to add column in dataframe using Map and Map Partitions

What is the best way to add new column to DataFrame in PySpark
Here we are going to see adding column to DataFrame using withColumn, Spark SQL and Map function.
In this scenario, not much difference between withColumn and Spark SQL, but Map create huge difference. Try to use Spark SQL wherever applicable and possible because DataFrames and Spark SQL are much more sensitive than using
There are cases you will end up using Spark SQL UDF, during this time will be better option for large datasets processing from my perspective If you have having better hardware you may use mapPartitions.
Let us see example on each scenario.

Traditional approach to add new column in DataFrame

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

appName = "AddColumnUsingMap"

#Spark Session
spark = SparkSession.Builder().appName(appName).getOrCreate()

filePath = 'C:/tools/data/sampleEmpSalaryData.csv'

#Read csv file
sampleSalaryDF ='csv').options(header='true').load(filePath)

#DF has many columns, so restricting to limited columns for better display
#Here I am using Python list to pass column names, you may pass column names as directly in select method.
colList = ['Emp ID', 'First Name', 'Last Name', 'Date of Birth', 'Salary', 'Last % Hike']
sampleSalaryDF =*colList)


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%|


sampleSalaryDFUpdated = sampleSalaryDF.withColumn('New Salary', expr("Salary + (Salary * int(replace(`Last % Hike`, '%', '')) / 100)"))


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|


sampleSalaryDFUpdated = spark.sql("select a.*, Salary + (Salary * int(replace(`Last % Hike`, '%', '')) / 100) as `New Salary` from SampleSalary a")


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|

add new column in DataFrame: using

from pyspark.sql.types import *

#Create Python function to add new column
def addRevisedSalary(row):
    rowDict = row.asDict()
    revisedSalary = float(rowDict.get("Salary")) + \
                    (float(rowDict.get("Salary")) *
                     (int(rowDict.get("Last % Hike").replace("%", "")) / 100))
    rowDict["New Salary"] = str(revisedSalary)
    return rowDict

#Using Map
sampleSalaryUpdRDD =

#we need to pass updated schema while creating DataFrame
inputSchema = sampleSalaryDF.schema
newSchema = inputSchema.add("New Salary", StringType())

sampleSalaryDFUpdated = spark.createDataFrame(sampleSalaryUpdRDD, newSchema)


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|

add new column in DataFrame: using rdd.mapPartitions

#Create Python function to add new column for map partitions 
def addRevisedSalaryUsingPartition(partition):
    for row in partition:
        rowDict = row.asDict()
        revisedSalary = float(rowDict.get("Salary")) + \
                        (float(rowDict.get("Salary")) *
                         (int(rowDict.get("Last % Hike").replace("%", "")) / 100))
        rowDict["New Salary"] = str(revisedSalary)
        yield rowDict

#Using Map Partitions 
sampleSalaryUpdRDD = sampleSalaryDF.rdd.mapPartitions(addRevisedSalaryUsingPartition)

#we need to pass updated schema while creating DataFrame
inputSchema = sampleSalaryDF.schema
newSchema = inputSchema.add("New Salary", StringType())

#Create DataFrame
sampleSalaryDFUpdated = spark.createDataFrame(sampleSalaryUpdRDD, newSchema)


|Emp ID|First Name|Last Name|Date of Birth|Salary|Last % Hike|New Salary|
|677509|      Lois|   Walker|    3/29/1981|168251|        21%| 203583.71|
|940761|    Brenda| Robinson|    7/31/1970| 51063|        27%|  64850.01|
|428945|       Joe| Robinson|    6/16/1963| 50155|        16%|   58179.8|
|408351|     Diane|    Evans|    12/4/1977|180294|         1%| 182096.94|
|193819|  Benjamin|  Russell|    4/17/1977|117642|        13%| 132935.46|
+------+----------+---------+-------------+------+-----------+----------+ operation gives better performance compare to udf for complex transformation in DataFrame.
rdd.mapPartitions is more efficient than if you have good infra, not much benefit using local or single node env.
Spark SQL is recommended option for DataFrame transformations, if any complex transformation which is not possible in Spark SQL (in-build functions/expressions) then you can try using or rdd.mapPartitions.

