PySpark Exercises – 101 PySpark Exercises for Data Analysis

Written by Jagdeesh | 38 min read

These 101 PySpark exercises are designed to challenge your logical muscle and to help you internalize data manipulation with PySpark, Python's favorite package for large-scale data analysis. The questions come in 3 levels of difficulty, with L1 being the easiest and L3 the hardest.

You might also like to try out:

  1. 101 Pandas Exercises for Data Analysis
  2. 101 NumPy Exercises for Data Analysis

1. How to import PySpark and check the version?

Difficulty Level: L1

Show Solution

python

import findspark
findspark.init()

# Creating a SparkSession: A SparkSession is the entry point for using the PySpark DataFrame and SQL API.
# To create a SparkSession, use the following code
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark 101 Exercises").getOrCreate()

# Get version details
print(spark.version)
python

#> 3.3.2

2. How to convert the index of a PySpark DataFrame into a column?

Difficulty Level: L1

Hint: A PySpark DataFrame doesn’t have an explicit index like a Pandas DataFrame. However, you can add a new column that is essentially a row number.

Input:

python
# Assuming df is your DataFrame
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()

+-------+-----+
| Name|Value|
+-------+-----+
| Alice| 1|
| Bob| 2|
|Charlie| 3|
+-------+-----+

Expected Output:

python
+-------+-----+-----+
| Name|Value|index|
+-------+-----+-----+
| Alice| 1| 0|
| Bob| 2| 1|
|Charlie| 3| 2|
+-------+-----+-----+
Show Solution

python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("index", row_number().over(w) - 1)

df.show()
python

+-------+-----+-----+
| Name|Value|index|
+-------+-----+-----+
| Alice| 1| 0|
| Bob| 2| 1|
|Charlie| 3| 2|
+-------+-----+-----+
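
As a side note, the window above has no partitioning, so Spark pulls all rows together to assign the row numbers. For larger DataFrames, an alternative sketch (assuming the same df as above) goes through rdd.zipWithIndex(), which assigns indexes without a window:

python
# Alternative sketch: zipWithIndex assigns an index to each row (assumes df as above)
rdd_indexed = df.rdd.zipWithIndex()  # -> (Row(Name, Value), index) pairs
df_indexed = rdd_indexed.map(lambda x: (*x[0], x[1])).toDF(["Name", "Value", "index"])
df_indexed.show()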

3. How to combine many lists to form a PySpark DataFrame?

Difficulty Level: L1

Create a PySpark DataFrame from list1 and list2

Hint: To create a DataFrame from multiple lists, first create an RDD (Resilient Distributed Dataset) from those lists and then convert the RDD to a DataFrame.

Input:

python
# Define your lists
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]
Show Solution

python
# Create an RDD from the lists and convert it to a DataFrame
rdd = spark.sparkContext.parallelize(list(zip(list1, list2)))
df = rdd.toDF(["Column1", "Column2"])

# Show the DataFrame
df.show()

python

+-------+-------+
|Column1|Column2|
+-------+-------+
| a| 1|
| b| 2|
| c| 3|
| d| 4|
+-------+-------+
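
The RDD step is not strictly required; spark.createDataFrame can also take the zipped lists directly, which is a shorter sketch of the same idea:

python
# Alternative sketch: build the DataFrame directly from the zipped lists
df = spark.createDataFrame(list(zip(list1, list2)), ["Column1", "Column2"])
df.show()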

4. How to get the items of list A not present in list B?

Difficulty Level: L2

Get the items of list_A that are not present in list_B. In PySpark, you can use the subtract operation on RDDs (Resilient Distributed Datasets).

Input:

python
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

Expected Output:

python
#> [1, 2, 3]
Show Solution

python
sc = spark.sparkContext

# Convert lists to RDD
rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

# Perform subtract operation
result_rdd = rdd_A.subtract(rdd_B)

# Collect result
result_list = result_rdd.collect()
print(result_list)
python
 #> [1, 2, 3]
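
If you prefer to stay in the DataFrame API, a similar result can be obtained with exceptAll on single-column DataFrames (a sketch, assuming the same lists):

python
# DataFrame-based sketch: items of list_A not in list_B via exceptAll
df_A = spark.createDataFrame([(x,) for x in list_A], ["value"])
df_B = spark.createDataFrame([(x,) for x in list_B], ["value"])
df_A.exceptAll(df_B).show()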

5. How to get the items not common to both list A and list B?

Difficulty Level: L2

Get all items of list_A and list_B not common to both.

Input:

python
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]
Show Solution

python
sc = spark.sparkContext

# Convert lists to RDD
rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

# Perform subtract operation
result_rdd_A = rdd_A.subtract(rdd_B)
result_rdd_B = rdd_B.subtract(rdd_A)

# Union the two RDDs
result_rdd = result_rdd_A.union(result_rdd_B)

# Collect result
result_list = result_rdd.collect()

print(result_list)
python
[1, 2, 3, 8, 6, 7]

6. How to get the minimum, 25th percentile, median, 75th, and max of a numeric column?

Difficulty Level: L2

Compute the minimum, 25th percentile, median, 75th, and maximum of column Age

Input

python
# Create a sample DataFrame
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()
python
+----+---+
|Name|Age|
+----+---+
| A| 10|
| B| 20|
| C| 30|
| D| 40|
| E| 50|
| F| 15|
| G| 28|
| H| 54|
| I| 41|
| J| 86|
+----+---+
Show Solution

python
# Calculate percentiles
quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)

print("Min: ", quantiles[0])
print("25th percentile: ", quantiles[1])
print("Median: ", quantiles[2])
print("75th percentile: ", quantiles[3])
print("Max: ", quantiles[4])
python
Min: 10.0
25th percentile: 20.0
Median: 30.0
75th percentile: 50.0
Max: 86.0
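
approxQuantile returns approximate values whose accuracy is controlled by the last argument (the relative error). An equivalent sketch uses the SQL percentile_approx function to compute the same summary inside a single aggregation:

python
from pyspark.sql import functions as F

# Sketch: the same percentiles via percentile_approx in one aggregation
df.select(
    F.expr("percentile_approx(Age, array(0.0, 0.25, 0.5, 0.75, 1.0))").alias("quantiles")
).show(truncate=False)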

7. How to get frequency counts of unique items of a column?

Difficulty Level: L1

Calculate the frequency counts of each unique value

Input

python
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()
python
+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Doctor|
+----+---------+
Show Solution

python
df.groupBy("job").count().show()
python

+---------+-----+
| job|count|
+---------+-----+
| Engineer| 4|
|Scientist| 2|
| Doctor| 1|
+---------+-----+

8. How to keep only top 2 most frequent values as it is and replace everything else as ‘Other’?

Difficulty Level: L3

Input

python
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()
python
+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Doctor|
+----+---------+
Show Solution

python
from pyspark.sql.functions import col, when

# Get the top 2 most frequent jobs
top_2_jobs = df.groupBy('job').count().orderBy('count', ascending=False).limit(2).select('job').rdd.flatMap(lambda x: x).collect()

# Replace all but the top 2 most frequent jobs with 'Other'
df = df.withColumn('job', when(col('job').isin(top_2_jobs), col('job')).otherwise('Other'))

# show DataFrame
df.show()
python

+----+---------+
|name| job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam| Other|
+----+---------+

9. How to drop rows with NA values in a particular column?

Difficulty Level: L1

Input

python
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()
python
+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| null| 123|
| B| 3| 456|
| D| null|null|
+----+-----+----+
Show Solution

python
df_2 = df.dropna(subset=['Value'])

df_2.show()
python

+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| 3| 456|
+----+-----+----+

10. How to rename columns of a PySpark DataFrame using two lists – one containing the old column names and the other containing the new column names?

Difficulty Level: L2

Input

python
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])

# old column names
old_names = ["col1", "col2", "col3"]

# new column names
new_names = ["new_col1", "new_col2", "new_col3"]

df.show()
python
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 4| 5| 6|
+----+----+----+
Show Solution

python
# renaming
for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)

df.show()
python

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
| 1| 2| 3|
| 4| 5| 6|
+--------+--------+--------+

11. How to bin a numeric list to 10 groups of equal size?

Difficulty Level: L2

Input

python
from pyspark.sql.functions import rand
from pyspark.ml.feature import Bucketizer

# Create a DataFrame with a single column "values" filled with random numbers
num_items = 100
df = spark.range(num_items).select(rand(seed=42).alias("values"))

df.show(5)
python
+-------------------+
| values|
+-------------------+
| 0.619189370225301|
| 0.5096018842446481|
| 0.8325259388871524|
|0.26322809041172357|
| 0.6702867696264135|
+-------------------+
only showing top 5 rows
Show Solution

python
# Define the bucket boundaries
num_buckets = 10
quantiles = df.stat.approxQuantile("values", [i/num_buckets for i in range(num_buckets+1)], 0.01)

# Create the Bucketizer
bucketizer = Bucketizer(splits=quantiles, inputCol="values", outputCol="buckets")

# Apply the Bucketizer
df_buck = bucketizer.transform(df)

#Frequency table
df_buck.groupBy("buckets").count().show()

# Show the original and bucketed values
df_buck.show(5)
python

+-------+-----+
|buckets|count|
+-------+-----+
| 8.0| 10|
| 0.0| 8|
| 7.0| 10|
| 1.0| 10|
| 4.0| 10|
| 3.0| 10|
| 2.0| 10|
| 6.0| 10|
| 5.0| 10|
| 9.0| 12|
+-------+-----+

+-------------------+-------+
| values|buckets|
+-------------------+-------+
| 0.619189370225301| 5.0|
| 0.5096018842446481| 3.0|
| 0.8325259388871524| 8.0|
|0.26322809041172357| 2.0|
| 0.6702867696264135| 6.0|
+-------------------+-------+
only showing top 5 rows

12. How to create a contingency table?

Difficulty Level: L1

Input

python
# Example DataFrame
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
df = spark.createDataFrame(data, ["category1", "category2"])

df.show()
python
+---------+---------+
|category1|category2|
+---------+---------+
| A| X|
| A| Y|
| A| X|
| B| Y|
| B| X|
| C| X|
| C| X|
| C| Y|
+---------+---------+
Show Solution

python
# Frequency
df.cube("category1").count().show()

# Contingency table
df.crosstab('category1', 'category2').show()
python

+---------+-----+
|category1|count|
+---------+-----+
| null| 8|
| A| 3|
| B| 2|
| C| 3|
+---------+-----+

+-------------------+---+---+
|category1_category2| X| Y|
+-------------------+---+---+
| A| 2| 1|
| B| 1| 1|
| C| 2| 1|
+-------------------+---+---+

13. How to find the numbers that are multiples of 3 from a column?

Difficulty Level: L2

Input

python
from pyspark.sql.functions import rand

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()
python
+---+------+
| id|random|
+---+------+
| 0| 7|
| 1| 6|
| 2| 9|
| 3| 7|
| 4| 3|
| 5| 8|
| 6| 9|
| 7| 8|
| 8| 3|
| 9| 8|
+---+------+
Show Solution

python
from pyspark.sql.functions import col, when

# Assuming df is your DataFrame and "your_column" is the column with the numbers
df = df.withColumn("is_multiple_of_3", when(col("random") % 3 == 0, 1).otherwise(0))

df.show()
python

+---+------+----------------+
| id|random|is_multiple_of_3|
+---+------+----------------+
| 0| 7| 0|
| 1| 6| 1|
| 2| 9| 1|
| 3| 7| 0|
| 4| 3| 1|
| 5| 8| 0|
| 6| 9| 1|
| 7| 8| 0|
| 8| 3| 1|
| 9| 8| 0|
+---+------+----------------+
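
The solution above flags the multiples of 3; if you actually want to extract only those rows, a small follow-up filter (assuming the same df) does it:

python
# Sketch: keep only the rows where "random" is a multiple of 3
df.filter(col("random") % 3 == 0).select("random").show()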

14. How to extract items at given positions from a column?

Difficulty Level: L2

Input

python
from pyspark.sql.functions import rand

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

pos = [0, 4, 8, 5]
python
+---+------+
| id|random|
+---+------+
| 0| 7|
| 1| 6|
| 2| 9|
| 3| 7|
| 4| 3|
| 5| 8|
| 6| 9|
| 7| 8|
| 8| 3|
| 9| 8|
+---+------+
Show Solution

python
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

pos = [0, 4, 8, 5]

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("index", row_number().over(w) - 1)

df.show()

# Filter the DataFrame based on the specified positions
df_filtered = df.filter(df.index.isin(pos))

df_filtered.show()
python

+---+------+-----+
| id|random|index|
+---+------+-----+
| 0| 7| 0|
| 1| 6| 1|
| 2| 9| 2|
| 3| 7| 3|
| 4| 3| 4|
| 5| 8| 5|
| 6| 9| 6|
| 7| 8| 7|
| 8| 3| 8|
| 9| 8| 9|
+---+------+-----+

+---+------+-----+
| id|random|index|
+---+------+-----+
| 0| 7| 0|
| 4| 3| 4|
| 5| 8| 5|
| 8| 3| 8|
+---+------+-----+

15. How to stack two DataFrames vertically ?

Difficulty Level: L1

Input

python
# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()
python
+------+-----+-----+
| Name|Col_1|Col_2|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 10|
|orange| 2| 8|
+------+-----+-----+

+------+-----+-----+
| Name|Col_1|Col_3|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 15|
| grape| 4| 6|
+------+-----+-----+
Show Solution

python
df_A.union(df_B).show()
python

+------+-----+-----+
| Name|Col_1|Col_2|
+------+-----+-----+
| apple| 3| 5|
|banana| 1| 10|
|orange| 2| 8|
| apple| 3| 5|
|banana| 1| 15|
| grape| 4| 6|
+------+-----+-----+
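
Note that union matches columns by position, which is why the Col_3 values from df_B end up under Col_2 above. When the DataFrames share column names but differ in order or in the columns they contain, unionByName is usually safer; a sketch, assuming Spark 3.1+ for allowMissingColumns:

python
# Sketch: union by column name; columns missing on either side are filled with nulls (Spark 3.1+)
df_A.unionByName(df_B, allowMissingColumns=True).show()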

16. How to compute the mean squared error on a truth and predicted columns?

Difficulty Level: L2

Input

python
# Assume you have a DataFrame df with two columns "actual" and "predicted"
# For the sake of example, we'll create a sample DataFrame
data = [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
df = spark.createDataFrame(data, ["actual", "predicted"])

df.show()
python
+------+---------+
|actual|predicted|
+------+---------+
| 1| 1|
| 2| 4|
| 3| 9|
| 4| 16|
| 5| 25|
+------+---------+
Show Solution

python
from pyspark.sql.functions import col, pow

# Calculate the squared differences
df = df.withColumn("squared_error", pow((col("actual") - col("predicted")), 2))

# Calculate the mean squared error
mse = df.agg({"squared_error": "avg"}).collect()[0][0]

print(f"Mean Squared Error (MSE) = {mse}")
python

Mean Squared Error (MSE) = 116.8
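
If you are comfortable with pyspark.ml, the same metric can also be computed with RegressionEvaluator; a sketch, assuming the label and prediction columns are cast to double as the evaluator expects:

python
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Sketch: MSE via RegressionEvaluator (expects double-typed columns)
df_d = df.withColumn("actual", col("actual").cast("double")) \
         .withColumn("predicted", col("predicted").cast("double"))

evaluator = RegressionEvaluator(labelCol="actual", predictionCol="predicted", metricName="mse")
print(evaluator.evaluate(df_d))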

17. How to convert the first character of each element in a series to uppercase?

Difficulty Level: L1

python
# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()
python
+-----+
| name|
+-----+
| john|
|alice|
| bob|
+-----+
Show Solution

python
from pyspark.sql.functions import initcap

# Convert the first character to uppercase
df = df.withColumn("name", initcap(df["name"]))

df.show()
python

+-----+
| name|
+-----+
| John|
|Alice|
| Bob|
+-----+

18. How to compute summary statistics for all columns in a dataframe?

Difficulty Level: L1

python
# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()
python
+-------+---+------+
| name|age|salary|
+-------+---+------+
| James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
| Maria| 29| 80000|
| Jen| 32| 65000|
+-------+---+------+
Show Solution

python
# Summary statistics
summary = df.summary()

# Show the summary statistics
summary.show()
python

+-------+------+------------------+-----------------+
|summary| name| age| salary|
+-------+------+------------------+-----------------+
| count| 5| 5| 5|
| mean| null| 32.4| 66000.0|
| stddev| null|3.2093613071762417|9617.692030835673|
| min| James| 29| 55000|
| 25%| null| 30| 60000|
| 50%| null| 32| 65000|
| 75%| null| 34| 70000|
| max|Robert| 37| 80000|
+-------+------+------------------+-----------------+

19. How to calculate the number of characters in each word in a column?

Difficulty Level: L1

python
# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()
python
+-----+
| name|
+-----+
| john|
|alice|
| bob|
+-----+
Show Solution

python
from pyspark.sql import functions as F

df = df.withColumn('word_length', F.length(df.name))
df.show()
python

+-----+-----------+
| name|word_length|
+-----+-----------+
| john| 4|
|alice| 5|
| bob| 3|
+-----+-----------+

20. How to compute difference of differences between consecutive numbers of a column?

Difficulty Level: L2

python
# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()
python
+-------+---+------+
| name|age|salary|
+-------+---+------+
| James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
| Maria| 29| 80000|
| Jen| 32| 65000|
+-------+---+------+
Show Solution

python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define window specification
df = df.withColumn("id", F.monotonically_increasing_id())
window = Window.orderBy("id")

# Generate the lag of the variable
df = df.withColumn("prev_value", F.lag(df.salary).over(window))

# Compute the difference with lag
df = df.withColumn("diff", F.when(F.isnull(df.salary - df.prev_value), 0)
.otherwise(df.salary - df.prev_value)).drop("id")

df.show()
python

+-------+---+------+----------+------+
| name|age|salary|prev_value| diff|
+-------+---+------+----------+------+
| James| 34| 55000| null| 0|
|Michael| 30| 70000| 55000| 15000|
| Robert| 37| 60000| 70000|-10000|
| Maria| 29| 80000| 60000| 20000|
| Jen| 32| 65000| 80000|-15000|
+-------+---+------+----------+------+
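
Strictly speaking, the code above computes only the first difference. To get the difference of differences asked in the title, the same lag step can be applied once more on the diff column (a sketch, assuming the df produced above):

python
# Sketch: second-order difference (difference of the "diff" column)
df = df.withColumn("id", F.monotonically_increasing_id())
w2 = Window.orderBy("id")
df = df.withColumn("prev_diff", F.lag("diff").over(w2))
df = df.withColumn("diff_of_diff",
                   F.when(F.isnull(F.col("diff") - F.col("prev_diff")), 0)
                    .otherwise(F.col("diff") - F.col("prev_diff"))).drop("id", "prev_diff")
df.show()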

21. How to get the day of month, week number, day of year and day of week from date strings?

Difficulty Level: L2

python
# example data
data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])

df.show()
python
+----------+-----------+
|date_str_1| date_str_2|
+----------+-----------+
|2023-05-18|01 Jan 2010|
|2023-12-31|01 Jan 2010|
+----------+-----------+
Show Solution

python
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, dayofweek

# Convert date string to date format
df = df.withColumn("date_1", to_date(df.date_str_1, 'yyyy-MM-dd'))
df = df.withColumn("date_2", to_date(df.date_str_2, 'dd MMM yyyy'))

df = df.withColumn("day_of_month", dayofmonth(df.date_1))\
.withColumn("week_number", weekofyear(df.date_1))\
.withColumn("day_of_year", dayofyear(df.date_1))\
.withColumn("day_of_week", dayofweek(df.date_1))

df.show()
python

+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|date_str_1| date_str_2| date_1| date_2|day_of_month|week_number|day_of_year|day_of_week|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|2023-05-18|01 Jan 2010|2023-05-18|2010-01-01| 18| 20| 138| 5|
|2023-12-31|01 Jan 2010|2023-12-31|2010-01-01| 31| 52| 365| 1|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+

22. How to convert year-month string to dates corresponding to the 4th day of the month?

Difficulty Level: L2

python
# example dataframe
df = spark.createDataFrame([('Jan 2010',), ('Feb 2011',), ('Mar 2012',)], ['MonthYear'])

df.show()
python
+---------+
|MonthYear|
+---------+
| Jan 2010|
| Feb 2011|
| Mar 2012|
+---------+
Show Solution

python
from pyspark.sql.functions import expr, col

# convert YearMonth to date (default to first day of the month)
df = df.withColumn('Date', expr("to_date(MonthYear, 'MMM yyyy')"))

df.show()

# replace day with 4
df = df.withColumn('Date', expr("date_add(date_sub(Date, day(Date) - 1), 3)"))

df.show()
python

+---------+----------+
|MonthYear| Date|
+---------+----------+
| Jan 2010|2010-01-01|
| Feb 2011|2011-02-01|
| Mar 2012|2012-03-01|
+---------+----------+

+---------+----------+
|MonthYear| Date|
+---------+----------+
| Jan 2010|2010-01-04|
| Feb 2011|2011-02-04|
| Mar 2012|2012-03-04|
+---------+----------+
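
An alternative sketch for the "replace day with 4" step uses trunc, which resets a date to the first day of its month, followed by date_add:

python
from pyspark.sql.functions import trunc, date_add

# Sketch: truncate to the first of the month, then add 3 days to land on the 4th
df = df.withColumn('Date', date_add(trunc('Date', 'month'), 3))
df.show()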

23. How to filter words that contain at least 2 vowels from a series?

Difficulty Level: L3

python
# example dataframe
df = spark.createDataFrame([('Apple',), ('Orange',), ('Plan',) , ('Python',) , ('Money',)], ['Word'])

df.show()
python
+------+
| Word|
+------+
| Apple|
|Orange|
| Plan|
|Python|
| Money|
+------+
Show Solution

python
from pyspark.sql.functions import col, length, translate

# Filter words that contain at least 2 vowels
df_filtered = df.where((length(col('Word')) - length(translate(col('Word'), 'AEIOUaeiou', ''))) >= 2)
df_filtered.show()
python

+------+
| Word|
+------+
| Apple|
|Orange|
| Money|
+------+

24. How to filter valid emails from a list?

Difficulty Level: L3

python
# Create a list
data = ['buying books at amazom.com', 'rameses@egypt.com', 'matt@t.co', 'narendra@modi.com']

# Convert the list to DataFrame
df = spark.createDataFrame(data, "string")
df.show(truncate =False)
python
+--------------------------+
|value |
+--------------------------+
|buying books at amazom.com|
|rameses@egypt.com |
|matt@t.co |
|narendra@modi.com |
+--------------------------+
Show Solution

python
from pyspark.sql import functions as F

# Define a regular expression pattern for emails
pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"

# Apply the filter operation to keep only valid emails
df_filtered = df.filter(F.col("value").rlike(pattern))

# Show the DataFrame
df_filtered.show()
python

+-----------------+
| value|
+-----------------+
|rameses@egypt.com|
| matt@t.co|
|narendra@modi.com|
+-----------------+

25. How to pivot a PySpark DataFrame?

Convert the region categories to columns and sum the revenue

Difficulty Level: L3

python
# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
df.show()
python
+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 1| US| 5000|
|2021| 1| EU| 4000|
|2021| 2| US| 5500|
|2021| 2| EU| 4500|
|2021| 3| US| 6000|
|2021| 3| EU| 5000|
|2021| 4| US| 7000|
|2021| 4| EU| 6000|
+----+-------+------+-------+
Show Solution

python
# Execute the pivot operation
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")

pivot_df.show()
python

+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+

26. How to get the mean of a variable grouped by another variable?

Difficulty Level: L3

python
# Sample data
data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]

# Create DataFrame
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)

df.show()
python
+-------+----------+-----+
|OrderID| Product|Price|
+-------+----------+-----+
| 1001| Laptop| 1000|
| 1002| Mouse| 50|
| 1003| Laptop| 1200|
| 1004| Mouse| 30|
| 1005|Smartphone| 700|
+-------+----------+-----+
Show Solution

python
from pyspark.sql.functions import mean

# GroupBy and aggregate
result = df.groupBy("Product").agg(mean("Price").alias("Total_Sales"))

# Show results
result.show()
python

+----------+-----------+
| Product|Total_Sales|
+----------+-----------+
| Laptop| 1100.0|
| Mouse| 40.0|
|Smartphone| 700.0|
+----------+-----------+

27. How to compute the euclidean distance between two columns?

Difficulty Level: L3

Compute the Euclidean distance between the two series (points), series1 and series2, without using a packaged formula.

python
# Define your series
data = [(1, 10), (2, 9), (3, 8), (4, 7), (5, 6), (6, 5), (7, 4), (8, 3), (9, 2), (10, 1)]

# Convert list to DataFrame
df = spark.createDataFrame(data, ["series1", "series2"])

df.show()
python
+-------+-------+
|series1|series2|
+-------+-------+
| 1| 10|
| 2| 9|
| 3| 8|
| 4| 7|
| 5| 6|
| 6| 5|
| 7| 4|
| 8| 3|
| 9| 2|
| 10| 1|
+-------+-------+
Show Solution

python
from pyspark.sql.functions import expr
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Convert series to vectors
vecAssembler = VectorAssembler(inputCols=["series1", "series2"], outputCol="vectors")
df = vecAssembler.transform(df)

# Calculate squared differences
df = df.withColumn("squared_diff", expr("POW(series1 - series2, 2)"))

# Sum squared differences and take square root
df.agg(expr("SQRT(SUM(squared_diff))").alias("euclidean_distance")).show()
python

+------------------+
|euclidean_distance|
+------------------+
| 18.16590212458495|
+------------------+

28. How to replace missing spaces in a string with the least frequent character?

Difficulty Level: L3

Replace the spaces in the string with its least frequent character

python
#Sample DataFrame
df = spark.createDataFrame([('dbc deb abed gade',),], ["string"])
df.show()
python
+-----------------+
| string|
+-----------------+
|dbc deb abed gade|
+-----------------+

Desired output

python
+-----------------+-----------------+
| string| modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+
Show Solution

python
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import StringType, ArrayType
from collections import Counter

def least_freq_char_replace_spaces(s):
    counter = Counter(s.replace(" ", ""))
    least_freq_char = min(counter, key=counter.get)
    return s.replace(' ', least_freq_char)

udf_least_freq_char_replace_spaces = udf(least_freq_char_replace_spaces, StringType())

df = spark.createDataFrame([('dbc deb abed gade',)], ["string"])
df.withColumn('modified_string', udf_least_freq_char_replace_spaces(df['string'])).show()
python

+-----------------+-----------------+
| string| modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+

29. How to create a TimeSeries starting ‘2000-01-01’ with 10 weekend dates (Saturdays) after that, having random numbers as values?

Difficulty Level: L3

Desired output

values can be random

python
+----------+--------------+
| date|random_numbers|
+----------+--------------+
|2000-01-01| 8|
|2000-01-08| 3|
|2000-01-15| 8|
|2000-01-22| 5|
|2000-01-29| 4|
|2000-02-05| 6|
|2000-02-12| 8|
|2000-02-19| 1|
|2000-02-26| 9|
|2000-03-04| 3|
+----------+--------------+
Show Solution

python
from pyspark.sql.functions import expr, explode, sequence, rand

# Start date and end date (start + 10 weekends)
start_date = '2000-01-01'
end_date = '2000-03-04' # Calculated manually: 10 weekends (Saturdays) from start date

# Create a DataFrame with one row containing a sequence from start_date to end_date with a 1 day step
df = spark.range(1).select(
explode(
sequence(
expr(f"date '{start_date}'"),
expr(f"date '{end_date}'"),
expr("interval 1 day")
)
).alias("date")
)

# Filter out the weekdays (retain weekends)
df = df.filter(expr("dayofweek(date) = 7")) # 7 corresponds to Saturday in Spark

# Add the random numbers column
#df = df.withColumn("random_numbers", rand()*10)
df = df.withColumn("random_numbers", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()
python

+----------+--------------+
| date|random_numbers|
+----------+--------------+
|2000-01-01| 8|
|2000-01-08| 3|
|2000-01-15| 8|
|2000-01-22| 5|
|2000-01-29| 4|
|2000-02-05| 6|
|2000-02-12| 8|
|2000-02-19| 1|
|2000-02-26| 9|
|2000-03-04| 3|
+----------+--------------+

30. How to get the nrows, ncolumns, datatype of a dataframe?

Difficulty Level: L1

Get the number of rows, columns, datatype and summary statistics of each column of the Churn_Modelling dataset. Also get the numpy array and list equivalent of the dataframe

python
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling.csv"

spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"), header=True, inferSchema=True)

df.show(5, truncate=False)
python
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|1 |15634602 |Hargrave|619 |France |Female|42 |2 |0.0 |1 |1 |1 |101348.88 |1 |
|2 |15647311 |Hill |608 |Spain |Female|41 |1 |83807.86 |1 |0 |1 |112542.58 |0 |
|3 |15619304 |Onio |502 |France |Female|42 |8 |159660.8 |3 |1 |0 |113931.57 |1 |
|4 |15701354 |Boni |699 |France |Female|39 |1 |0.0 |2 |0 |0 |93826.63 |0 |
|5 |15737888 |Mitchell|850 |Spain |Female|43 |2 |125510.82|1 |1 |1 |79084.1 |0 |
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
only showing top 5 rows
Show Solution

python
# For number of rows
nrows = df.count()
print("Number of Rows: ", nrows)

# For number of columns
ncols = len(df.columns)
print("Number of Columns: ", ncols)

# For data types of each column
datatypes = df.dtypes
print("Data types: ", datatypes)
python

Number of Rows: 10000
Number of Columns: 14
Data types: [('RowNumber', 'int'), ('CustomerId', 'int'), ('Surname', 'string'), ('CreditScore', 'int'), ('Geography', 'string'), ('Gender', 'string'), ('Age', 'int'), ('Tenure', 'int'), ('Balance', 'double'), ('NumOfProducts', 'int'), ('HasCrCard', 'int'), ('IsActiveMember', 'int'), ('EstimatedSalary', 'double'), ('Exited', 'int')]
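
The question also asks for the summary statistics and for the NumPy array and list equivalents of the dataframe; a sketch of those remaining pieces (both conversions collect the full dataset to the driver, so they are only advisable for small data):

python
import numpy as np

# Summary statistics for each column
df.summary().show()

# List equivalent: a Python list of Row objects
rows_list = df.collect()

# NumPy array equivalent, via pandas
np_array = np.array(df.toPandas())
print(type(rows_list), type(np_array))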

31. How to rename specific columns in a dataframe?

Difficulty Level: L2

Input

python
# Suppose you have the following DataFrame
df = spark.createDataFrame([('Alice', 1, 30),('Bob', 2, 35)], ["name", "age", "qty"])

df.show()

# Rename lists for specific columns
old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]
python
+-----+---+---+
| name|age|qty|
+-----+---+---+
|Alice| 1| 30|
| Bob| 2| 35|
+-----+---+---+
Show Solution

python
old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]

# You can then rename the columns like this:
for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)

df.show()
python

+-----+--------+--------+
| name|user_age|user_qty|
+-----+--------+--------+
|Alice| 1| 30|
| Bob| 2| 35|
+-----+--------+--------+

32. How to check if a dataframe has any missing values and count of missing values in each column?

Difficulty Level: L2

Input

python
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()
python
+----+-----+----+
|Name|Value| id|
+----+-----+----+
| A| 1|null|
| B| null| 123|
| B| 3| 456|
| D| null|null|
+----+-----+----+
Show Solution

python
from pyspark.sql.functions import col, sum

missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))

missing_count = missing.collect()[0].asDict()
has_missing = any(v > 0 for v in missing_count.values())
print(has_missing)

print(missing_count)
python

True
{'Name': 0, 'Value': 2, 'id': 2}

33. How to replace missing values of multiple numeric columns with the mean?

Difficulty Level: L2

Input

python
df = spark.createDataFrame([
("A", 1, None),
("B", None, 123 ),
("B", 3, 456),
("D", 6, None),
], ["Name", "var1", "var2"])

df.show()
python
+----+----+----+
|Name|var1|var2|
+----+----+----+
| A| 1|null|
| B|null| 123|
| B| 3| 456|
| D| 6|null|
+----+----+----+
Show Solution

python
from pyspark.ml.feature import Imputer

column_names = ["var1", "var2"]

# Initialize the Imputer
imputer = Imputer(inputCols= column_names, outputCols= column_names, strategy="mean")

# Fit the Imputer
model = imputer.fit(df)

#Transform the dataset
imputed_df = model.transform(df)

imputed_df.show(5)
python

+----+----+----+
|Name|var1|var2|
+----+----+----+
| A| 1| 289|
| B| 3| 123|
| B| 3| 456|
| D| 6| 289|
+----+----+----+

34. How to change the order of columns of a dataframe?

Difficulty Level: L1

Input

python
# Sample data
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Alice", "Smith", 22)]

# Create DataFrame from the data
df = spark.createDataFrame(data, ["First_Name", "Last_Name", "Age"])

# Show the DataFrame
df.show()
python
+----------+---------+---+
|First_Name|Last_Name|Age|
+----------+---------+---+
| John| Doe| 30|
| Jane| Doe| 25|
| Alice| Smith| 22|
+----------+---------+---+
Show Solution

python
new_order = ["Age", "First_Name", "Last_Name"]

# Reorder the columns
df = df.select(*new_order)

# Show the DataFrame with reordered columns
df.show()
python

+---+----------+---------+
|Age|First_Name|Last_Name|
+---+----------+---------+
| 30| John| Doe|
| 25| Jane| Doe|
| 22| Alice| Smith|
+---+----------+---------+

35. How to format or suppress scientific notations in a PySpark DataFrame?

python
# Assuming you have a DataFrame df and the column you want to format is 'your_column'
df = spark.createDataFrame([(1, 0.000000123), (2, 0.000023456), (3, 0.000345678)], ["id", "your_column"])

df.show()
python
+---+-----------+
| id|your_column|
+---+-----------+
| 1| 1.23E-7|
| 2| 2.3456E-5|
| 3| 3.45678E-4|
+---+-----------+
Show Solution

python
from pyspark.sql.functions import format_number

# Determine the number of decimal places you want
decimal_places = 10

df = df.withColumn("your_column", format_number("your_column", decimal_places))
df.show()
python

+---+------------+
| id| your_column|
+---+------------+
| 1|0.0000001230|
| 2|0.0000234560|
| 3|0.0003456780|
+---+------------+

36. How to format all the values in a dataframe as percentages?

Difficulty Level: L2

Input

python
# Sample data
data = [(0.1, .08), (0.2, .06), (0.33, .02)]
df = spark.createDataFrame(data, ["numbers_1", "numbers_2"])

df.show()
python
+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
| 0.1| 0.08|
| 0.2| 0.06|
| 0.33| 0.02|
+---------+---------+
Show Solution

python
from pyspark.sql.functions import concat, col, lit

columns = ["numbers_1", "numbers_2"]

for col_name in columns:
    df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))

df.show()
python

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
| 10.00%| 8.00%|
| 20.00%| 6.00%|
| 33.00%| 2.00%|
+---------+---------+

37. How to filter every nth row in a dataframe?

Difficulty Level: L2

Input

python
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Dave", 4), ("Eve", 5),
("Frank", 6), ("Grace", 7), ("Hannah", 8), ("Igor", 9), ("Jack", 10)]

# Create DataFrame
df = spark.createDataFrame(data, ["Name", "Number"])

df.show()
python
+-------+------+
| Name|Number|
+-------+------+
| Alice| 1|
| Bob| 2|
|Charlie| 3|
| Dave| 4|
| Eve| 5|
| Frank| 6|
| Grace| 7|
| Hannah| 8|
| Igor| 9|
| Jack| 10|
+-------+------+
Show Solution

python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Define window
window = Window.orderBy(monotonically_increasing_id())

# Add row_number to DataFrame
df = df.withColumn("rn", row_number().over(window))

n = 5 # filter every 5th row

# Filter every nth row
df = df.filter((df.rn % n) == 0)

df.show()
python

+----+------+---+
|Name|Number| rn|
+----+------+---+
| Eve| 5| 5|
|Jack| 10| 10|
+----+------+---+

38. How to get the row number of the nth largest value in a column?

Difficulty Level: L2

Input

python
from pyspark.sql import Row

# Sample Data
data = [
Row(id=1, column1=5),
Row(id=2, column1=8),
Row(id=3, column1=12),
Row(id=4, column1=1),
Row(id=5, column1=15),
Row(id=6, column1=7),
]

df = spark.createDataFrame(data)
df.show()
python
+---+-------+
| id|column1|
+---+-------+
| 1| 5|
| 2| 8|
| 3| 12|
| 4| 1|
| 5| 15|
| 6| 7|
+---+-------+
Show Solution

python
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number

window = Window.orderBy(desc("column1"))
df = df.withColumn("row_number", row_number().over(window))

n = 3 # We're interested in the 3rd largest value.
row = df.filter(df.row_number == n).first()

if row:
    print("Row number:", row.row_number)
    print("Column value:", row.column1)
python

Row number: 3
Column value: 8

39. How to get the last n rows of a dataframe with row sum > 100?

Difficulty Level: L2

Input

python
# Sample data
data = [(10, 25, 70),
(40, 5, 20),
(70, 80, 100),
(10, 2, 60),
(40, 50, 20)]

# Create DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Display original DataFrame
df.show()
python
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 10| 25| 70|
| 40| 5| 20|
| 70| 80| 100|
| 10| 2| 60|
| 40| 50| 20|
+----+----+----+
Show Solution

python
from pyspark.sql import functions as F
from functools import reduce

# Add 'row_sum' column
df = df.withColumn('row_sum', reduce(lambda a, b: a+b, [F.col(x) for x in df.columns]))

# Display DataFrame with 'row_sum'
df.show()

# Filter rows where 'row_sum' > 100
df = df.filter(F.col('row_sum') > 100)

# Display filtered DataFrame
df.show()

# Add 'id' column
df = df.withColumn('id', F.monotonically_increasing_id())

# Get the last 2 rows
df_last_2 = df.sort(F.desc('id')).limit(2)

# Display the last 2 rows
df_last_2.show()
python

+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
| 10| 25| 70| 105|
| 40| 5| 20| 65|
| 70| 80| 100| 250|
| 10| 2| 60| 72|
| 40| 50| 20| 110|
+----+----+----+-------+

+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
| 10| 25| 70| 105|
| 70| 80| 100| 250|
| 40| 50| 20| 110|
+----+----+----+-------+

+----+----+----+-------+-----------+
|col1|col2|col3|row_sum| id|
+----+----+----+-------+-----------+
| 40| 50| 20| 110|25769803776|
| 70| 80| 100| 250|17179869184|
+----+----+----+-------+-----------+

40. How to create a column containing the minimum by maximum of each row?

Difficulty Level: L2

Input

python
# Sample Data
data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

# Create DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

df.show()
python
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 4| 5| 6|
| 7| 8| 9|
| 10| 11| 12|
+----+----+----+
Show Solution

python
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType

# Define UDF
def min_max_ratio(row):
    return float(min(row)) / max(row)

min_max_ratio_udf = udf(min_max_ratio, FloatType())

# Apply UDF to create new column
df = df.withColumn('min_by_max', min_max_ratio_udf(array(df.columns)))

df.show()
python

+----+----+----+----------+
|col1|col2|col3|min_by_max|
+----+----+----+----------+
| 1| 2| 3|0.33333334|
| 4| 5| 6| 0.6666667|
| 7| 8| 9| 0.7777778|
| 10| 11| 12| 0.8333333|
+----+----+----+----------+

41. How to create a column that contains the penultimate value in each row?

Difficulty Level: L2

Create a new column ‘penultimate’ which has the second largest value of each row of df

Input

python
data = [(10, 20, 30),
(40, 60, 50),
(80, 70, 90)]

df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

df.show()
python
+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
| 10| 20| 30|
| 40| 60| 50|
| 80| 70| 90|
+-------+-------+-------+
Show Solution

python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Define UDF to sort an array in descending order
sort_array_desc = F.udf(lambda arr: sorted(arr, reverse=True), ArrayType(IntegerType()))

# Create an array from the columns, sort it in descending order and take the penultimate (second largest) value
df = df.withColumn("row_as_array", sort_array_desc(F.array(df.columns)))
df = df.withColumn("Penultimate", df['row_as_array'].getItem(1))
df = df.drop('row_as_array')

df.show()
python

+-------+-------+-------+-----------+
|Column1|Column2|Column3|Penultimate|
+-------+-------+-------+-----------+
| 10| 20| 30| 20|
| 40| 60| 50| 50|
| 80| 70| 90| 80|
+-------+-------+-------+-----------+

42. How to normalize all columns in a dataframe?

Difficulty Level: L2

Normalize all columns of df by subtracting the column mean and divide by standard deviation.

Range all columns of df such that the minimum value in each column is 0 and max is 1.

Input

python
# create a sample dataframe
data = [(1, 2, 3),
(2, 3, 4),
(3, 4, 5),
(4, 5, 6)]

df = spark.createDataFrame(data, ["Col1", "Col2", "Col3"])

df.show()
python
+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
| 1| 2| 3|
| 2| 3| 4|
| 3| 4| 5|
| 4| 5| 6|
+----+----+----+
Show Solution

python
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col

# define the list of columns to be normalized
input_cols = ["Col1", "Col2", "Col3"]

# initialize VectorAssembler with input and output column names
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# transform the data
df_assembled = assembler.transform(df)

# initialize StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

# fit and transform the data
scalerModel = scaler.fit(df_assembled)
df_normalized = scalerModel.transform(df_assembled)

# if you want to drop the original 'features' column
df_normalized = df_normalized.drop('features')

df_normalized.show(truncate=False)
python

+----+----+----+-------------------------------------------------------------+
|Col1|Col2|Col3|scaled_features |
+----+----+----+-------------------------------------------------------------+
|1 |2 |3 |[-1.161895003862225,-1.161895003862225,-1.161895003862225] |
|2 |3 |4 |[-0.3872983346207417,-0.3872983346207417,-0.3872983346207417]|
|3 |4 |5 |[0.3872983346207417,0.3872983346207417,0.3872983346207417] |
|4 |5 |6 |[1.161895003862225,1.161895003862225,1.161895003862225] |
+----+----+----+-------------------------------------------------------------+
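
The second part of the question asks to range each column between 0 and 1; a sketch using MinMaxScaler on the same assembled features column (df_assembled from above):

python
from pyspark.ml.feature import MinMaxScaler

# Sketch: min-max scale the assembled features so each value lies in [0, 1]
minmax_scaler = MinMaxScaler(inputCol="features", outputCol="minmax_features")
df_ranged = minmax_scaler.fit(df_assembled).transform(df_assembled).drop("features")

df_ranged.show(truncate=False)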

43. How to get the positions where values of two columns match?

Difficulty Level: L1

Input

python
# Create sample DataFrame
data = [("John", "John"), ("Lily", "Lucy"), ("Sam", "Sam"), ("Lucy", "Lily")]
df = spark.createDataFrame(data, ["Name1", "Name2"])

df.show()
python
+-----+-----+
|Name1|Name2|
+-----+-----+
| John| John|
| Lily| Lucy|
| Sam| Sam|
| Lucy| Lily|
+-----+-----+
Show Solution

python
from pyspark.sql.functions import when
from pyspark.sql.functions import col

# Add new column Match to indicate if Name1 and Name2 match
df = df.withColumn("Match", when(col("Name1") == col("Name2"), True).otherwise(False))

# Display DataFrame
df.show()
python

+-----+-----+-----+
|Name1|Name2|Match|
+-----+-----+-----+
| John| John| true|
| Lily| Lucy|false|
| Sam| Sam| true|
| Lucy| Lily|false|
+-----+-----+-----+

44. How to create lags and leads of a column by group in a dataframe?

Difficulty Level: L2

Input

python
# Create a sample DataFrame
data = [("2023-01-01", "Store1", 100),
("2023-01-02", "Store1", 150),
("2023-01-03", "Store1", 200),
("2023-01-04", "Store1", 250),
("2023-01-05", "Store1", 300),
("2023-01-01", "Store2", 50),
("2023-01-02", "Store2", 60),
("2023-01-03", "Store2", 80),
("2023-01-04", "Store2", 90),
("2023-01-05", "Store2", 120)]

df = spark.createDataFrame(data, ["Date", "Store", "Sales"])

df.show()
python
+----------+------+-----+
| Date| Store|Sales|
+----------+------+-----+
|2023-01-01|Store1| 100|
|2023-01-02|Store1| 150|
|2023-01-03|Store1| 200|
|2023-01-04|Store1| 250|
|2023-01-05|Store1| 300|
|2023-01-01|Store2| 50|
|2023-01-02|Store2| 60|
|2023-01-03|Store2| 80|
|2023-01-04|Store2| 90|
|2023-01-05|Store2| 120|
+----------+------+-----+
Show Solution

python
from pyspark.sql.functions import lag, lead, to_date
from pyspark.sql.window import Window

# Convert the date from string to date type
df = df.withColumn("Date", to_date(df.Date, 'yyyy-MM-dd'))

# Create a Window partitioned by Store, ordered by Date
windowSpec = Window.partitionBy("Store").orderBy("Date")

# Create lag and lead variables
df = df.withColumn("Lag_Sales", lag(df["Sales"]).over(windowSpec))
df = df.withColumn("Lead_Sales", lead(df["Sales"]).over(windowSpec))

df.show()
python

+----------+------+-----+---------+----------+
| Date| Store|Sales|Lag_Sales|Lead_Sales|
+----------+------+-----+---------+----------+
|2023-01-01|Store1| 100| null| 150|
|2023-01-02|Store1| 150| 100| 200|
|2023-01-03|Store1| 200| 150| 250|
|2023-01-04|Store1| 250| 200| 300|
|2023-01-05|Store1| 300| 250| null|
|2023-01-01|Store2| 50| null| 60|
|2023-01-02|Store2| 60| 50| 80|
|2023-01-03|Store2| 80| 60| 90|
|2023-01-04|Store2| 90| 80| 120|
|2023-01-05|Store2| 120| 90| null|
+----------+------+-----+---------+----------+

45. How to get the frequency of unique values in the entire dataframe?

Difficulty Level: L3

Get the frequency of unique values in the entire dataframe df.

Input

python
# Create a numeric DataFrame
data = [(1, 2, 3),
(2, 3, 4),
(1, 2, 3),
(4, 5, 6),
(2, 3, 4)]
df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

# Print DataFrame
df.show()
python
+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
| 1| 2| 3|
| 2| 3| 4|
| 1| 2| 3|
| 4| 5| 6|
| 2| 3| 4|
+-------+-------+-------+
Show Solution

python
from pyspark.sql.functions import col

# get column names
columns = df.columns

# stack all columns into a single column
df_single = None

for c in columns:
    if df_single is None:
        df_single = df.select(col(c).alias("single_column"))
    else:
        df_single = df_single.union(df.select(col(c).alias("single_column")))

# generate frequency table
frequency_table = df_single.groupBy("single_column").count().orderBy('count', ascending=False)

# show frequency table
frequency_table.show()
python

+-------------+-----+
|single_column|count|
+-------------+-----+
| 3| 4|
| 2| 4|
| 4| 3|
| 1| 2|
| 5| 1|
| 6| 1|
+-------------+-----+

46. How to replace both the diagonals of dataframe with 0?

Difficulty Level: L3

Replace the values in both diagonals of df with 0.

Input

python
# Create a numeric DataFrame
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(1, 2, 3, 4),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])

# Print DataFrame
df.show()
python
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
| 1| 2| 3| 4|
| 4| 5| 6| 7|
+-----+-----+-----+-----+
Show Solution

python
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.functions import when, col

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("id", row_number().over(w) - 1)

df = df.select([when(col("id") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])

# Create a reverse id column
df = df.withColumn("id", row_number().over(w) - 1)
df = df.withColumn("id_2", df.count() - 1 - df["id"])

df_with_diag_zero = df.select([when(col("id_2") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])

df_with_diag_zero.show()
python

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 0| 2| 3| 0|
| 2| 0| 0| 5|
| 1| 0| 0| 4|
| 0| 5| 6| 0|
+-----+-----+-----+-----+

47. How to reverse the rows of a dataframe?

Difficulty Level: L2

Reverse all the rows of dataframe df.

Input

python
# Create a numeric DataFrame
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(3, 4, 5, 6),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])

# Print DataFrame
df.show()
python
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
| 3| 4| 5| 6|
| 4| 5| 6| 7|
+-----+-----+-----+-----+
Show Solution

python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("id", row_number().over(w) - 1)

df_2 = df.orderBy("id", ascending=False).drop("id")

df_2.show()
python

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 4| 5| 6| 7|
| 3| 4| 5| 6|
| 2| 3| 4| 5|
| 1| 2| 3| 4|
+-----+-----+-----+-----+

48. How to create one-hot encodings of a categorical variable (dummy variables)?

Difficulty Level: L2

Get one-hot encodings for column Categories in the dataframe df and append it as columns.

Input

python
data = [("A", 10),("A", 20),("B", 30),("B", 20),("B", 30),("C", 40),("C", 10),("D", 10)]
columns = ["Categories", "Value"]

df = spark.createDataFrame(data, columns)
df.show()
python
+----------+-----+
|Categories|Value|
+----------+-----+
| A| 10|
| A| 20|
| B| 30|
| B| 20|
| B| 30|
| C| 40|
| C| 10|
| D| 10|
+----------+-----+
Show Solution

python
from pyspark.ml.feature import StringIndexer, OneHotEncoder
#from pyspark.sql.types import StringType, StructType, StructField

# StringIndexer Initialization
indexer = StringIndexer(inputCol="Categories", outputCol="Categories_Indexed")
indexerModel = indexer.fit(df)

# Transform the DataFrame using the fitted StringIndexer model
indexed_df = indexerModel.transform(df)
#indexed_df.show()

encoder = OneHotEncoder(inputCol="Categories_Indexed", outputCol="Categories_onehot")
encoded_df = encoder.fit(indexed_df).transform(indexed_df)
encoded_df = encoded_df.drop("Categories_Indexed")
encoded_df.show(truncate=False)
python

+----------+-----+-----------------+
|Categories|Value|Categories_onehot|
+----------+-----+-----------------+
|A |10 |(3,[1],[1.0]) |
|A |20 |(3,[1],[1.0]) |
|B |30 |(3,[0],[1.0]) |
|B |20 |(3,[0],[1.0]) |
|B |30 |(3,[0],[1.0]) |
|C |40 |(3,[2],[1.0]) |
|C |10 |(3,[2],[1.0]) |
|D |10 |(3,[],[]) |
+----------+-----+-----------------+
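
When you need both steps together (for example inside an ML workflow), StringIndexer and OneHotEncoder are commonly chained in a Pipeline; a sketch of the same encoding using a pipeline:

python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Sketch: chain the indexing and encoding stages in a single Pipeline
pipeline = Pipeline(stages=[
    StringIndexer(inputCol="Categories", outputCol="Categories_Indexed"),
    OneHotEncoder(inputCol="Categories_Indexed", outputCol="Categories_onehot"),
])
encoded_df = pipeline.fit(df).transform(df).drop("Categories_Indexed")
encoded_df.show(truncate=False)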

49. How to pivot the dataframe (converting rows into columns)?

Difficulty Level: L2

Convert the region column categories to columns

Input

python
# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
Show Solution

python
# Execute the pivot operation
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")

pivot_df.show()
python

+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+

50. How to unpivot the dataframe (converting columns into rows)?

Difficulty Level: L2

Unpivot the EU and US columns to create region and revenue columns

Input

python
# Sample data
data = [(2021, 2, 4500, 5500),
(2021, 1, 4000, 5000),
(2021, 3, 5000, 6000),
(2021, 4, 6000, 7000)]

# Create DataFrame
columns = ["year", "quarter", "EU", "US"]
df = spark.createDataFrame(data, columns)

df.show()
python
+----+-------+----+----+
|year|quarter| EU| US|
+----+-------+----+----+
|2021| 2|4500|5500|
|2021| 1|4000|5000|
|2021| 3|5000|6000|
|2021| 4|6000|7000|
+----+-------+----+----+

Expected Output

python
+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 2| EU| 4500|
|2021| 2| US| 5500|
|2021| 1| EU| 4000|
|2021| 1| US| 5000|
|2021| 3| EU| 5000|
|2021| 3| US| 6000|
|2021| 4| EU| 6000|
|2021| 4| US| 7000|
+----+-------+------+-------+
Show Solution

python
from pyspark.sql.functions import expr

unpivotExpr = "stack(2, 'EU', EU, 'US', US) as (region, revenue)"

unPivotDF = df.select("year", "quarter", expr(unpivotExpr)).where("revenue is not null")

unPivotDF.show()
python

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021| 2| EU| 4500|
|2021| 2| US| 5500|
|2021| 1| EU| 4000|
|2021| 1| US| 5000|
|2021| 3| EU| 5000|
|2021| 3| US| 6000|
|2021| 4| EU| 6000|
|2021| 4| US| 7000|
+----+-------+------+-------+
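
If you are on Spark 3.4 or newer, the DataFrame API also has a built-in unpivot (melt) method that avoids writing the stack expression by hand; a sketch, assuming that version is available:

python
# Sketch: built-in unpivot, available from Spark 3.4+
unPivotDF = df.unpivot(["year", "quarter"], ["EU", "US"], "region", "revenue")
unPivotDF.show()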

51. How to impute missing values with Zero?

Difficulty Level: L1

Input

python
# Suppose df is your DataFrame
df = spark.createDataFrame([(1, None), (None, 2), (3, 4), (5, None)], ["a", "b"])

df.show()
python
+----+----+
| a| b|
+----+----+
| 1|null|
|null| 2|
| 3| 4|
| 5|null|
+----+----+
Show Solution

python
df_imputed = df.fillna(0)

df_imputed.show()
python

+---+---+
| a| b|
+---+---+
| 1| 0|
| 0| 2|
| 3| 4|
| 5| 0|
+---+---+

52. How to identify continuous variables in a dataframe and create a list of those column names?

Difficulty Level: L3

Input

python
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling_m.csv"
spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling_m.csv"), header=True, inferSchema=True)

df.show(2, truncate=False)
python
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|1 |15634602 |Hargrave|619 |France |Female|42 |2 |0.0 |1 |1 |1 |101348.88 |1 |
|2 |15647311 |Hill |608 |Spain |Female|41 |1 |83807.86|1 |0 |1 |112542.58 |0 |
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
only showing top 2 rows
Show Solution

python
from pyspark.sql.types import IntegerType, StringType, NumericType
from pyspark.sql.functions import approxCountDistinct

def detect_continuous_variables(df, distinct_threshold):
    """
    Identify continuous variables in a PySpark DataFrame.
    :param df: The input PySpark DataFrame
    :param distinct_threshold: Threshold to qualify as a continuous variable - count of distinct values > distinct_threshold
    :return: A list containing the names of the continuous variables
    """
    continuous_columns = []
    for column in df.columns:
        dtype = df.schema[column].dataType
        if isinstance(dtype, (IntegerType, NumericType)):
            distinct_count = df.select(approxCountDistinct(column)).collect()[0][0]
            if distinct_count > distinct_threshold:
                continuous_columns.append(column)
    return continuous_columns

continuous_variables = detect_continuous_variables(df, 10)
print(continuous_variables)
python

['RowNumber', 'CustomerId', 'CreditScore', 'Age', 'Tenure', 'Balance', 'EstimatedSalary']

53. How to calculate Mode of a PySpark DataFrame column?

Difficulty Level: L1

Input

python
# Create a sample DataFrame
data = [(1, 2, 3), (2, 2, 3), (2, 2, 4), (1, 2, 3), (1, 1, 3)]
columns = ["col1", "col2", "col3"]

df = spark.createDataFrame(data, columns)

df.show()
python
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 2| 3|
| 2| 2| 3|
| 2| 2| 4|
| 1| 2| 3|
| 1| 1| 3|
+----+----+----+
Show Solution

python
from pyspark.sql.functions import col

df_grouped = df.groupBy('col2').count()
mode_df = df_grouped.orderBy(col('count').desc()).limit(1)

mode_df.show()
python

+----+-----+
|col2|count|
+----+-----+
| 2| 4|
+----+-----+

54. How to find installed location of Apache Spark and PySpark?

Difficulty Level: L1

Show Solution

python
import findspark
findspark.init()

print(findspark.find())

import os
import pyspark

print(os.path.dirname(pyspark.__file__))
python

C:\spark\spark-3.3.2-bin-hadoop2
C:\spark\spark-3.3.2-bin-hadoop2\python\pyspark

55. How to convert a column to lower case using UDF?

Difficulty Level: L2

Input

python
# Create a DataFrame to test
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

df = spark.createDataFrame(data, ['Name', 'City'])

df.show()
python
+------------+-------------+
| Name| City|
+------------+-------------+
| John Doe| NEW YORK|
| Jane Doe| LOS ANGELES|
|Mike Johnson| CHICAGO|
| Sara Smith|SAN FRANCISCO|
+------------+-------------+
Show Solution

python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define your UDF function
def to_lower(s):
    if s is not None:
        return s.lower()

# Convert your Python function to a Spark UDF
udf_to_lower = udf(to_lower, StringType())

# Apply your UDF to the DataFrame
df = df.withColumn('City_lower', udf_to_lower(df['City']))

# Show the DataFrame
df.show()
python

+------------+-------------+-------------+
| Name| City| City_lower|
+------------+-------------+-------------+
| John Doe| NEW YORK| new york|
| Jane Doe| LOS ANGELES| los angeles|
|Mike Johnson| CHICAGO| chicago|
| Sara Smith|SAN FRANCISCO|san francisco|
+------------+-------------+-------------+
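
A UDF works, but for a simple transformation like this the built-in column function is usually faster because it avoids Python serialization overhead. A minimal alternative sketch on the same DataFrame:

python
from pyspark.sql.functions import lower

# Same result using the built-in lower() column function (no UDF overhead)
df.withColumn('City_lower', lower(df['City'])).show()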

56. How to convert a PySpark DataFrame to a pandas DataFrame?

Difficulty Level: L1

Input

python
# Create a DataFrame to test
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

pysparkDF = spark.createDataFrame(data, ['Name', 'City'])

pysparkDF.show()
python
+------------+-------------+
| Name| City|
+------------+-------------+
| John Doe| NEW YORK|
| Jane Doe| LOS ANGELES|
|Mike Johnson| CHICAGO|
| Sara Smith|SAN FRANCISCO|
+------------+-------------+
Show Solution

python
# convert PySpark data frame to pandas
pandasDF = pysparkDF.toPandas()

print(pandasDF)
python

Name City
0 John Doe NEW YORK
1 Jane Doe LOS ANGELES
2 Mike Johnson CHICAGO
3 Sara Smith SAN FRANCISCO
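
For larger DataFrames, enabling Apache Arrow can speed up the conversion considerably. A hedged sketch, assuming the pyarrow package is installed in the environment:

python
# Enable Arrow-based columnar data transfer (requires the pyarrow package)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

pandasDF = pysparkDF.toPandas()
print(pandasDF)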

57. How to View PySpark Cluster Details?

Difficulty Level: L1

Show Solution

python
print(spark.sparkContext.uiWebUrl)
python

http://DESKTOP-UL3QT3E.mshome.net:4040

58. How to View PySpark Cluster Configuration Details?

Difficulty Level: L1

Show Solution

python
# Print all configurations
for k, v in spark.sparkContext.getConf().getAll():
    print(f"{k} : {v}")
python

spark.app.name : PySpark 101 Exercises
spark.driver.extraJavaOptions : -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
spark.app.startTime : 1684510291553
spark.app.id : local-1684510293468
spark.driver.host : DESKTOP-UL3QT3E.mshome.net
spark.executor.id : driver
spark.sql.warehouse.dir : file:/C:/Users/RajeshVaddi/Documents/MLPlus/6_PySpark%20101%20Exercises/spark-warehouse
spark.driver.port : 50321
spark.app.submitTime : 1684510291319
spark.rdd.compress : True
spark.executor.extraJavaOptions : -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
spark.serializer.objectStreamReset : 100
spark.master : local[*]
spark.submit.pyFiles :
spark.submit.deployMode : client
spark.ui.showConsoleProgress : true

59. How to restrict PySpark to a specific number of cores?

Difficulty Level: L1

Show Solution

python
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.executor.cores", "2") # set the number of cores you want here
sc = SparkContext(conf=conf)
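
Note that spark.executor.cores mainly matters when running against a cluster, and a new SparkContext can only be created if none is already active. When running locally, limiting cores is usually done through the master URL instead; a sketch assuming a fresh session:

python
from pyspark.sql import SparkSession

# Use at most 2 cores on the local machine via the master URL
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("PySpark 101 Exercises") \
    .getOrCreate()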

60. How to cache PySpark DataFrame or objects and delete cache?

Difficulty Level: L2

In PySpark, caching or persisting data is done to speed up data retrieval during iterative and interactive computations.

Show Solution

python
# Caching the DataFrame
df.cache()

# un-cache or unpersist data using the unpersist() method.
df.unpersist()
python

DataFrame[Name: string, City: string, City_lower: string]
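
cache() stores the DataFrame at the default storage level; persist() lets you choose the storage level explicitly. A brief sketch:

python
from pyspark import StorageLevel

# Persist with an explicit storage level, then release it
df.persist(StorageLevel.MEMORY_ONLY)
df.unpersist()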

61. How to Divide a PySpark DataFrame randomly in a given ratio (0.8, 0.2)?

Difficulty Level: L1

Show Solution

python
# Randomly split an existing DataFrame `data` into train and test sets (0.8, 0.2)

train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
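
A self-contained sketch that also verifies the sizes of the two splits (the DataFrame here is purely illustrative):

python
# Illustrative DataFrame to split
data = spark.range(0, 100).withColumnRenamed("id", "value")

train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
print(train_data.count(), test_data.count())  # roughly 80 / 20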

62. How to build logistic regression in PySpark?

Difficulty Level: L2

Input

python
# Create a sample dataframe
data = spark.createDataFrame([
(0, 1.0, -1.0),
(1, 2.0, 1.0),
(1, 3.0, -2.0),
(0, 4.0, 1.0),
(1, 5.0, -3.0),
(0, 6.0, 2.0),
(1, 7.0, -1.0),
(0, 8.0, 3.0),
(1, 9.0, -2.0),
(0, 10.0, 2.0),
(1, 11.0, -3.0),
(0, 12.0, 1.0),
(1, 13.0, -1.0),
(0, 14.0, 2.0),
(1, 15.0, -2.0),
(0, 16.0, 3.0),
(1, 17.0, -3.0),
(0, 18.0, 1.0),
(1, 19.0, -1.0),
(0, 20.0, 2.0)
], ["label", "feat1", "feat2"])

Show Solution

python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# convert the feature columns into a single vector column using VectorAssembler
vecAssembler = VectorAssembler(inputCols=['feat1', 'feat2'], outputCol="features")
data = vecAssembler.transform(data)

# fit the logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(data)

# look at the coefficients and intercept of the logistic regression model
print(f"Coefficients: {str(lr_model.coefficients)}")
print(f"Intercept: {str(lr_model.intercept)}")
python

Coefficients: [0.020277740475786673,-1.612960940022365]
Intercept: -0.2209292751829534
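
To make predictions with the fitted model, transform the assembled DataFrame; a short sketch continuing from the code above:

python
# Add prediction and probability columns to the assembled data
predictions = lr_model.transform(data)
predictions.select("label", "prediction", "probability").show(5)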

63. How to convert the categorical string data into numerical data or index?

Difficulty Level: L2

Input

python
# Create a sample DataFrame
data = [('cat',), ('dog',), ('mouse',), ('fish',), ('dog',), ('cat',), ('mouse',)]
df = spark.createDataFrame(data, ["animal"])

df.show()
python
+------+
|animal|
+------+
| cat|
| dog|
| mouse|
| fish|
| dog|
| cat|
| mouse|
+------+
Show Solution

python
from pyspark.ml.feature import StringIndexer

# Initialize a StringIndexer
indexer = StringIndexer(inputCol='animal', outputCol='animalIndex')

# Fit the indexer to the DataFrame and transform the data
indexed = indexer.fit(df).transform(df)
indexed.show()
python

+------+-----------+
|animal|animalIndex|
+------+-----------+
| cat| 0.0|
| dog| 1.0|
| mouse| 2.0|
| fish| 3.0|
| dog| 1.0|
| cat| 0.0|
| mouse| 2.0|
+------+-----------+
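
The mapping can also be reversed with IndexToString, which is handy after model predictions. A minimal sketch reusing the indexed DataFrame; by default IndexToString reads the labels from the metadata StringIndexer attached to the column:

python
from pyspark.ml.feature import IndexToString

# Map the numeric index back to the original string label
converter = IndexToString(inputCol="animalIndex", outputCol="animalOriginal")
converter.transform(indexed).show()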

64. How to calculate Correlation of two variables in a DataFrame?

Difficulty Level: L1

Input

python
from pyspark.sql import Row

# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()
python
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution

python
# Calculate correlation
correlation = df.corr("feature1", "feature2")

print("Correlation between feature1 and feature2 :", correlation)
python

Correlation between feature1 and feature2 : 0.9

65. How to calculate Correlation Matrix?

Difficulty Level: L2

Input

python
# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()
python
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution

python
# Calculate the correlation matrix using MLlib
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Assemble the feature columns into a single vector column
vector_assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data_vector = vector_assembler.transform(df).select("features")

# Calculate correlation
correlation_matrix = Correlation.corr(data_vector, "features").head()[0]

print(correlation_matrix)
python

DenseMatrix([[1. , 0.9 , 0.91779992],
[0.9 , 1. , 0.67837385],
[0.91779992, 0.67837385, 1. ]])
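
The DenseMatrix carries no column labels; converting it to a pandas DataFrame makes it easier to read. A small sketch, assuming pandas is available in the environment:

python
import pandas as pd

cols = ["feature1", "feature2", "feature3"]
corr_df = pd.DataFrame(correlation_matrix.toArray(), index=cols, columns=cols)
print(corr_df)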

66. How to calculate VIF (Variance Inflation Factor) for a set of variables in a DataFrame?

Difficulty Level: L3

Input

python
# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()
python
+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
| 5| 10| 25|
| 6| 15| 35|
| 7| 25| 30|
| 8| 20| 60|
| 9| 30| 70|
+--------+--------+--------+
Show Solution

python
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

def calculate_vif(data, features):
    vif_dict = {}

    for feature in features:
        # Regress the current feature on all the other features
        non_feature_cols = [col for col in features if col != feature]
        assembler = VectorAssembler(inputCols=non_feature_cols, outputCol="features")
        lr = LinearRegression(featuresCol='features', labelCol=feature)

        model = lr.fit(assembler.transform(data))
        vif = 1 / (1 - model.summary.r2)

        vif_dict[feature] = vif

    return vif_dict

features = ['feature1', 'feature2', 'feature3']
vif_values = calculate_vif(df, features)

for feature, vif in vif_values.items():
    print(f'VIF for {feature}: {vif}')
python

VIF for feature1: 66.2109375000003
VIF for feature2: 19.33593749999992
VIF for feature3: 23.30468749999992

67. How to perform Chi-Square test?

Difficulty Level: L2

Input

python
# Create a sample dataframe
data = [(1, 0, 0, 1, 1),
(2, 0, 1, 0, 0),
(3, 1, 0, 0, 0),
(4, 0, 0, 1, 1),
(5, 0, 1, 1, 0)]

df = spark.createDataFrame(data, ["id", "feature1", "feature2", "feature3", "label"])

df.show()
python
+---+--------+--------+--------+-----+
| id|feature1|feature2|feature3|label|
+---+--------+--------+--------+-----+
| 1| 0| 0| 1| 1|
| 2| 0| 1| 0| 0|
| 3| 1| 0| 0| 0|
| 4| 0| 0| 1| 1|
| 5| 0| 1| 1| 0|
+---+--------+--------+--------+-----+
Show Solution

python
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
df = assembler.transform(df)

from pyspark.ml.stat import ChiSquareTest

r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))
python

pValues: [0.36131042852617856,0.13603712811414348,0.1360371281141436]
degreesOfFreedom: [1, 1, 1]
statistics: [0.8333333333333335,2.2222222222222228,2.2222222222222223]

68. How to calculate the Standard Deviation?

Difficulty Level: L1

Input

python
# Sample data
data = [("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)]

# Create DataFrame
df = spark.createDataFrame(data, ["Employee", "Department", "Salary"])

df.show()
python
+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+--------+----------+------+
Show Solution

python
from pyspark.sql.functions import stddev

salary_stddev = df.select(stddev("Salary").alias("stddev"))

salary_stddev.show()
python

+-----------------+
| stddev|
+-----------------+
|765.9416862050705|
+-----------------+
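
To compute the standard deviation per group rather than over the whole column, combine it with groupBy; a brief sketch on the same data:

python
# Standard deviation of Salary within each Department
df.groupBy("Department").agg(stddev("Salary").alias("stddev_salary")).show()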

69. How to calculate missing value percentage in each column?

Difficulty Level: L3

Input

python
# Create a sample dataframe
data = [("John", "Doe", None),
(None, "Smith", "New York"),
("Mike", "Smith", None),
("Anna", "Smith", "Boston"),
(None, None, None)]

df = spark.createDataFrame(data, ["FirstName", "LastName", "City"])

df.show()
python
+---------+--------+--------+
|FirstName|LastName| City|
+---------+--------+--------+
| John| Doe| null|
| null| Smith|New York|
| Mike| Smith| null|
| Anna| Smith| Boston|
| null| null| null|
+---------+--------+--------+
Show Solution

python
# Calculate the total number of rows in the dataframe
total_rows = df.count()

# For each column, count the null values and compute the percentage
for column in df.columns:
    null_values = df.filter(df[column].isNull()).count()
    missing_percentage = (null_values / total_rows) * 100
    print(f"Missing values in {column}: {missing_percentage}%")
python

Missing values in FirstName: 40.0%
Missing values in LastName: 20.0%
Missing values in City: 60.0%
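
The loop triggers a separate count per column. All percentages can also be computed in a single aggregation; a sketch assuming the same df and total_rows:

python
from pyspark.sql.functions import col, count, when

# One aggregation that counts nulls for every column at once
null_pcts = df.select([
    (count(when(col(c).isNull(), c)) / total_rows * 100).alias(c) for c in df.columns
])
null_pcts.show()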

70. How to get the names of DataFrame objects that have been created in an environment?

Difficulty Level: L2

Show Solution

python
from pyspark.sql import DataFrame

dataframe_names = [name for name, obj in globals().items() if isinstance(obj, DataFrame)]

for name in dataframe_names:
    print(name)
