Commit

fixed end of file
vighnesh-wednesday committed Dec 15, 2023
1 parent 99fad47 commit 174d61a
Showing 10 changed files with 247 additions and 250 deletions.
9 changes: 9 additions & 0 deletions app/.custom-env
@@ -0,0 +1,9 @@
# this is my custom file for read & write paths based on environment

GLUE_READ_PATH = "s3://glue-bucket-vighnesh/rawdata/"
GLUE_WRITE_PATH = "s3://glue-bucket-vighnesh/transformed/"

DATABRICKS_READ_PATH = "/mnt/rawdata/"
DATABRICKS_WRITE_PATH = "/mnt/transformed/"

KAGGLE_PATH = "mastmustu/insurance-claims-fraud-data"
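
For context, a minimal sketch of how these values are consumed by the rest of this commit (main.py loads this file via python-dotenv, and get_paths() in app/enviroment.py reads the keys):

import os
from dotenv import load_dotenv

# main.py in this commit loads the custom env file at startup
load_dotenv("app/.custom-env")

# get_paths() then returns the read/write pair matching the current platform
read_path = os.getenv("GLUE_READ_PATH")
write_path = os.getenv("GLUE_WRITE_PATH")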
4 changes: 0 additions & 4 deletions app/connect_databricks.py
@@ -1,10 +1,6 @@
import os


def get_param_value(dbutils, param_key: str):
return dbutils.widgets.get(param_key)


def create_mount(dbutils, container_name, mount_path):
storage_name = os.environ["storage_account_name"]
storage_key = os.environ["datalake_access_key"]
15 changes: 12 additions & 3 deletions app/connect_glue.py
@@ -1,12 +1,21 @@
import sys
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions, GlueArgumentError
from awsglue.context import GlueContext
from awsglue.job import Job


def init_glue():
try:
args = getResolvedOptions(
sys.argv, ["JOB_NAME", "KAGGLE_USERNAME", "KAGGLE_KEY"]
)
print("\nRunning Glue Online\n")
except GlueArgumentError:
print("\nRunning Glue Locally\n")
args = {"JOB_NAME": "local"}

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

return glueContext, spark, job
return spark, args
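
A minimal usage sketch of the new return signature, mirroring how set_keys_get_spark() in app/enviroment.py (added in this commit) calls it:

from app.connect_glue import init_glue

# args falls back to {"JOB_NAME": "local"} when the Glue job arguments are absent
spark, args = init_glue()
if args["JOB_NAME"] == "local":
    import dotenv
    dotenv.load_dotenv()  # local run: credentials come from a .env file instead of job parameters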
65 changes: 65 additions & 0 deletions app/enviroment.py
@@ -0,0 +1,65 @@
import os
import subprocess
import dotenv
import app.connect_databricks as cd
import app.connect_glue as cg
import app.spark_wrapper as sw


def set_keys_get_spark(databricks: bool, dbutils, spark):
if databricks:
os.environ["KAGGLE_USERNAME"] = dbutils.widgets.get("kaggle_username")

os.environ["KAGGLE_KEY"] = dbutils.widgets.get("kaggle_token")

os.environ["storage_account_name"] = dbutils.widgets.get("storage_account_name")

os.environ["datalake_access_key"] = dbutils.widgets.get("datalake_access_key")

cd.create_mount(dbutils, "rawdata", "/mnt/rawdata/")
cd.create_mount(dbutils, "transformed", "/mnt/transformed/")

else:
spark, args = cg.init_glue()
if args["JOB_NAME"] == "local":
dotenv.load_dotenv()
else:
os.environ["KAGGLE_USERNAME"] = args["KAGGLE_USERNAME"]
os.environ["KAGGLE_KEY"] = args["KAGGLE_KEY"]

return spark


def get_dataframes(databricks: bool, spark, directory_path: str):
dfs = []

if databricks:
csv_files = [
file for file in os.listdir(directory_path) if file.endswith(".csv")
]
else:
cmd = f"aws s3 ls {directory_path}"
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=True,
check=True,
)
lines = result.stdout.split("\n")
csv_files = [line.split()[-1] for line in lines if line.endswith(".csv")]

for csv_file in csv_files:
file_path = os.path.join(directory_path, csv_file)
df = sw.create_frame(spark, file_path)
dfs.append(df)

return dfs


def get_paths(databricks: bool):
if databricks:
return os.getenv("DATABRICKS_READ_PATH"), os.getenv("DATABRICKS_WRITE_PATH")

return os.getenv("GLUE_READ_PATH"), os.getenv("GLUE_WRITE_PATH")
27 changes: 10 additions & 17 deletions app/extraction.py
@@ -1,26 +1,19 @@
import os
import kaggle

os.system("pip install kaggle")
import kaggle # pylint: disable=wrong-import-position


def extract_from_kaggle(flag: bool):
if flag:
read_path = "/dbfs/mnt/rawdata/"
write_path = "/mnt/transformed/"
def extract_from_kaggle(databricks: bool, extraction_path: str):
if databricks:
temp_path = "/dbfs" + extraction_path
else:
read_path = "temp/"
write_path = "s3://glue-bucket-vighnesh/transformed/"
temp_path = "temp/"

api = kaggle.KaggleApi()
api.authenticate()
api.dataset_download_cli(
"mastmustu/insurance-claims-fraud-data", unzip=True, path=read_path
)
api.dataset_download_cli(os.getenv("KAGGLE_PATH"), unzip=True, path=temp_path)

if flag:
read_path = read_path[5:]
else:
read_path = "s3://glue-bucket-vighnesh/rawdata/"
if databricks is False:
copy_command = f"aws s3 cp {temp_path} {extraction_path} --recursive"
os.system(copy_command)

return read_path, write_path
print(f"Extracted Data Successfully in path: {extraction_path}")
15 changes: 15 additions & 0 deletions app/spark_wrapper.py
@@ -33,3 +33,18 @@ def make_window(
return (
Window.partitionBy(partition).orderBy(order).rangeBetween(range_from, range_to)
)


# this is my custom cleaning function, specific to the mastmustu/insurance-claims-fraud-data dataset
def rename_same_columns(df: DataFrame, prefix: str) -> DataFrame:
cols_dict = {
"ADDRESS_LINE1": f"{prefix}_ADDRESS_LINE1",
"ADDRESS_LINE2": f"{prefix}_ADDRESS_LINE2",
"CITY": f"{prefix}_CITY",
"STATE": f"{prefix}_STATE",
"POSTAL_CODE": f"{prefix}_POSTAL_CODE",
}

df = rename_columns(df, cols_dict)

return df
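
Usage note: main.py now calls this helper instead of repeating three hand-written rename dictionaries, e.g.:

employee = sw.rename_same_columns(employee, "AGENT")
# ADDRESS_LINE1 -> AGENT_ADDRESS_LINE1, ADDRESS_LINE2 -> AGENT_ADDRESS_LINE2, CITY -> AGENT_CITY, ...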
147 changes: 16 additions & 131 deletions main.py
@@ -1,159 +1,49 @@
# Databricks notebook source
import os
import subprocess
from dotenv import load_dotenv

from pyspark.sql.functions import when, col
from pyspark.sql.functions import round as sp_round
from pyspark.sql import Window
import pyspark.sql.functions as F

import app.enviroment as env
import app.spark_wrapper as sw

os.system("pip install python-dotenv")
import dotenv # pylint: disable=wrong-import-position, disable=wrong-import-order
load_dotenv("app/.custom-env")

# COMMAND ----------

# try:
# import app.connect_databricks as cd # pylint: disable=ungrouped-imports
# import json

# # Comment the following line if running directly in cloud notebook
# spark, dbutils = cd.init_databricks()

# with open("/dbfs/mnt/config/keys.json", encoding="utf-8") as file:
# keys = json.load(file)

# flag = keys["flag"]
# except: # pylint: disable=bare-except
# flag = "False"


# flag = bool(flag == "True")

if "dbutils" in locals():
flag = True
databricks = True
else:
spark = None
dbutils = None
flag = False

databricks = False

read_path, write_path = env.get_paths(databricks)
# COMMAND ----------

if flag:
import app.connect_databricks as cd

os.environ["KAGGLE_USERNAME"] = cd.get_param_value(dbutils, "kaggle_username")

os.environ["KAGGLE_KEY"] = cd.get_param_value(dbutils, "kaggle_token")

os.environ["storage_account_name"] = cd.get_param_value(
dbutils, "storage_account_name"
)

os.environ["datalake_access_key"] = cd.get_param_value(
dbutils, "datalake_access_key"
)

spark = env.set_keys_get_spark(databricks, dbutils, spark)

# COMMAND ----------
if flag:
import app.connect_databricks as cd

# creating mounts
cd.create_mount(dbutils, "zipdata", "/mnt/zipdata/")
cd.create_mount(dbutils, "rawdata", "/mnt/rawdata/")
cd.create_mount(dbutils, "transformed", "/mnt/transformed/")
# Comment below 2 lines if you don't want to extract from kaggle

else:
import app.connect_glue as cg
from awsglue.utils import getResolvedOptions
import sys

# initiating glue spark
try:
print("Setting up params...")
args = getResolvedOptions(
sys.argv, ["JOB_NAME", "KAGGLE_USERNAME", "KAGGLE_KEY", "FLAG"]
)
except: # pylint: disable=bare-except
args = {"JOB_NAME": "local"}

glueContext, spark, job = cg.init_glue()
job.init("sample")
if args["JOB_NAME"] == "local":
dotenv.load_dotenv()
else:
os.environ["KAGGLE_USERNAME"] = args["KAGGLE_USERNAME"]
os.environ["KAGGLE_KEY"] = args["KAGGLE_KEY"]


# COMMAND ----------
from app.extraction import extract_from_kaggle # pylint: disable=wrong-import-position

# COMMAND ----------

read_path, write_path = extract_from_kaggle(flag)

if flag is False:
copy_command = f"aws s3 cp temp/ {read_path} --recursive"
result = subprocess.run(
copy_command,
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
print("Output:", result.stdout)
extract_from_kaggle(databricks, read_path)

# COMMAND ----------

# reading data in different frames

employee = sw.create_frame(spark, read_path + "employee_data.csv")
# fmt: off
[employee, insurance, vendor] = env.get_dataframes(databricks, spark, read_path) # pylint: disable=unbalanced-tuple-unpacking

employee = sw.rename_columns(
employee,
{
"ADDRESS_LINE1": "AGENT_ADDRESS_LINE1",
"ADDRESS_LINE2": "AGENT_ADDRESS_LINE2",
"CITY": "AGENT_CITY",
"STATE": "AGENT_STATE",
"POSTAL_CODE": "AGENT_POSTAL_CODE",
},
)

# COMMAND ----------
# fmt: on

insurance = sw.create_frame(spark, read_path + "insurance_data.csv")

insurance = sw.rename_columns(
insurance,
{
"ADDRESS_LINE1": "CUSTOMER_ADDRESS_LINE1",
"ADDRESS_LINE2": "CUSTOMER_ADDRESS_LINE2",
"CITY": "CUSTOMER_CITY",
"STATE": "CUSTOMER_STATE",
"POSTAL_CODE": "CUSTOMER_POSTAL_CODE",
},
)

# COMMAND ----------

vendor = sw.create_frame(spark, read_path + "vendor_data.csv")

vendor = sw.rename_columns(
vendor,
{
"ADDRESS_LINE1": "VENDOR_ADDRESS_LINE1",
"ADDRESS_LINE2": "VENDOR_ADDRESS_LINE2",
"CITY": "VENDOR_CITY",
"STATE": "VENDOR_STATE",
"POSTAL_CODE": "VENDOR_POSTAL_CODE",
},
)
# performing cleaning
employee = sw.rename_same_columns(employee, "AGENT")
insurance = sw.rename_same_columns(insurance, "CUSTOMER")
vendor = sw.rename_same_columns(vendor, "VENDOR")

# COMMAND ----------

@@ -315,9 +205,4 @@ def get_cond(type1, type2):
# finally writing the data to the transformed container
df.coalesce(1).write.csv(write_path + "final_data.csv", header=True, mode="overwrite")

# COMMAND ----------

if flag is False:
job.commit()

print("Execution Complete")
29 changes: 1 addition & 28 deletions tests/test_connect_databricks.py
@@ -1,6 +1,6 @@
import unittest
from unittest.mock import MagicMock, patch
from app.connect_databricks import create_mount, get_param_value
from app.connect_databricks import create_mount


def constructor(self, mount_point):
@@ -63,30 +63,3 @@ def test_create_mount_already_mounted(self):
# Assertions
self.dbutils.fs.mount.assert_not_called()
self.dbutils.fs.refreshMounts.assert_called_once()

def test_get_param_value_success(self):
param_key = "mock_param_key"
mock_param_value = "mock_param_value"

# Mocking dbutils.widgets.get() to return a value
self.dbutils.widgets.get.return_value = mock_param_value

# Call the function to test
result = get_param_value(self.dbutils, param_key)

# Assertions
self.assertEqual(result, mock_param_value)
self.dbutils.widgets.get.assert_called_once_with(param_key)

def test_get_param_value_failure(self):
param_key = "mock_param_key"

# Mocking dbutils.widgets.get() to return None (indicating failure)
self.dbutils.widgets.get.return_value = None

# Call the function to test
result = get_param_value(self.dbutils, param_key)

# Assertions
self.assertIsNone(result)
self.dbutils.widgets.get.assert_called_once_with(param_key)