01 transform presidio
Databricks notebook source
MAGIC %md
MAGIC # Anonymize PII Entities with Presidio
MAGIC
MAGIC Using Presidio, anonymize PII content in text or csv files.
MAGIC
MAGIC The following code sample:
MAGIC
MAGIC - Imports the content of a single csv file, or a collection of text files, from a mounted folder.
MAGIC - Anonymizes the content of the text files, or a single column in the csv dataset, using Presidio.
MAGIC - Writes the anonymized content back to the mounted folder, as a csv set, under the output folder. The output set produced from text files includes a column with the original file path.
MAGIC
MAGIC Input Parameters (widgets):
MAGIC
MAGIC - Input File Format (file_format) - Input file format, either csv or text.
MAGIC - Input path (storage_input_path) - Folder name in the case of text files, or a path to a single file in the case of csv.
MAGIC - Output Folder Name (storage_output_folder) - Output folder name.
MAGIC - Column to Anonymize (anonymized_column) - Name of the column to anonymize in the case of csv; not applicable for text.
MAGIC
MAGIC A minimal, standalone illustration of what the anonymization produces is shown right after this cell.
COMMAND ----------
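As a quick, standalone illustration of the analyze-then-anonymize flow used below (run outside Spark; the sample sentence is made up and not part of any dataset):

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

sample = "My name is John Smith and my phone number is 212-555-0101"
results = analyzer.analyze(text=sample, language="en")
anonymized = anonymizer.anonymize(
    text=sample,
    analyzer_results=results,
    operators={"DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})},
)
# Expected output along the lines of:
# "My name is <ANONYMIZED> and my phone number is <ANONYMIZED>"
print(anonymized.text)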
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from pyspark.sql.types import StringType
from pyspark.sql.functions import input_file_name, regexp_replace
from pyspark.sql.functions import col, pandas_udf
import pandas as pd
import os
dbutils.widgets.dropdown(
    "file_format", "text", ["text", "csv"], "Input File Format (csv/text)"
)
dbutils.widgets.text("storage_input_path", "input", "Input path (file or folder)")
dbutils.widgets.text("storage_output_folder", "output", "Output Folder Name")
dbutils.widgets.text("anonymized_column", "value", "Column to Anonymize")
COMMAND ----------
MAGIC %md
MAGIC # Import the text files from the mounted folder
COMMAND ----------
storage_mount_name = os.environ["STORAGE_MOUNT_NAME"]
storage_input_path = dbutils.widgets.get("storage_input_path")
storage_output_folder = dbutils.widgets.get("storage_output_folder")
file_format = dbutils.widgets.get("file_format")
anonymized_column = dbutils.widgets.get("anonymized_column")
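STORAGE_MOUNT_NAME is expected to be set as a cluster environment variable pointing at a DBFS mount (for example /mnt/files). A minimal sketch of how such a mount might be created once per workspace; the storage account, container, and secret names below are placeholders and assumptions, not part of this notebook:

# Hypothetical one-time setup: mount an Azure Blob Storage container and
# expose its mount point to the cluster as STORAGE_MOUNT_NAME.
dbutils.fs.mount(
    source="wasbs://<container>@<storage-account>.blob.core.windows.net",
    mount_point="/mnt/files",
    extra_configs={
        "fs.azure.account.key.<storage-account>.blob.core.windows.net": dbutils.secrets.get(
            scope="<secret-scope>", key="<storage-account-key>"
        )
    },
)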
if file_format == "csv":
input_df = spark.read.option("header", "true").csv(
storage_mount_name + "/" + storage_input_path
)
elif file_format == "text":
input_df = (
spark.read.text(storage_mount_name + "/" + storage_input_path + "/*")
.withColumn("filename", input_file_name())
.withColumn(
"filename",
regexp_replace("filename", "^.*(" + storage_mount_name + "/)", ""),
)
)
if file_format == "csv":
input_df = spark.read.option("header", "true").csv(
storage_mount_name + "/" + storage_input_path
)
elif file_format == "text":
input_df = (
spark.read.text(storage_mount_name + "/" + storage_input_path + "/*")
.withColumn("filename", input_file_name())
.withColumn(
"filename",
regexp_replace("filename", "^.*(" + storage_mount_name + "/)", ""),
)
)
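With the text format, spark.read.text places each line in Spark's default value column, and the code above adds a filename column holding the path relative to the mount; that is why the Column to Anonymize widget defaults to "value". A quick way to confirm the shape of the loaded data (shown for the text case, nothing notebook-specific):

input_df.printSchema()
# root
#  |-- value: string (nullable = true)
#  |-- filename: string (nullable = true)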
# Display the loaded files (the lazy read is executed here)
display(input_df)
COMMAND ----------
MAGIC %md
MAGIC # Anonymize text using Presidio
COMMAND ----------
# Create the analyzer and anonymizer engines once on the driver and broadcast
# them, so Spark workers reuse them instead of re-initializing per task
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
broadcasted_analyzer = sc.broadcast(analyzer)
broadcasted_anonymizer = sc.broadcast(anonymizer)
Define an anonymization function, a pandas series function over it, and a pandas UDF. Note that the analyzer and anonymizer are broadcast variables.
def anonymize_text(text: str) -> str:
    analyzer = broadcasted_analyzer.value
    anonymizer = broadcasted_anonymizer.value
    analyzer_results = analyzer.analyze(text=text, language="en")
    anonymized_results = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators={"DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})},
    )
    return anonymized_results.text
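The operators mapping above replaces every detected entity with a fixed placeholder. Presidio also ships other built-in anonymizers (for example mask, hash, redact) that can be configured per entity type. A hedged sketch of such a configuration, not used by this notebook; parameter names should be checked against the installed presidio-anonymizer version:

# Hypothetical per-entity operator configuration: mask the last 4 characters of
# phone numbers, hash email addresses, and replace everything else.
alternative_operators = {
    "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"}),
    "PHONE_NUMBER": OperatorConfig(
        "mask", {"masking_char": "*", "chars_to_mask": 4, "from_end": True}
    ),
    "EMAIL_ADDRESS": OperatorConfig("hash", {"hash_type": "sha256"}),
}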
def anonymize_series(s: pd.Series) -> pd.Series:
    return s.apply(anonymize_text)
# Define the series function as a pandas UDF
anonymize = pandas_udf(anonymize_series, returnType=StringType())
# Apply the UDF on the column to anonymize
anonymized_df = input_df.withColumn(
    anonymized_column, anonymize(col(anonymized_column))
)
display(anonymized_df)
COMMAND ----------
MAGIC %md
MAGIC # Write the anonymized content back to the mounted folder
COMMAND ----------
Write the anonymized dataset to the output folder.
anonymized_df.write.option("header", "true").csv(
    storage_mount_name + "/" + storage_output_folder
)
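Note that Spark's default save mode fails if the output folder already exists from a previous run. If overwriting earlier output is acceptable, a hedged variant (same paths as above), followed by reading the result back as a sanity check:

# Overwrite any existing output from earlier runs
anonymized_df.write.mode("overwrite").option("header", "true").csv(
    storage_mount_name + "/" + storage_output_folder
)
# Read the anonymized csv set back to verify the written content
display(
    spark.read.option("header", "true").csv(
        storage_mount_name + "/" + storage_output_folder
    )
)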
COMMAND ----------