From 174d61aeab6ad7684d9e5fa40f7bb61e75d845a6 Mon Sep 17 00:00:00 2001
From: vighnesh_wednesday
Date: Fri, 15 Dec 2023 12:22:33 +0530
Subject: [PATCH] fixed end of file

---
 app/.custom-env                   |   9 ++
 app/connect_databricks.py         |   4 -
 app/connect_glue.py               |  15 +++-
 app/enviroment.py                 |  65 ++++++++++++++
 app/extraction.py                 |  27 +++---
 app/spark_wrapper.py              |  15 ++++
 main.py                           | 147 ++++---------------------------
 tests/test_connect_databricks.py  |  29 +-----
 tests/test_connect_glue.py        |  72 ++++++++++++---
 tests/test_extraction.py          | 114 ++++++++++++------------
 10 files changed, 247 insertions(+), 250 deletions(-)
 create mode 100644 app/.custom-env
 create mode 100644 app/enviroment.py

diff --git a/app/.custom-env b/app/.custom-env
new file mode 100644
index 0000000..80e6e18
--- /dev/null
+++ b/app/.custom-env
@@ -0,0 +1,9 @@
+# this is my custom file for read & write path based on environment
+
+GLUE_READ_PATH = "s3://glue-bucket-vighnesh/rawdata/"
+GLUE_WRITE_PATH = "s3://glue-bucket-vighnesh/transformed/"
+
+DATABRICKS_READ_PATH = "/mnt/rawdata/"
+DATABRICKS_WRITE_PATH = "/mnt/transformed/"
+
+KAGGLE_PATH = "mastmustu/insurance-claims-fraud-data"
diff --git a/app/connect_databricks.py b/app/connect_databricks.py
index bd6a20d..93b2c26 100644
--- a/app/connect_databricks.py
+++ b/app/connect_databricks.py
@@ -1,10 +1,6 @@
 import os
 
 
-def get_param_value(dbutils, param_key: str):
-    return dbutils.widgets.get(param_key)
-
-
 def create_mount(dbutils, container_name, mount_path):
     storage_name = os.environ["storage_account_name"]
     storage_key = os.environ["datalake_access_key"]
diff --git a/app/connect_glue.py b/app/connect_glue.py
index d18b928..88224cf 100644
--- a/app/connect_glue.py
+++ b/app/connect_glue.py
@@ -1,12 +1,21 @@
+import sys
 from pyspark.context import SparkContext
+from awsglue.utils import getResolvedOptions, GlueArgumentError
 from awsglue.context import GlueContext
-from awsglue.job import Job
 
 
 def init_glue():
+    try:
+        args = getResolvedOptions(
+            sys.argv, ["JOB_NAME", "KAGGLE_USERNAME", "KAGGLE_KEY"]
+        )
+        print("\nRunning Glue Online\n")
+    except GlueArgumentError:
+        print("\nRunning Glue Locally\n")
+        args = {"JOB_NAME": "local"}
+
     sc = SparkContext()
     glueContext = GlueContext(sc)
     spark = glueContext.spark_session
-    job = Job(glueContext)
-    return glueContext, spark, job
+    return spark, args
diff --git a/app/enviroment.py b/app/enviroment.py
new file mode 100644
index 0000000..64a8444
--- /dev/null
+++ b/app/enviroment.py
@@ -0,0 +1,65 @@
+import os
+import subprocess
+import dotenv
+import app.connect_databricks as cd
+import app.connect_glue as cg
+import app.spark_wrapper as sw
+
+
+def set_keys_get_spark(databricks: bool, dbutils, spark):
+    if databricks:
+        os.environ["KAGGLE_USERNAME"] = dbutils.widgets.get("kaggle_username")
+
+        os.environ["KAGGLE_KEY"] = dbutils.widgets.get("kaggle_token")
+
+        os.environ["storage_account_name"] = dbutils.widgets.get("storage_account_name")
+
+        os.environ["datalake_access_key"] = dbutils.widgets.get("datalake_access_key")
+
+        cd.create_mount(dbutils, "rawdata", "/mnt/rawdata/")
+        cd.create_mount(dbutils, "transformed", "/mnt/transformed/")
+
+    else:
+        spark, args = cg.init_glue()
+        if args["JOB_NAME"] == "local":
+            dotenv.load_dotenv()
+        else:
+            os.environ["KAGGLE_USERNAME"] = args["KAGGLE_USERNAME"]
+            os.environ["KAGGLE_KEY"] = args["KAGGLE_KEY"]
+
+    return spark
+
+
+def get_dataframes(databricks: bool, spark, directory_path: str):
+    dfs = []
+
+    if databricks:
+        csv_files = [
+            file for file in os.listdir(directory_path) if file.endswith(".csv")
+        ]
+    else:
+        cmd = f"aws s3 ls {directory_path}"
+        result = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True,
+            shell=True,
+            check=True,
+        )
+        lines = result.stdout.split("\n")
+        csv_files = [line.split()[-1] for line in lines if line.endswith(".csv")]
+
+    for csv_file in csv_files:
+        file_path = os.path.join(directory_path, csv_file)
+        df = sw.create_frame(spark, file_path)
+        dfs.append(df)
+
+    return dfs
+
+
+def get_paths(databricks: bool):
+    if databricks:
+        return os.getenv("DATABRICKS_READ_PATH"), os.getenv("DATABRICKS_WRITE_PATH")
+
+    return os.getenv("GLUE_READ_PATH"), os.getenv("GLUE_WRITE_PATH")
diff --git a/app/extraction.py b/app/extraction.py
index 13bfaf6..f09ae88 100644
--- a/app/extraction.py
+++ b/app/extraction.py
@@ -1,26 +1,19 @@
 import os
+import kaggle
 
-os.system("pip install kaggle")
-import kaggle  # pylint: disable=wrong-import-position
-
 
-def extract_from_kaggle(flag: bool):
-    if flag:
-        read_path = "/dbfs/mnt/rawdata/"
-        write_path = "/mnt/transformed/"
+def extract_from_kaggle(databricks: bool, extraction_path: str):
+    if databricks:
+        temp_path = "/dbfs" + extraction_path
     else:
-        read_path = "temp/"
-        write_path = "s3://glue-bucket-vighnesh/transformed/"
+        temp_path = "temp/"
 
     api = kaggle.KaggleApi()
     api.authenticate()
-    api.dataset_download_cli(
-        "mastmustu/insurance-claims-fraud-data", unzip=True, path=read_path
-    )
+    api.dataset_download_cli(os.getenv("KAGGLE_PATH"), unzip=True, path=temp_path)
 
-    if flag:
-        read_path = read_path[5:]
-    else:
-        read_path = "s3://glue-bucket-vighnesh/rawdata/"
+    if databricks is False:
+        copy_command = f"aws s3 cp {temp_path} {extraction_path} --recursive"
+        os.system(copy_command)
 
-    return read_path, write_path
+    print(f"Extracted Data Successfully in path: {extraction_path}")
diff --git a/app/spark_wrapper.py b/app/spark_wrapper.py
index d28b9e6..9dc6224 100644
--- a/app/spark_wrapper.py
+++ b/app/spark_wrapper.py
@@ -33,3 +33,18 @@ def make_window(
     return (
         Window.partitionBy(partition).orderBy(order).rangeBetween(range_from, range_to)
     )
+
+
+# this is my custom cleaning function, specific to the mastmustu/insurance-claims-fraud-data dataset
+def rename_same_columns(df: DataFrame, prefix: str) -> DataFrame:
+    cols_dict = {
+        "ADDRESS_LINE1": f"{prefix}_ADDRESS_LINE1",
+        "ADDRESS_LINE2": f"{prefix}_ADDRESS_LINE2",
+        "CITY": f"{prefix}_CITY",
+        "STATE": f"{prefix}_STATE",
+        "POSTAL_CODE": f"{prefix}_POSTAL_CODE",
+    }
+
+    df = rename_columns(df, cols_dict)
+
+    return df
diff --git a/main.py b/main.py
index b65cf8f..b857eca 100644
--- a/main.py
+++ b/main.py
@@ -1,159 +1,49 @@
 # Databricks notebook source
-import os
-import subprocess
+from dotenv import load_dotenv
 from pyspark.sql.functions import when, col
 from pyspark.sql.functions import round as sp_round
 from pyspark.sql import Window
 import pyspark.sql.functions as F
+import app.enviroment as env
 import app.spark_wrapper as sw
 
-os.system("pip install python-dotenv")
-import dotenv  # pylint: disable=wrong-import-position, disable=wrong-import-order
+load_dotenv("app/.custom-env")
 
 # COMMAND ----------
 
-# try:
-#     import app.connect_databricks as cd  # pylint: disable=ungrouped-imports
-#     import json
-
-#     # Comment the following line if running directly in cloud notebook
-#     spark, dbutils = cd.init_databricks()
-
-#     with open("/dbfs/mnt/config/keys.json", encoding="utf-8") as file:
-#         keys = json.load(file)
-
-#     flag = keys["flag"]
-# except:  # pylint: disable=bare-except
-#     flag = "False"
-
-
-# flag = bool(flag == "True")
-
 if "dbutils" in locals():
-    flag = True
+    databricks = True
 else:
     spark = None
     dbutils = None
-    flag = False
-
+    databricks = False
+read_path, write_path = env.get_paths(databricks)
 
 # COMMAND ----------
 
-if flag:
-    import app.connect_databricks as cd
-
-    os.environ["KAGGLE_USERNAME"] = cd.get_param_value(dbutils, "kaggle_username")
-
-    os.environ["KAGGLE_KEY"] = cd.get_param_value(dbutils, "kaggle_token")
-
-    os.environ["storage_account_name"] = cd.get_param_value(
-        dbutils, "storage_account_name"
-    )
-
-    os.environ["datalake_access_key"] = cd.get_param_value(
-        dbutils, "datalake_access_key"
-    )
-
+spark = env.set_keys_get_spark(databricks, dbutils, spark)
 
 # COMMAND ----------
 
-if flag:
-    import app.connect_databricks as cd
-
-    # creating mounts
-    cd.create_mount(dbutils, "zipdata", "/mnt/zipdata/")
-    cd.create_mount(dbutils, "rawdata", "/mnt/rawdata/")
-    cd.create_mount(dbutils, "transformed", "/mnt/transformed/")
+# Comment out the 2 lines below if you don't want to extract from kaggle
 
-else:
-    import app.connect_glue as cg
-    from awsglue.utils import getResolvedOptions
-    import sys
-
-    # initiating glue spark
-    try:
-        print("Setting up params...")
-        args = getResolvedOptions(
-            sys.argv, ["JOB_NAME", "KAGGLE_USERNAME", "KAGGLE_KEY", "FLAG"]
-        )
-    except:  # pylint: disable=bare-except
-        args = {"JOB_NAME": "local"}
-
-    glueContext, spark, job = cg.init_glue()
-    job.init("sample")
-    if args["JOB_NAME"] == "local":
-        dotenv.load_dotenv()
-    else:
-        os.environ["KAGGLE_USERNAME"] = args["KAGGLE_USERNAME"]
-        os.environ["KAGGLE_KEY"] = args["KAGGLE_KEY"]
-
-# COMMAND ----------
 from app.extraction import extract_from_kaggle  # pylint: disable=wrong-import-position
 
-# COMMAND ----------
-
-read_path, write_path = extract_from_kaggle(flag)
-
-if flag is False:
-    copy_command = f"aws s3 cp temp/ {read_path} --recursive"
-    result = subprocess.run(
-        copy_command,
-        shell=True,
-        check=True,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        text=True,
-    )
-    print("Output:", result.stdout)
+extract_from_kaggle(databricks, read_path)
 
 # COMMAND ----------
 
-# reading data in different frames
-
-employee = sw.create_frame(spark, read_path + "employee_data.csv")
+# fmt: off
+[employee, insurance, vendor] = env.get_dataframes(databricks, spark, read_path)  # pylint: disable=unbalanced-tuple-unpacking
 
-employee = sw.rename_columns(
-    employee,
-    {
-        "ADDRESS_LINE1": "AGENT_ADDRESS_LINE1",
-        "ADDRESS_LINE2": "AGENT_ADDRESS_LINE2",
-        "CITY": "AGENT_CITY",
-        "STATE": "AGENT_STATE",
-        "POSTAL_CODE": "AGENT_POSTAL_CODE",
-    },
-)
-
-# COMMAND ----------
+# fmt: on
 
-insurance = sw.create_frame(spark, read_path + "insurance_data.csv")
-
-insurance = sw.rename_columns(
-    insurance,
-    {
-        "ADDRESS_LINE1": "CUSTOMER_ADDRESS_LINE1",
-        "ADDRESS_LINE2": "CUSTOMER_ADDRESS_LINE2",
-        "CITY": "CUSTOMER_CITY",
-        "STATE": "CUSTOMER_STATE",
-        "POSTAL_CODE": "CUSTOMER_POSTAL_CODE",
-    },
-)
-
-# COMMAND ----------
-
-vendor = sw.create_frame(spark, read_path + "vendor_data.csv")
-
-vendor = sw.rename_columns(
-    vendor,
-    {
-        "ADDRESS_LINE1": "VENDOR_ADDRESS_LINE1",
-        "ADDRESS_LINE2": "VENDOR_ADDRESS_LINE2",
-        "CITY": "VENDOR_CITY",
-        "STATE": "VENDOR_STATE",
-        "POSTAL_CODE": "VENDOR_POSTAL_CODE",
-    },
-)
+# performing cleaning
+employee = sw.rename_same_columns(employee, "AGENT")
+insurance = sw.rename_same_columns(insurance, "CUSTOMER")
+vendor = sw.rename_same_columns(vendor, "VENDOR")
 
 # COMMAND ----------
 
@@ -315,9 +205,4 @@ def get_cond(type1, type2):
 
 # finally writting the data in transformed container
 df.coalesce(1).write.csv(write_path + "final_data.csv", header=True, mode="overwrite")
"final_data.csv", header=True, mode="overwrite") -# COMMAND ---------- - -if flag is False: - job.commit() - print("Execution Complete") diff --git a/tests/test_connect_databricks.py b/tests/test_connect_databricks.py index c843548..dcea081 100644 --- a/tests/test_connect_databricks.py +++ b/tests/test_connect_databricks.py @@ -1,6 +1,6 @@ import unittest from unittest.mock import MagicMock, patch -from app.connect_databricks import create_mount, get_param_value +from app.connect_databricks import create_mount def constructor(self, mount_point): @@ -63,30 +63,3 @@ def test_create_mount_already_mounted(self): # Assertions self.dbutils.fs.mount.assert_not_called() self.dbutils.fs.refreshMounts.assert_called_once() - - def test_get_param_value_success(self): - param_key = "mock_param_key" - mock_param_value = "mock_param_value" - - # Mocking dbutils.widgets.get() to return a value - self.dbutils.widgets.get.return_value = mock_param_value - - # Call the function to test - result = get_param_value(self.dbutils, param_key) - - # Assertions - self.assertEqual(result, mock_param_value) - self.dbutils.widgets.get.assert_called_once_with(param_key) - - def test_get_param_value_failure(self): - param_key = "mock_param_key" - - # Mocking dbutils.widgets.get() to return None (indicating failure) - self.dbutils.widgets.get.return_value = None - - # Call the function to test - result = get_param_value(self.dbutils, param_key) - - # Assertions - self.assertIsNone(result) - self.dbutils.widgets.get.assert_called_once_with(param_key) diff --git a/tests/test_connect_glue.py b/tests/test_connect_glue.py index 16a837a..c7051fd 100644 --- a/tests/test_connect_glue.py +++ b/tests/test_connect_glue.py @@ -1,4 +1,5 @@ import unittest +import sys from unittest.mock import patch, MagicMock from app.connect_glue import init_glue @@ -6,47 +7,92 @@ class TestInitGlue(unittest.TestCase): @patch("app.connect_glue.SparkContext") @patch("app.connect_glue.GlueContext") - @patch("app.connect_glue.Job") - def test_init_glue(self, mock_job, mock_glue_context, mock_spark_context): - # Mock the SparkContext, GlueContext, and Job + @patch("app.connect_glue.getResolvedOptions") + def test_init_glue( + self, mock_get_resolved_options, mock_glue_context, mock_spark_context + ): + # Mock the SparkContext, GlueContext + mock_spark_context_instance = MagicMock() + mock_glue_context_instance = MagicMock() + + # Set up the behavior of the mock instances + mock_spark_context.return_value = mock_spark_context_instance + mock_glue_context.return_value = mock_glue_context_instance + + # Set up the behavior of the getResolvedOptions mock + mock_get_resolved_options.return_value = { + "JOB_NAME": "test", + "KAGGLE_USERNAME": "test_username", + "KAGGLE_KEY": "test_key", + } + + # Call the function to test + spark, args = init_glue() + + # Assertions + mock_spark_context.assert_called_once() + mock_glue_context.assert_called_once_with(mock_spark_context_instance) + mock_get_resolved_options.assert_called_once_with( + sys.argv, ["JOB_NAME", "KAGGLE_USERNAME", "KAGGLE_KEY"] + ) + + # Check if the returned values are correct + self.assertEqual(spark, mock_glue_context_instance.spark_session) + self.assertEqual( + args, + { + "JOB_NAME": "test", + "KAGGLE_USERNAME": "test_username", + "KAGGLE_KEY": "test_key", + }, + ) + + @patch("app.connect_glue.SparkContext") + @patch("app.connect_glue.GlueContext") + def test_init_glue_local(self, mock_glue_context, mock_spark_context): + # Mock the SparkContext, GlueContext mock_spark_context_instance = 
         mock_glue_context_instance = MagicMock()
-        mock_job_instance = MagicMock()
 
         # Set up the behavior of the mock instances
         mock_spark_context.return_value = mock_spark_context_instance
         mock_glue_context.return_value = mock_glue_context_instance
-        mock_job.return_value = mock_job_instance
 
         # Call the function to test
-        glue_context, spark, job = init_glue()
+        spark, args = init_glue()
 
         # Assertions
         mock_spark_context.assert_called_once()
         mock_glue_context.assert_called_once_with(mock_spark_context_instance)
-        mock_job.assert_called_once_with(mock_glue_context_instance)
 
         # Check if the returned values are correct
-        self.assertEqual(glue_context, mock_glue_context_instance)
         self.assertEqual(spark, mock_glue_context_instance.spark_session)
-        self.assertEqual(job, mock_job_instance)
+        self.assertEqual(args, {"JOB_NAME": "local"})
 
     @patch("app.connect_glue.SparkContext")
     @patch("app.connect_glue.GlueContext")
-    @patch("app.connect_glue.Job")
-    def test_init_glue_failure(self, mock_job, mock_glue_context, mock_spark_context):
+    @patch("app.connect_glue.getResolvedOptions")
+    def test_init_glue_spark_context_failure(
+        self, mock_get_resolved_options, mock_glue_context, mock_spark_context
+    ):
         # Simulate a ValueError during SparkContext initialization
         error_statement = "Simulated SparkContext initialization failure"
         mock_spark_context.side_effect = ValueError(error_statement)
 
+        # Set up the behavior of the getResolvedOptions mock
+        mock_get_resolved_options.return_value = {
+            "JOB_NAME": "test",
+            "KAGGLE_USERNAME": "test_username",
+            "KAGGLE_KEY": "test_key",
+        }
+
         # Call the function to test
         with self.assertRaises(ValueError) as context:
             init_glue()
 
         # Assertions
         mock_spark_context.assert_called_once()
-        mock_glue_context.assert_not_called()  # GlueContext should not be called if SparkContext initialization fails
-        mock_job.assert_not_called()  # Job should not be called if SparkContext initialization fails
+        mock_glue_context.assert_not_called()
 
         # Check if the error displayed correctly
         self.assertEqual(str(context.exception), error_statement)
diff --git a/tests/test_extraction.py b/tests/test_extraction.py
index eefb4e2..e5f95cd 100644
--- a/tests/test_extraction.py
+++ b/tests/test_extraction.py
@@ -3,72 +3,78 @@
 from app.extraction import extract_from_kaggle
 
 
-class TestExtraction(unittest.TestCase):
+class TestExtractFromKaggle(unittest.TestCase):
+    @patch("app.extraction.os")
     @patch("app.extraction.kaggle")
-    def test_extract_from_kaggle_success(self, mock_kaggle):
+    def test_successful_extraction_databricks(self, mock_kaggle, mock_os):
+        # creating mock instances
         mock_kaggle_instance = mock_kaggle
-        mock_api_instance = mock_kaggle_instance.KaggleApi.return_value
 
-        # Mocking authenticate and dataset_download_cli methods
-        mock_api_instance.authenticate.return_value = None
-        mock_api_instance.dataset_download_cli.return_value = None
-
-        # Call the function to test with flag=True for success case
-        result = extract_from_kaggle(True)
-
-        # Assertions
-        mock_kaggle_instance.KaggleApi.assert_called_once()
-        mock_api_instance.authenticate.assert_called_once()
-        mock_api_instance.dataset_download_cli.assert_called_once_with(
-            "mastmustu/insurance-claims-fraud-data",
-            unzip=True,
-            path="/dbfs/mnt/rawdata/",
-        )
+        mock_kaggle_api = mock_kaggle_instance.KaggleApi.return_value
+        mock_kaggle_api.authenticate.return_value = None
+
+        mock_os.getenv.return_value = "path/to/kaggle/data"
+
+        mock_kaggle_api.dataset_download_cli.return_value = None
 
-        self.assertEqual(result, ("/mnt/rawdata/", "/mnt/transformed/"))
"/mnt/transformed/")) + # calling the function + extract_from_kaggle(databricks=True, extraction_path="/some/path") + mock_kaggle.KaggleApi.assert_called_once() + mock_kaggle_api.authenticate.assert_called_once() + mock_kaggle_api.dataset_download_cli.assert_called_once_with( + "path/to/kaggle/data", unzip=True, path="/dbfs/some/path" + ) + mock_os.getenv.assert_called_once() + mock_os.system.assert_not_called() # since it's databricks os.system shouldn't be called + + @patch("app.extraction.os") @patch("app.extraction.kaggle") - def test_extract_from_kaggle_success_false(self, mock_kaggle): + def test_successful_extraction_local(self, mock_kaggle, mock_os): + # creating mock instances mock_kaggle_instance = mock_kaggle - mock_api_instance = mock_kaggle_instance.KaggleApi.return_value - # Mocking authenticate and dataset_download_cli methods - mock_api_instance.authenticate.return_value = None - mock_api_instance.dataset_download_cli.return_value = None - - # Call the function to test with flag=True for success case - result = extract_from_kaggle(False) - - # Assertions - mock_kaggle_instance.KaggleApi.assert_called_once() - mock_api_instance.authenticate.assert_called_once() - mock_api_instance.dataset_download_cli.assert_called_once_with( - "mastmustu/insurance-claims-fraud-data", unzip=True, path="temp/" - ) + mock_kaggle_api = mock_kaggle_instance.KaggleApi.return_value + mock_kaggle_api.authenticate.return_value = None - self.assertEqual( - result, - ( - "s3://glue-bucket-vighnesh/rawdata/", - "s3://glue-bucket-vighnesh/transformed/", - ), + mock_os.getenv.return_value = "path/to/kaggle/data" + + mock_kaggle_api.dataset_download_cli.return_value = None + + # calling the function + extract_from_kaggle(databricks=False, extraction_path="path/for/extraction") + + mock_kaggle.KaggleApi.assert_called_once() + mock_kaggle_api.authenticate.assert_called_once() + mock_kaggle_api.dataset_download_cli.assert_called_once_with( + "path/to/kaggle/data", unzip=True, path="temp/" + ) + mock_os.getenv.assert_called_once() + mock_os.system.assert_called_once_with( + "aws s3 cp temp/ path/for/extraction --recursive" ) + @patch("app.extraction.os") @patch("app.extraction.kaggle") - def test_extract_from_kaggle_failure(self, mock_kaggle): + def test_failed_extraction_local(self, mock_kaggle, mock_os): + # creating mock instances mock_kaggle_instance = mock_kaggle - mock_api_instance = mock_kaggle_instance.KaggleApi.return_value - # Mocking authenticate and dataset_download_cli methods - mock_api_instance.authenticate.side_effect = Exception( - "Simulated authentication failure" + mock_kaggle_api = mock_kaggle_instance.KaggleApi.return_value + mock_kaggle_api.authenticate.side_effect = SyntaxError( + "Simulated Error in testing" ) - # Call the function to test with flag=False for failure case - with self.assertRaises(Exception) as context: - extract_from_kaggle(False) + mock_kaggle_api.dataset_download_cli.return_value = None + + # calling the function + with self.assertRaises(SyntaxError) as context: + extract_from_kaggle(databricks=False, extraction_path="/invalid/path") + + mock_kaggle.KaggleApi.assert_called_once() + mock_kaggle_api.authenticate.assert_called_once() - # Assertions - mock_kaggle_instance.KaggleApi.assert_called_once() - mock_api_instance.authenticate.assert_called_once() - mock_api_instance.dataset_download_cli.assert_not_called() + # since authentication fialed followign will not be called + mock_kaggle_api.dataset_download_cli.assert_not_called() + mock_os.getenv.assert_not_called() 
+        mock_os.system.assert_not_called()
 
-        # Check if the correct exception is raised
-        self.assertEqual(str(context.exception), "Simulated authentication failure")
+        actual_error_message = str(context.exception)
+        self.assertTrue("Simulated Error" in actual_error_message)
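
Usage sketch (illustrative only, not part of the diff above): outside Databricks, the helpers introduced in app/enviroment.py and app/extraction.py are meant to be wired together roughly as below. The standalone script name and the None placeholders for dbutils/spark mirror what main.py does and are assumptions for illustration, not code this patch adds.

    # run_local.py -- hypothetical driver; assumes awsglue, kaggle and python-dotenv are installed
    from dotenv import load_dotenv

    import app.enviroment as env
    from app.extraction import extract_from_kaggle

    load_dotenv("app/.custom-env")  # GLUE_*/DATABRICKS_* paths and KAGGLE_PATH

    databricks = False  # Glue / local run
    read_path, write_path = env.get_paths(databricks)

    # resolves Glue job args (or falls back to .env when local) and returns a SparkSession
    spark = env.set_keys_get_spark(databricks, dbutils=None, spark=None)

    extract_from_kaggle(databricks, read_path)  # downloads the Kaggle dataset into read_path
    dfs = env.get_dataframes(databricks, spark, read_path)  # one DataFrame per CSV found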