Apache Spark Procedures in BigQuery

BigQuery lets you run Spark procedures from the PySpark and SQL query editors within the boundaries of BigQuery Studio. You can now offload computationally intensive tasks to Dataproc, as well as run jobs on a custom image, stepping beyond the operations natively supported by BigQuery. Potential operational cost savings and the ability to handle complex data structures and algorithms are among the benefits you can enjoy.

About Spark Procedures

Spark procedures can be viewed as user-defined functions written for Apache Spark that execute within a Dataproc cluster. More details on Spark procedures can be found in the GCP documentation. They extend BigQuery's capabilities, enabling:

  • Potential cost savings by offloading heavy processing to Dataproc.
  • Improved performance for computationally intensive tasks.
  • Ability to handle complex data structures and algorithms not natively supported by BigQuery.

Required IAM

To work with Spark procedures as an admin or a user, make sure you have the required IAM roles assigned. Google provides detailed information about the required IAM permissions in the official documentation, including the predefined roles needed to create and call stored procedures, as well as the list of permissions required to create a BigQuery connection and to create or call stored procedures for Spark.
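
For illustration, here is a hedged sketch of how an admin could grant such predefined roles to a user with gcloud; the user email and the role choices below are placeholders, so pick the roles the documentation lists for your scenario:

# Illustrative only: allow a user to create connections and run BigQuery jobs.
gcloud projects add-iam-policy-binding gcp-poc-playground \
    --member="user:analyst@example.com" \
    --role="roles/bigquery.connectionAdmin"

gcloud projects add-iam-policy-binding gcp-poc-playground \
    --member="user:analyst@example.com" \
    --role="roles/bigquery.jobUser"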

Creating a Dataproc PHS Cluster

A few words on the Dataproc Persistent History Server (PHS). PHS provides web interfaces to view the history of jobs that ran on active or deleted Dataproc clusters. According to the GCP documentation, PHS is available in Dataproc image versions 1.5 and later and runs on a single-node Dataproc cluster. One limitation of PHS is that its cluster image version must match the image version of the Dataproc job clusters. In other words, if the PHS cluster uses a Dataproc 2.0 image, then to view the job history files of Spark jobs, the job clusters also need to run a Dataproc 2.0 image and be located in the same project as the history server cluster. To preserve the run history of Spark procedures, you need to reference the PHS cluster when creating the connection from BigQuery to Apache Spark.
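
As a hedged sketch based on the pattern in the Dataproc documentation (cluster name, region, and bucket are placeholders, and the exact properties may differ for your image version), a single-node PHS cluster can be created roughly like this:

gcloud dataproc clusters create my-phs-cluster \
    --region=us-central1 \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://my-phs-bucket/*/spark-job-history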

Creating a Connection from BigQuery to Apache Spark

To create a Spark connection, you need the BigQuery Connection Admin IAM role on the project. The connection can be created from the console or with the bq command-line tool.

Here is an example of how to create a connection with the bq command:

bq mk --connection \
    --connection_type='SPARK' \
    --project_id='gcp-poc-playground' \
    --location='US' \
    mysparkconnection

The connection was successfully created. Now let's grant IAM permissions to the associated service account so that it can run Spark procedures. In this demo we will grant the following roles to the service account associated with the bq connection (a sketch of how to look up the service account and grant a role follows the list):

  • Artifact Registry Reader
  • BigQuery Admin
  • BigQuery Job User
  • Dataproc Worker
  • Secret Manager Secret Accessor
  • Storage Admin
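
As a hedged sketch (CONNECTION_SA_EMAIL is a placeholder for the serviceAccountId printed by bq show, and roles/dataproc.worker stands in for the full list above; repeat the binding per role):

# Look up the service account that was created for the Spark connection.
bq show --connection \
    --project_id='gcp-poc-playground' \
    --location='US' \
    mysparkconnection

# Grant one of the demo roles to the connection's service account.
gcloud projects add-iam-policy-binding gcp-poc-playground \
    --member="serviceAccount:CONNECTION_SA_EMAIL" \
    --role="roles/dataproc.worker"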

Note that the roles added in this example are for demo purposes. In a production environment, the IAM policy should be carefully reviewed and adjusted to the use case. Now that we have granted the connection service account the access it needs, let's wrap up the component setup and dive into Spark procedures.

Example 1: run directly from PySpark editor

Let's run the first Spark procedure. We are going to run it directly from the PySpark editor. To do that, we need to specify the Spark connection and the stored procedure invocation settings, and provide the code of the Spark procedure itself.

import os
import json
import logging
import hashlib
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, udf
from pyspark.sql.types import StringType

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

INPUT_DATA = "gs://automlops-poc/mock_data.csv"
TMP_TRANSFORM = "automlops-poc/tmp/"
GCP_PROJECT = "gcp-poc-playground"
UPLOAD_DATASET = "bq_spark"
UPLOAD_TABLE = "customers_raw"
WRITE_DISPOSITION = "overwrite"


# Define a UDF to hash data.
@udf(returnType=StringType())
def hash_data(data):
    if data is None:
        return data
    hashed_data = hashlib.sha256(str(data.lower()).encode("utf8")).hexdigest()
    return hashed_data


# Load data to BigQuery.
def write_to_bq(data: DataFrame,
                tmp_transform: str,
                gcp_project: str,
                upload_dataset: str,
                upload_table: str,
                write_disposition: str) -> None:

    try:
        data.write \
            .format("bigquery") \
            .option("temporaryGcsBucket", tmp_transform) \
            .option("project", gcp_project) \
            .option("createDisposition", "CREATE_IF_NEEDED") \
            .option("partitionField", "load_time") \
            .option("partitionType", "DAY") \
            .mode(write_disposition) \
            .save(f"{upload_dataset}.{upload_table}")

    except Exception as ex:
        logging.error(f"Exception uploading to BQ: {ex}")
        raise


# Process execution.
def main(input_data: str,
        tmp_transform: str,
        gcp_project: str,
        upload_dataset: str,
        upload_table: str,
        write_disposition: str) -> None:

    logging.info("-----Build Spark Session------")
    spark = SparkSession.builder \
    .appName("bq_spark_job") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

    logging.info("-----Read Data------")

    data = spark.read.option("ignoreCorruptFiles", "true").csv(input_data, header=True, inferSchema=True)
    logging.info(f"----STATS data num partitions: {data.rdd.getNumPartitions()}")
    logging.info(f"----STATS data count: {data.count()}")

    # Get file columns list
    data_columns = data.columns

    # List of columns to hash
    hash_columns = [column_name for column_name in data_columns if "email" in column_name]
    
    logging.info(f"Identified columns containing email in name: {hash_columns}")
    
    # Add hashed email columns
    if hash_columns:
        for data_hash_column in hash_columns:
            data = data.withColumn(f'hashed_{data_hash_column}', hash_data(data[data_hash_column]))

    # Add column with processing timestamp
    data = data.withColumn("load_time", current_timestamp())

    logging.info(f"Loading data to BigQuery:{gcp_project}.{upload_dataset}.{upload_table}")

    write_to_bq(
        data=data,
        tmp_transform=tmp_transform,
        gcp_project=gcp_project,
        upload_dataset=upload_dataset,
        upload_table=upload_table,
        write_disposition=write_disposition)

    spark.stop()

main(input_data=INPUT_DATA,
    tmp_transform=TMP_TRANSFORM,
    gcp_project=GCP_PROJECT,
    upload_dataset=UPLOAD_DATASET,
    upload_table=UPLOAD_TABLE,
    write_disposition=WRITE_DISPOSITION)

In this example we load data from a CSV file in GCS. As defined in the code, Spark reads the CSV file, inspects the dataframe columns, hashes the email columns, and loads the data into a BigQuery table. In the main execution function you can see that a Spark session is created, the data in each column whose name contains "email" is hashed and added as a new column, and a load_time column is added and populated with the processing timestamp.
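
Once the procedure finishes, a quick way to sanity-check the load is to query the target table, for example with the bq CLI (the project, dataset, and table names follow the demo values above):

bq query --use_legacy_sql=false \
'SELECT COUNT(*) AS row_count, MAX(load_time) AS last_load
 FROM `gcp-poc-playground.bq_spark.customers_raw`'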

Example 2: PySpark editor save and trigger with params

The Spark job from the previous example needs to be updated to read the parameters from environment variables:

INPUT_DATA = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.INPUT_DATA"]))
TMP_TRANSFORM = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.TMP_TRANSFORM"]))
GCP_PROJECT = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.GCP_PROJECT"]))
UPLOAD_DATASET = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.UPLOAD_DATASET"]))
UPLOAD_TABLE = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.UPLOAD_TABLE"]))
WRITE_DISPOSITION = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.WRITE_DISPOSITION"]))

Let's look at how to create a procedure that takes values provided as parameters. The variables defined in the previous procedure now become parameters. We are also going to configure the procedure to run with a custom service account instead of the Spark connection's one. In the stored procedure invocation settings, add the six expected parameters (as PARAMS, not properties):

INPUT_DATA: STRING, DEFAULT, NULL;
TMP_TRANSFORM: STRING, DEFAULT, NULL;
GCP_PROJECT: STRING, DEFAULT, NULL;
UPLOAD_DATASET: STRING, DEFAULT, NULL;
UPLOAD_TABLE: STRING, DEFAULT, NULL;
WRITE_DISPOSITION: STRING, DEFAULT, NULL;

and save the procedure as spark_with_params. Here is example invocation code you can use to call it:

DECLARE INPUT_DATA STRING DEFAULT NULL;
DECLARE TMP_TRANSFORM STRING DEFAULT NULL;
DECLARE GCP_PROJECT STRING DEFAULT NULL;
DECLARE UPLOAD_DATASET STRING DEFAULT NULL;
DECLARE UPLOAD_TABLE STRING DEFAULT NULL;
DECLARE WRITE_DISPOSITION STRING DEFAULT NULL;

SET INPUT_DATA = "gs://automlops-poc/mock_data.csv";
SET TMP_TRANSFORM = "automlops-poc/tmp/";
SET GCP_PROJECT = "gcp-poc-playground";
SET UPLOAD_DATASET = "bq_spark";
SET UPLOAD_TABLE = "customers_raw";
SET WRITE_DISPOSITION = "overwrite";

SET @@spark_proc_properties.service_account='bq-spark-encryption-sa@gcp-poc-playground.iam.gserviceaccount.com';
CALL `gcp-poc-playground.spark_infra.spark_with_params`(INPUT_DATA, TMP_TRANSFORM, GCP_PROJECT, UPLOAD_DATASET, UPLOAD_TABLE, WRITE_DISPOSITION);

Example 3: create and trigger with SQL

To continue our journey, let's switch gears to the SQL editor. In the following example we are going to create a Spark procedure using a SQL statement.

## Create SQL Spark Procedure
CREATE PROCEDURE `gcp-poc-playground.spark_infra.spark_proc`(
  IN input_data STRING,
  IN tmp_transform STRING,
  IN gcp_project STRING,
  IN upload_dataset STRING,
  IN upload_table STRING,
  IN write_disposition STRING
)
WITH CONNECTION `gcp-poc-playground.us.mysparkconnection`
OPTIONS(engine="SPARK",
description="Procedure to load .csv files with hashed email columns",
runtime_version="1.1",
main_file_uri="gs://automlops-poc/bq_spark_proc.py",
jar_uris=["gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.36.4.jar"])
LANGUAGE PYTHON

As we can see from the CREATE statement, we have defined the procedure name, the expected parameters, the Spark connection, and the options. The options define the runtime version, point to the Spark job's main file in GCS, and list the connector JAR to attach. The language is set to PYTHON. Note that, according to the Google documentation, if the body of a stored procedure is larger than 1 MB, it is recommended to put it in a file in a Cloud Storage bucket instead of using inline code. Let's have a look at the Spark job's main file in GCS.

Here is the code of the Spark job stored in the GCS object bq_spark_proc.py:

import os
import json
import logging
import hashlib
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, udf
from pyspark.sql.types import StringType

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)


# Define a UDF to hash data.
@udf(returnType=StringType())
def hash_data(data):
    if data is None:
        return data
    hashed_data = hashlib.sha256(str(data.lower()).encode("utf8")).hexdigest()
    return hashed_data


# Load data to BigQuery.
def write_to_bq(data: DataFrame,
                tmp_transform: str,
                gcp_project: str,
                upload_dataset: str,
                upload_table: str,
                write_disposition: str) -> None:
    """
    Load data to BigQuery.
    """
    try:
        data.write \
            .format("bigquery") \
            .option("temporaryGcsBucket", tmp_transform) \
            .option("project", gcp_project) \
            .option("createDisposition", "CREATE_IF_NEEDED") \
            .option("partitionField", "load_time") \
            .option("partitionType", "DAY") \
            .mode(write_disposition) \
            .save(f"{upload_dataset}.{upload_table}")

    except Exception as ex:
        logging.error(f"Exception uploading to BQ: {ex}")
        raise


# Process execution.
def main(input_data: str,
        tmp_transform: str,
        gcp_project: str,
        upload_dataset: str,
        upload_table: str,
        write_disposition: str) -> None:
    """
    Pyspark process execution.
    """

    logging.info("-----Build Spark Session------")
    spark = SparkSession.builder \
    .appName("bq_spark_job") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

    logging.info("-----Read Data------")

    data = spark.read.option("ignoreCorruptFiles", "true").csv(input_data, header=True, inferSchema=True)
    logging.info("----STATS source dataframe:")
    data.printSchema()
    logging.info(f"----STATS data num partitions: {data.rdd.getNumPartitions()}")
    logging.info(f"----STATS data count: {data.count()}")

    data.show(10, truncate=True)

    # Get file columns list
    data_columns = data.columns

    # List of columns to hash
    hash_columns = [column_name for column_name in data_columns if "email" in column_name]
    
    logging.info(f"Identified columns containing email in name: {hash_columns}")
    
    # Add hashed email columns
    if hash_columns:
        for data_hash_column in hash_columns:
            data = data.withColumn(f'hashed_{data_hash_column}', hash_data(data[data_hash_column]))

    # Add column with processing timestamp
    data = data.withColumn("load_time", current_timestamp())

    data.count()
    data.show(20, truncate=True)

    logging.info(f"Loading data to BigQuery:{gcp_project}.{upload_dataset}.{upload_table}")

    write_to_bq(
        data=data,
        tmp_transform=tmp_transform,
        gcp_project=gcp_project,
        upload_dataset=upload_dataset,
        upload_table=upload_table,
        write_disposition=write_disposition)

    spark.stop()


if __name__ == "__main__":
    # Get the input parameter num in JSON string and convert to a Python variable
    input_data = json.loads(os.environ["BIGQUERY_PROC_PARAM.input_data"])
    tmp_transform = json.loads(os.environ["BIGQUERY_PROC_PARAM.tmp_transform"])
    gcp_project = json.loads(os.environ["BIGQUERY_PROC_PARAM.gcp_project"])
    upload_dataset = json.loads(os.environ["BIGQUERY_PROC_PARAM.upload_dataset"])
    upload_table = json.loads(os.environ["BIGQUERY_PROC_PARAM.upload_table"])
    write_disposition = json.loads(os.environ["BIGQUERY_PROC_PARAM.write_disposition"])


    main(input_data=input_data,
        tmp_transform=tmp_transform,
        gcp_project=gcp_project,
        upload_dataset=upload_dataset,
        upload_table=upload_table,
        write_disposition=write_disposition)

The Spark job performs a familiar operation: it reads a CSV file, processes the data by adding hashed "email" and load_time columns, and loads the result into the specified BigQuery table. The job expects six parameters, passed as environment variables. Everything looks good, so let's get back to our procedure code in the SQL editor. The procedure has been created, so let's invoke it. To do that, we declare the required parameters and call the procedure, supplying the declared parameters as arguments.

## Invoke SQL Spark procedure

DECLARE input_data STRING DEFAULT NULL;
DECLARE tmp_transform STRING DEFAULT NULL;
DECLARE gcp_project STRING DEFAULT NULL;
DECLARE upload_dataset STRING DEFAULT NULL;
DECLARE upload_table STRING DEFAULT NULL;
DECLARE write_disposition STRING DEFAULT NULL;

SET input_data = "gs://automlops-poc/mock_data.csv";
SET tmp_transform = "automlops-poc/tmp/";
SET gcp_project = "gcp-poc-playground";
SET upload_dataset = "bq_spark";
SET upload_table = "customers_raw_gcs";
SET write_disposition = "overwrite";

CALL `gcp-poc-playground.spark_infra.spark_proc`(input_data, tmp_transform, gcp_project, upload_dataset, upload_table, write_disposition)

Example 4: create with custom image, trigger with SQL

Now let's try to run a procedure as inline code using a custom image. In this example we are going to perform encryption, which requires additional libraries not installed on the default Dataproc image. Therefore, we need to build our own custom image with the libraries pre-installed and push it to Artifact Registry. The Google documentation on custom containers provides an example of a custom image. To build an image for our Spark procedure, we add the required libraries to the Dockerfile, build the image with Docker, and push it to Artifact Registry.

Here is the code we are going to use to build a custom image:

FROM ubuntu:20.04
# Set environment variables
ENV CONTAINER_USER_NAME="bq-spark-encryption-sa@gcp-poc-playground.iam.gserviceaccount.com"

# Install utilities required by Spark scripts.
RUN apt-get update && apt-get install -y wget procps tini coreutils

# Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY *.jar "${SPARK_EXTRA_JARS_DIR}"

# Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PYSPARK_DRIVER_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}

RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_24.11.1-0-Linux-x86_64.sh -O miniconda.sh

RUN bash miniconda.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict

# Install Conda packages.
#
# The following packages are installed in the default image. It is strongly
# recommended to include all of them.
#
# Use mamba to install packages quickly.
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
    && ${CONDA_HOME}/bin/mamba install \
      conda \
      cython \
      fastavro \
      fastparquet \
      gcsfs \
      google-cloud-bigquery-storage \
      google-cloud-bigquery[pandas] \
      google-cloud-bigtable \
      google-cloud-container \
      google-cloud-datacatalog \
      google-cloud-dataproc \
      google-cloud-datastore \
      google-cloud-kms \
      google-cloud-language \
      google-cloud-logging \
      google-cloud-monitoring \
      google-cloud-pubsub \
      google-cloud-redis \
      google-cloud-secret-manager \
      google-cloud-spanner \
      google-cloud-speech \
      google-cloud-storage \
      google-cloud-texttospeech \
      google-cloud-translate \
      google-cloud-vision \
      koalas \
      matplotlib \
      nltk \
      numba \
      numpy \
      openblas \
      orc \
      pycryptodome \
      pandas \
      pandas-gbq \
      pyarrow \
      pysal \
      pytables \
      python \
      regex \
      requests \
      rtree \
      scikit-image \
      scikit-learn \
      scipy \
      seaborn \
      sqlalchemy \
      sympy \
      virtualenv

# Create the 'spark' group/user.
# The GID and UID must be 1099. Home directory is required.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark

To build the image, cd into the folder where the Dockerfile is stored and run:

docker build --platform=linux/amd64 -t encrypt-py310-spark-image:latest .

docker tag encrypt-py310-spark-image:latest us-docker.pkg.dev/gcp-poc-playground/spark-custom-images/py310-ubuntu-pycryptodom

docker push us-docker.pkg.dev/gcp-poc-playground/spark-custom-images/py310-ubuntu-pycryptodom
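
The tag and push steps assume the spark-custom-images repository already exists in Artifact Registry and that Docker is authenticated against us-docker.pkg.dev. If that is not the case, here is a hedged sketch of the one-time prerequisites:

# Create the Docker repository referenced in the image tag.
gcloud artifacts repositories create spark-custom-images \
    --repository-format=docker \
    --location=us

# Configure the local Docker client to authenticate to Artifact Registry.
gcloud auth configure-docker us-docker.pkg.dev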

Our custom image is now stored in Artifact Registry. Let's move on and create the procedure in the BigQuery SQL editor.

CREATE OR REPLACE PROCEDURE `gcp-poc-playground.spark_infra.spark_encrypt`()
WITH CONNECTION `gcp-poc-playground.us.mysparkconnection`
OPTIONS(engine="SPARK",
description="Procedure to load aes encrypted data with hashed email column",
runtime_version="1.1",
jar_uris=["gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.36.4.jar"],
container_image="us-docker.pkg.dev/gcp-poc-playground/spark-custom-images/py310-ubuntu-pycryptodom:latest")
LANGUAGE PYTHON AS R"""
import logging
import hashlib
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, udf
from pyspark.sql.types import StringType

from google.cloud import secretmanager
from google.api_core.exceptions import NotFound, InvalidArgument
from Crypto.Cipher import AES
import base64

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Define function to extract symmetric key from secret manager
def get_secret(project_id, secret_id, version_id='latest'):
    # Create the Secret Manager client.
    client = secretmanager.SecretManagerServiceClient()

    # Build the resource name of the secret version.
    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"

    try:
        # Access the secret version.
        response = client.access_secret_version(request={"name": name})

        # Return the secret payload.
        return response.payload.data

    except NotFound:
        return f"Error: The requested secret {secret_id} was not found."
    except InvalidArgument:
        return "Error: The request had invalid parameters."
    except Exception as e:
        return f"Error: {str(e)}"

# Define a UDF to encrypt data
@udf(returnType=StringType())
def encrypt_data(data):
    if data is None:
        return data

    cipher = AES.new(symmetric_key, AES.MODE_EAX)
    nonce = cipher.nonce
    ciphertext, tag = cipher.encrypt_and_digest(data.encode('utf-8'))
    encrypted_data = base64.urlsafe_b64encode(nonce + tag + ciphertext).decode('utf-8')
    return encrypted_data

# Define a UDF to hash data.
@udf(returnType=StringType())
def hash_data(data):
    if data is None:
        return data
    hashed_data = hashlib.sha256(str(data.lower()).encode("utf8")).hexdigest()
    return hashed_data

columns_to_hash=['email']
columns_to_encrypt=['first_name', 'last_name']
sql="SELECT first_name, last_name, email FROM `gcp-poc-playground.bq_spark.customers_raw` LIMIT 10"
symmetric_key_project = 'gcp-poc-playground'
symmetric_key_name = 'aes-symmetric-key'

# pull symmetric_key from secret manager
symmetric_key = get_secret(symmetric_key_project, symmetric_key_name)

spark = SparkSession.builder \
    .appName("bq_spark_job") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.memory.fraction", "0.8") \
    .config("viewsEnabled", "true") \
    .config("materializationDataset", "bq_spark") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Load the specified columns from BigQuery
data = spark.read.format("bigquery").load(sql)

# Hash columns
if columns_to_hash:
    for data_hash_column in columns_to_hash:
        data = data.withColumn(data_hash_column, hash_data(data[data_hash_column]))

if columns_to_encrypt:
    for data_column in columns_to_encrypt:
        data = data.withColumn(data_column, encrypt_data(data[data_column]))

# Add column with processing timestamp
data = data.withColumn("load_time", current_timestamp())

data.write \
    .format("bigquery") \
    .option("writeMethod", "direct") \
    .option("project", 'gcp-poc-playground') \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("partitionField", "load_time") \
    .option("partitionType", "DAY") \
    .option("schema", "first_name:STRING, last_name:STRING, email:STRING, load_time:TIMESTAMP") \
    .mode('OVERWRITE') \
    .save("bq_spark.spark_encrypted")
"""

Let's have a look at the procedure's CREATE statement. The custom container is defined in the OPTIONS. To pull the image from Artifact Registry, make sure the service account has access to it. Further down, we have the code for the Spark execution. We define a function that pulls the encryption key from Secret Manager, and two UDFs that perform hashing and encryption. We provide lists of columns to hash and to encrypt: the email column should be hashed, while the last_name and first_name columns are encrypted. The source data is obtained with a SQL query, and the Spark session is defined with a few properties. Once hashing and encryption are performed, the result is written to another BigQuery table. For simplicity, we don't define parameters here, but that can be done as demonstrated in the previous examples. To create the procedure, run the code in the SQL editor.
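
The procedure also assumes a Secret Manager secret named aes-symmetric-key already exists in the project and holds a raw AES key. As a hedged sketch of how such a secret could be created (a 32-byte key is one valid choice, giving AES-256):

# Generate a random 32-byte key and store it as the secret's first version.
openssl rand -out aes.key 32

gcloud secrets create aes-symmetric-key \
    --replication-policy=automatic \
    --data-file=aes.key

rm aes.key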

Now that the procedure is created, let's invoke it with the following statement:

CALL `gcp-poc-playground.spark_infra.spark_encrypt`();

Spark successfully processed the data and loaded the result: the first_name and last_name columns were encrypted, the emails hashed, and the load_time column added. This use case is particularly interesting if you work with PII data: when dealing with large data volumes, offloading encryption to Dataproc can save a lot of time.

Summary

Offloading heavy processing from BigQuery to Dataproc can offer considerable cost savings as well as improved performance for computationally intensive tasks. For example, if you work with PII and need to encrypt a large volume of data, the work is both computationally intensive and not natively supported by BigQuery; this is where a Spark procedure can be a good solution. If you are running out of BigQuery slots handling complex ETL transformations over large data volumes, you could try a Spark procedure instead, potentially lowering your BigQuery costs. Spark procedures can be conveniently created using the PySpark or SQL editor in BigQuery Studio, which makes them practical for plenty of use cases. Passing INPUT/OUTPUT parameters allows code reusability, simplified interaction, and flexibility. Custom containers and custom service accounts open up the possibility to pre-install required libraries and control how the created procedures are used.