Reading CSV with bad records on Spark

This video dives deep into the critical aspect of handling malformed data during Spark data processing. We'll explore three distinct approaches using a test CSV file:
Permissive Mode: to read data while allowing malformed records to pass through. Ideal for exploratory analysis or when data quality isn't a primary concern.
Drop Malformed Mode: to gracefully handle malformed records by simply discarding them. Suitable for scenarios where data accuracy is crucial, and you can afford to lose a small portion of the data.
Fail Fast Mode: to enforce strict data quality checks, causing the job to fail immediately upon encountering a malformed record. This mode is essential for mission-critical applications where data integrity is paramount.

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

First, create a SparkSession. This session allows us to interact with Spark and process data. When dealing with potentially bad CSV data, it's recommended in most scenarios to disable spark.sql.csv.parser.columnPruning.enabled. This ensures that all columns are parsed for every row, allowing you to identify and handle bad records effectively.

# Create a SparkSession
spark = SparkSession.builder \
    .appName("read-csv-with-bad-lines") \
    .config("spark.sql.csv.parser.columnPruning.enabled", False) \
    .getOrCreate()

data_path = "~/data/users_data.csv"

Permissive Mode

Define the expected structure of our data. We create a StructType with fields for each column; this schema is important for parsing the CSV correctly. We also add an extra field called "_CORRUPT_RECORD" as a string, which will be used later to store information about bad lines.

data_schema_permissive = StructType([
    StructField("CNSMR_ID", StringType(), True),
    StructField("FIRST_NM", StringType(), True),
    StructField("LST_NM", StringType(), True),
    StructField("DOB_MM_YYYY", StringType(), True),
    StructField("ADDR_LN", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("EMAIL", StringType(), True),
    StructField("NEW_USER", StringType(), True),
    StructField("_CORRUPT_RECORD", StringType(), True)
])

Read the CSV using SparkSession's read.csv method, setting the following options:
header=True: the first line contains column names.
sep=",": comma is the delimiter separating values.
mode="PERMISSIVE": this is the key. In this mode PySpark keeps every row; when a record can't be parsed against the schema, the fields it can't fill are set to null and the raw line is stored in the corrupt record column instead of the read failing.
columnNameOfCorruptRecord="_CORRUPT_RECORD": tells PySpark to store the raw text of bad lines in the "_CORRUPT_RECORD" column we defined earlier.

# Read CSV data in PERMISSIVE mode
df = spark.read.schema(data_schema_permissive) \
    .option("ignoreCorruptFiles", "true") \
    .csv(data_path, header=True, sep=",", mode="PERMISSIVE",
         columnNameOfCorruptRecord="_CORRUPT_RECORD")

df.show(10)

"""
Output:
+-------------+---------+---------+-----------+--------------------+--------------------+--------------------+--------+--------------------+
|     CNSMR_ID| FIRST_NM|   LST_NM|DOB_MM_YYYY|             ADDR_LN|                CITY|               EMAIL|NEW_USER|     _CORRUPT_RECORD|
+-------------+---------+---------+-----------+--------------------+--------------------+--------------------+--------+--------------------+
|  13445546767|   Silvia|  Johnson|    12/1997| 1098 Av. Saint Luis|  La Sagrada Familia|silvia_johnson@gm...|       Y|                NULL|
|   7467536567|   Franco|    Smith|    02/1984|1123 Av. J. Ross,...|         Phenix City|franco_smith@gmai...|       Y|                NULL|
| 346575786809|     Lisa|  Hammond|    08/1986|9823, Lombard Str...|            Estepona|lisa_hammond@gmai...|       Y|                NULL|
|  36456778345|   Daniel|     Ford|    02/1974|Khao San Road, 83...|   dford@hotmail.com|                   N|    NULL|36456778345,Danie...|
|3123453567764|Sylvester|   Alonzo|    02/1987|21189,Royal Golf,...|          Sotogrande|    alonzo@gmail.com|       Y|                NULL|
|  12345235656|    Felix|Rezerford|    05/1976|16233, Saint Angeles|              Boston|     felix@gmail.com|       Y|                NULL|
|  12344243524|    Fabio|   Ribera|    11/1985|809234, Opera Hou...| fabio_ribera@yah...|                   Y|    NULL|12344243524,Fabio...|
|   2342234435|  Michael| Anderson|    02/1979|12464, Andalusia St.|            Marbella|m.anderson@gmail.com|       N|                NULL|
|   1234535656|     Luis|   Dawson|    02/1964|34, Florence St.,...|   ldawson@gmail.com|                   Y|    NULL|1234535656,Luis,D...|
|3123453567679|   Miguel|  Ramirez|    03/1954|10 Av. Saint Roya...| miguel_br@yahoo.com|                   N|    NULL|3123453567679,Mig...|
+-------------+---------+---------+-----------+--------------------+--------------------+--------------------+--------+--------------------+
"""

Next, we filter the data based on the "_CORRUPT_RECORD" flag. We can use the "_CORRUPT_RECORD" column to keep only the rows where the flag is null, meaning they are likely good records. Similarly, we can isolate the rows with potential errors by using the isNotNull() function instead.

df.filter(col("_CORRUPT_RECORD").isNull()).show()

"""
Output:
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+---------------+
|     CNSMR_ID| FIRST_NM|   LST_NM|DOB_MM_YYYY|             ADDR_LN|              CITY|               EMAIL|NEW_USER|_CORRUPT_RECORD|
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+---------------+
|  13445546767|   Silvia|  Johnson|    12/1997| 1098 Av. Saint Luis|La Sagrada Familia|silvia_johnson@gm...|       Y|           NULL|
|   7467536567|   Franco|    Smith|    02/1984|1123 Av. J. Ross,...|       Phenix City|franco_smith@gmai...|       Y|           NULL|
| 346575786809|     Lisa|  Hammond|    08/1986|9823, Lombard Str...|          Estepona|lisa_hammond@gmai...|       Y|           NULL|
|3123453567764|Sylvester|   Alonzo|    02/1987|21189,Royal Golf,...|        Sotogrande|    alonzo@gmail.com|       Y|           NULL|
|  12345235656|    Felix|Rezerford|    05/1976|16233, Saint Angeles|            Boston|     felix@gmail.com|       Y|           NULL|
|   2342234435|  Michael| Anderson|    02/1979|12464, Andalusia St.|          Marbella|m.anderson@gmail.com|       N|           NULL|
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+---------------+
"""

df.filter(col("_CORRUPT_RECORD").isNotNull()).show()

"""
Output:
+-------------+--------+-------+-----------+--------------------+--------------------+-----+--------+--------------------+
|     CNSMR_ID|FIRST_NM| LST_NM|DOB_MM_YYYY|             ADDR_LN|                CITY|EMAIL|NEW_USER|     _CORRUPT_RECORD|
+-------------+--------+-------+-----------+--------------------+--------------------+-----+--------+--------------------+
|  36456778345|  Daniel|   Ford|    02/1974|Khao San Road, 83...|   dford@hotmail.com|    N|    NULL|36456778345,Danie...|
|  12344243524|   Fabio| Ribera|    11/1985|809234, Opera Hou...| fabio_ribera@yah...|    Y|    NULL|12344243524,Fabio...|
|   1234535656|    Luis| Dawson|    02/1964|34, Florence St.,...|   ldawson@gmail.com|    Y|    NULL|1234535656,Luis,D...|
|3123453567679|  Miguel|Ramirez|    03/1954|10 Av. Saint Roya...| miguel_br@yahoo.com|    N|    NULL|3123453567679,Mig...|
+-------------+--------+-------+-----------+--------------------+--------------------+-----+--------+--------------------+
"""

Operations with data in Permissive Mode

We might only be interested in specific columns. Note that when we run select() on this DataFrame, the broken lines are still included in the output (and in the count).

df.select(["CNSMR_ID","FIRST_NM","LST_NM","NEW_USER"]).show(truncate=True)
df.count()

"""
Output:
+-------------+---------+---------+--------+
|     CNSMR_ID| FIRST_NM|   LST_NM|NEW_USER|
+-------------+---------+---------+--------+
|  13445546767|   Silvia|  Johnson|       Y|
|   7467536567|   Franco|    Smith|       Y|
| 346575786809|     Lisa|  Hammond|       Y|
|  36456778345|   Daniel|     Ford|    NULL|
|3123453567764|Sylvester|   Alonzo|       Y|
|  12345235656|    Felix|Rezerford|       Y|
|  12344243524|    Fabio|   Ribera|    NULL|
|   2342234435|  Michael| Anderson|       N|
|   1234535656|     Luis|   Dawson|    NULL|
|3123453567679|   Miguel|  Ramirez|    NULL|
+-------------+---------+---------+--------+

10
"""

To separate the good records, we can define a new DataFrame containing only valid rows, using the "_CORRUPT_RECORD" flag together with select() and where(). When creating the new DataFrame, we can also drop the "_CORRUPT_RECORD" column, as we no longer need it.

# Define dataframe with valid records
good_df = df.select("*").where(col("_CORRUPT_RECORD").isNull()).drop("_CORRUPT_RECORD")
good_df.show()

"""
Output:
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+
|     CNSMR_ID| FIRST_NM|   LST_NM|DOB_MM_YYYY|             ADDR_LN|              CITY|               EMAIL|NEW_USER|
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+
|  13445546767|   Silvia|  Johnson|    12/1997| 1098 Av. Saint Luis|La Sagrada Familia|silvia_johnson@gm...|       Y|
|   7467536567|   Franco|    Smith|    02/1984|1123 Av. J. Ross,...|       Phenix City|franco_smith@gmai...|       Y|
| 346575786809|     Lisa|  Hammond|    08/1986|9823, Lombard Str...|          Estepona|lisa_hammond@gmai...|       Y|
|3123453567764|Sylvester|   Alonzo|    02/1987|21189,Royal Golf,...|        Sotogrande|    alonzo@gmail.com|       Y|
|  12345235656|    Felix|Rezerford|    05/1976|16233, Saint Angeles|            Boston|     felix@gmail.com|       Y|
|   2342234435|  Michael| Anderson|    02/1979|12464, Andalusia St.|          Marbella|m.anderson@gmail.com|       N|
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+
"""

good_df.select(["CNSMR_ID","FIRST_NM","LST_NM","NEW_USER"]).show(10, truncate=True)

"""
Output:
+-------------+---------+---------+--------+
|     CNSMR_ID| FIRST_NM|   LST_NM|NEW_USER|
+-------------+---------+---------+--------+
|  13445546767|   Silvia|  Johnson|       Y|
|   7467536567|   Franco|    Smith|       Y|
| 346575786809|     Lisa|  Hammond|       Y|
|3123453567764|Sylvester|   Alonzo|       Y|
|  12345235656|    Felix|Rezerford|       Y|
|   2342234435|  Michael| Anderson|       N|
+-------------+---------+---------+--------+
"""

Now, let's say we want to review the bad records separately. We can define another DataFrame containing only rows where the "_CORRUPT_RECORD" flag is not null.

# Define dataframe with invalid records
bad_df = df.select("*").where(col("_CORRUPT_RECORD").isNotNull())
bad_df.show()

"""
Output:
+-------------+--------+-------+-----------+--------------------+--------------------+-----+--------+--------------------+
|     CNSMR_ID|FIRST_NM| LST_NM|DOB_MM_YYYY|             ADDR_LN|                CITY|EMAIL|NEW_USER|     _CORRUPT_RECORD|
+-------------+--------+-------+-----------+--------------------+--------------------+-----+--------+--------------------+
|  36456778345|  Daniel|   Ford|    02/1974|Khao San Road, 83...|   dford@hotmail.com|    N|    NULL|36456778345,Danie...|
|  12344243524|   Fabio| Ribera|    11/1985|809234, Opera Hou...| fabio_ribera@yah...|    Y|    NULL|12344243524,Fabio...|
|   1234535656|    Luis| Dawson|    02/1964|34, Florence St.,...|   ldawson@gmail.com|    Y|    NULL|1234535656,Luis,D...|
|3123453567679|  Miguel|Ramirez|    03/1954|10 Av. Saint Roya...| miguel_br@yahoo.com|    N|    NULL|3123453567679,Mig...|
+-------------+--------+-------+-----------+--------------------+--------------------+-----+--------+--------------------+
"""

We can then export this bad_df as a new CSV file using write.csv for manual inspection.

# Export invalid records for review
bad_df.write.csv("~/data/review/", mode="overwrite", sep="|", header=True)
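
Note that write.csv produces a directory of part files. If a single file is easier to review by hand, one option is to coalesce the DataFrame to a single partition before writing; this is fine for a handful of bad records, but it funnels all the data through one task. A sketch, assuming the same review path as above:

# Optional: write the invalid records as a single part file for easier manual review
bad_df.coalesce(1).write.csv("~/data/review/", mode="overwrite", sep="|", header=True)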

Drop Malformed Mode

While permissive mode allows processing to continue despite errors, sometimes we want stricter handling. We can define a new schema without the "_CORRUPT_RECORD" column and read the CSV again using mode="DROPMALFORMED". This mode simply skips any rows with formatting issues.

data_schema = StructType([
    StructField("CNSMR_ID", StringType(), True),
    StructField("FIRST_NM", StringType(), True),
    StructField("LST_NM", StringType(), True),
    StructField("DOB_MM_YYYY", StringType(), True),
    StructField("ADDR_LN", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("EMAIL", StringType(), True),
    StructField("NEW_USER", StringType(), True)
])
# Read csv data
df_users = spark.read.schema(data_schema).option("ignoreCorruptFiles", "true") \
      .csv(data_path, header=True, sep=",", mode="DROPMALFORMED")

df_users.show(10)

"""
Output:
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+
|     CNSMR_ID| FIRST_NM|   LST_NM|DOB_MM_YYYY|             ADDR_LN|              CITY|               EMAIL|NEW_USER|
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+
|  13445546767|   Silvia|  Johnson|    12/1997| 1098 Av. Saint Luis|La Sagrada Familia|silvia_johnson@gm...|       Y|
|   7467536567|   Franco|    Smith|    02/1984|1123 Av. J. Ross,...|       Phenix City|franco_smith@gmai...|       Y|
| 346575786809|     Lisa|  Hammond|    08/1986|9823, Lombard Str...|          Estepona|lisa_hammond@gmai...|       Y|
|3123453567764|Sylvester|   Alonzo|    02/1987|21189,Royal Golf,...|        Sotogrande|    alonzo@gmail.com|       Y|
|  12345235656|    Felix|Rezerford|    05/1976|16233, Saint Angeles|            Boston|     felix@gmail.com|       Y|
|   2342234435|  Michael| Anderson|    02/1979|12464, Andalusia St.|          Marbella|m.anderson@gmail.com|       N|
+-------------+---------+---------+-----------+--------------------+------------------+--------------------+--------+
"""

Finally, we can see the clean data. We can also check the total number of rows using count(). Lastly, we can display specific columns to confirm that the invalid records no longer appear in the output.

df_users.count()

"""
Output:
6
"""

df_users.select(["CNSMR_ID","FIRST_NM","LST_NM","NEW_USER"]).show(10, truncate=True)

"""
Output:
+-------------+---------+---------+--------+
|     CNSMR_ID| FIRST_NM|   LST_NM|NEW_USER|
+-------------+---------+---------+--------+
|  13445546767|   Silvia|  Johnson|       Y|
|   7467536567|   Franco|    Smith|       Y|
| 346575786809|     Lisa|  Hammond|       Y|
|3123453567764|Sylvester|   Alonzo|       Y|
|  12345235656|    Felix|Rezerford|       Y|
|   2342234435|  Michael| Anderson|       N|
+-------------+---------+---------+--------+
"""

Fail Fast Mode

This mode throws an exception as soon as it encounters a single bad line in the CSV file. This can be helpful when data integrity is crucial, and you want to stop processing immediately upon encountering an error.

data_schema = StructType([
        StructField("CNSMR_ID", StringType(), True),
        StructField("FIRST_NM", StringType(), True),
        StructField("LST_NM", StringType(), True),
        StructField("DOB_MM_YYYY", StringType(), True),
        StructField("ADDR_LN", StringType(), True),
        StructField("CITY", StringType(), True),
        StructField("EMAIL", StringType(), True),
        StructField("NEW_USER", StringType(), True)
    ])

df_customers = spark.read.schema(data_schema).option("ignoreCorruptFiles", "true") \
    .csv(data_path, header=True, sep=",", mode="FAILFAST")

df_customers.show()

"""
Output:
25/01/16 15:10:46 ERROR Executor: Exception in task 0.0 in stage 16.0 (TID 14)
org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [36456778345,Daniel,Ford,02/1974,Khao San Road, 83920, AZ9086B,dford@hotmail.com,N,null].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. 
...
"""

By using PySpark's reading modes, we can effectively handle CSV files with bad lines. Permissive mode processes everything and flags potential errors, the stricter DROPMALFORMED mode skips invalid records entirely, and FAILFAST throws an exception on the first bad line, immediately stopping processing for quality control. The choice of mode depends on your data quality requirements and the level of error tolerance you can accept. By understanding these options, you can tailor your PySpark scripts to handle CSV files efficiently and reliably.