Plugin bug fix (#21)
* A few improvement changes for EKS

* Minor changes to the plugin

* Plugin improvements for k8s

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Minor code changes to MANIFEST.in

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Minor pre-commit changes

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
mpvgithub and pre-commit-ci[bot] authored Feb 23, 2024
1 parent d990e89 commit c823b81
Showing 13 changed files with 100 additions and 82 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -119,3 +119,4 @@ ipython_config.py
 *.terraform.lock.hcl
 covalent-eks-cluster_config
 *.tfvars
+!default.tfvars
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -10,10 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
 ## Changed

 - Changed the folder structure for the terraform files matching standard plugin folder format
 - Minor code changes to kubernetes yml files
 - Modified the plugin to generate covalent related files to the **cache_dir** directory
 - Modified MANIFEST.in to validate plugin
-- Modified **requirements.txt** to remove the exact pins for the version to be installed.
+- Modified **requirements.txt** for docker, covalent, kubernetes version to >= 7.0.0, >=0.232.0, >=29.0.0 respectively
+- Modified **requirements.txt** to set covalent version minimum from **0.232.0**

## [0.2.0] - 2023-09-20

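The **cache_dir** entry above is the heart of the fix: generated artifacts (pickled functions, exec scripts, Dockerfiles) now land in Covalent's cache directory instead of the data-store path. A minimal sketch of how that directory is typically resolved; `get_config` is Covalent's real config helper, but the exact key used here is an assumption:

```python
# Sketch: resolving Covalent's cache directory, where the plugin now writes
# its generated files ("dispatcher.cache_dir" is an assumed config key).
from covalent._shared_files.config import get_config

cache_dir = get_config("dispatcher.cache_dir")
print(cache_dir)  # typically something under ~/.cache/covalent
```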
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -3,3 +3,4 @@ include requirements.txt
 include covalent_kubernetes_plugin/assets/infra/*
 exclude covalent_kubernetes_plugin/assets/infra/*.swp
 include covalent_kubernetes_plugin/assets/infra/*.tf
+include covalent_kubernetes_plugin/assets/infra/templates/*
2 changes: 1 addition & 1 deletion Makefile
@@ -9,4 +9,4 @@ deploy:

clean:
	(cd covalent_kubernetes_plugin/assets/infra && terraform destroy -auto-approve -state $(HOME)/.cache/covalent/terraform.tfstate)
-	rm -f covalent_kubernetes_plugin/infra/cluster_autoscaler.yml
+	rm -f covalent_kubernetes_plugin/assets/infra/cluster_autoscaler.yml
30 changes: 15 additions & 15 deletions covalent_kubernetes_plugin/assets/infra/covalent_eks.tf
@@ -17,8 +17,7 @@
 terraform {
   required_providers {
     aws = {
-      source  = "hashicorp/aws"
-      version = "5.17.0"
+      source = "hashicorp/aws"
     }
   }
 }
@@ -74,15 +73,16 @@ module "vpc" {
 }

 resource "aws_ecr_repository" "ecr_repository" {
-  name = var.aws_ecr_repo
+  name                 = var.aws_ecr_repo
   image_tag_mutability = "IMMUTABLE"
+  force_delete         = true
   image_scanning_configuration {
     scan_on_push = false
   }
 }

 resource "aws_s3_bucket" "s3_bucket" {
-  bucket = var.aws_s3_bucket
+  bucket        = var.aws_s3_bucket
+  force_destroy = true
 }
@@ -103,8 +103,8 @@ data "aws_iam_policy_document" "s3_access_document" {
 }

 resource "aws_iam_policy" "s3_access_policy" {
-  name = "CovalentEKSS3Access"
-  path = "/"
+  name   = "CovalentEKSS3Access"
+  path   = "/"
   policy = data.aws_iam_policy_document.s3_access_document.json
 }
@@ -156,7 +156,7 @@ resource "aws_iam_role_policy_attachment" "worker_node_policy_attachment" {

 resource "aws_iam_role_policy_attachment" "worker_node_s3_attachment" {
   policy_arn = aws_iam_policy.s3_access_policy.arn
-  role = aws_iam_role.eks_node_role.name
+  role       = aws_iam_role.eks_node_role.name
 }

 resource "aws_iam_role_policy_attachment" "cni_policy_attachment" {
@@ -202,7 +202,7 @@ resource "aws_eks_node_group" "private_node_group" {
   node_group_name = "${local.cluster_name}-private-ng"
   node_role_arn   = aws_iam_role.eks_node_role.arn

-  subnet_ids = module.vpc.private_subnets
+  subnet_ids      = module.vpc.private_subnets

   ami_type      = "AL2_x86_64"
   capacity_type = "ON_DEMAND"
@@ -228,19 +228,19 @@ resource "aws_eks_node_group" "private_node_group" {
 }

 data "template_file" "config" {
-  template = file("${path.module}/config.tpl")
+  template = file("${path.module}/templates/config.tpl")
   vars = {
-    certificate_data = aws_eks_cluster.eks_cluster.certificate_authority[0].data
-    cluster_endpoint = aws_eks_cluster.eks_cluster.endpoint
-    aws_region = var.aws_region
-    cluster_name = local.cluster_name
-    account_id = data.aws_caller_identity.current.account_id
+    certificate_data = aws_eks_cluster.eks_cluster.certificate_authority[0].data
+    cluster_endpoint = aws_eks_cluster.eks_cluster.endpoint
+    aws_region       = var.aws_region
+    cluster_name     = local.cluster_name
+    account_id       = data.aws_caller_identity.current.account_id
   }
 }

 resource "local_file" "config" {
   content  = data.template_file.config.rendered
-  filename = "${path.module}/${local.cluster_name}_config"
+  filename = "${local.cluster_name}_config"
 }

 data "aws_iam_policy_document" "cluster_autoscaler_sts_policy" {
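Two behavioural notes on this file: `force_delete`/`force_destroy` let `terraform destroy` remove the ECR repository and S3 bucket even when they still hold images or objects, and the kubeconfig is now rendered from `templates/config.tpl` into the working directory rather than into the module path. A rough Python equivalent of that rendering step; the variable names come from the `vars` block above, while the contents of `config.tpl` (and its use of `${...}` placeholders, which `string.Template` shares with Terraform) are assumptions:

```python
# Sketch: rendering the kubeconfig the way Terraform's template_file does.
from pathlib import Path
from string import Template

template_vars = {
    "certificate_data": "<base64-encoded CA data>",           # placeholder values
    "cluster_endpoint": "https://EXAMPLE.eks.amazonaws.com",
    "aws_region": "us-east-1",
    "cluster_name": "covalent-eks-cluster",
    "account_id": "123456789012",
}

template = Template(Path("templates/config.tpl").read_text())
Path(f"{template_vars['cluster_name']}_config").write_text(template.substitute(template_vars))
```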
10 changes: 10 additions & 0 deletions covalent_kubernetes_plugin/assets/infra/default.tfvars
@@ -0,0 +1,10 @@
name = "covalent-eks"
aws_region = "us-east-1"
aws_ecr_repo = "covalent-eks-task"
aws_s3_bucket = "covalent-eks-task"
vpc_cidr = "10.0.0.0/16"
instance_types = ["t2.medium"]
disk_size = 8
min_size = 1
max_size = 6
desired_size = 2
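These defaults pair with the `.gitignore` change above: all `*.tfvars` files stay ignored except this one, and the values can be applied with, e.g., `terraform apply -var-file=default.tfvars`.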
13 changes: 8 additions & 5 deletions covalent_kubernetes_plugin/assets/infra/deploy-eks.sh
@@ -50,7 +50,7 @@ export KUBECONFIG=`jq -r '.kubeconfig.value' <<< $outputs`
 cluster_name=`jq -r '.cluster_name.value' <<< $outputs`
 autoscaler_role=`jq -r '.eks_ca_iam_role_arn.value' <<< $outputs`

-sed "s|%CLUSTERNAME%|$cluster_name|;s|%ASROLE%|$autoscaler_role|" < cluster_autoscaler.yml > cluster_autoscaler.yml
+sed "s|%CLUSTERNAME%|$cluster_name|;s|%ASROLE%|$autoscaler_role|" < templates/cluster_autoscaler.yml > cluster_autoscaler.yml

 echo -e "\nEnabling node autoscaler..."
@@ -64,13 +64,16 @@ kubectl apply -f eks-admin-service-account.yaml
 token=`kubectl -n kube-system describe secret $(kubectl -n kube-system get secret |
 grep eks-admin | awk '{print $1}')`

 echo
-echo "Created Kubernetes cluster: $cluster_name"
-echo "Please apply the following to your environment:"
-echo "export KUBECONFIG=$KUBECONFIG"
-echo
 echo "You may view your resources using"
 echo "  > kubectl get nodes"
 echo
 echo "View the Kubernetes dashboard at http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#!/login"
 echo "Token: $token"
+echo
+echo
+echo "Created Kubernetes cluster: $cluster_name"
+mv $KUBECONFIG $STATEPATH/$KUBECONFIG
+echo
+echo "Please apply the following to your environment by typing"
+echo "export KUBECONFIG=$STATEPATH/$KUBECONFIG"
83 changes: 27 additions & 56 deletions covalent_kubernetes_plugin/k8s.py
@@ -113,7 +113,7 @@ def run(self, function: callable, args: List, kwargs: Dict, task_metadata: Dict)

         # Load Kubernetes config file
         app_log.debug("Loading the Kubernetes configuration")
-        config.load_kube_config(self.k8s_config_file)
+        config.load_kube_config(config_file=self.k8s_config_file, context=self.k8s_context)

         # Validate the context
         app_log.debug("Validating the Kubernetes context")
@@ -219,52 +219,40 @@ def _format_exec_script(
         """

         # Execution preamble
-        exec_script = """
+        exec_script = f"""
 import os
 import cloudpickle as pickle
 local_func_filename = os.path.join("{docker_working_dir}", "{func_filename}")
 local_result_filename = os.path.join("{docker_working_dir}", "{result_filename}")
-""".format(
-            docker_working_dir=docker_working_dir,
-            func_filename=func_filename,
-            result_filename=result_filename,
-        )
+"""

         # Pull from data store
         if self.data_store.startswith("s3://"):
-            exec_script += """
+            exec_script += f"""
 import boto3
 s3 = boto3.client("s3")
-s3.download_file("{s3_bucket_name}", "{func_filename}", local_func_filename)
-""".format(
-                func_filename=func_filename,
-                s3_bucket_name=self.data_store[5:].split("/")[0],
-            )
+s3.download_file("{self.data_store[5:].split('/')[0]}", "{func_filename}", local_func_filename)
+"""

         # Extract and execute the task
         exec_script += """
 with open(local_func_filename, "rb") as f:
     function, args, kwargs = pickle.load(f)
 result = function(*args, **kwargs)
 with open(local_result_filename, "wb") as f:
     pickle.dump(result, f)
 """

         # Push to data store
         if self.data_store.startswith("s3://"):
-            exec_script += """
-s3.upload_file(local_result_filename, "{s3_bucket_name}", "{result_filename}")
-""".format(
-                result_filename=result_filename,
-                s3_bucket_name=self.data_store[5:].split("/")[0],
-            )
+            exec_script += f"""
+s3.upload_file(local_result_filename, "{self.data_store[5:].split('/')[0]}", "{result_filename}")
+"""

         return exec_script
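For an executor configured with, say, `data_store="s3://my-bucket"` (a hypothetical bucket) and `docker_working_dir="/covalent"`, the f-strings above render to roughly the following script; the `func-abc123.pkl`/`result-abc123.pkl` names are illustrative:

```python
# Hypothetical rendered exec script (bucket, paths, and filenames assumed).
import os
import cloudpickle as pickle

local_func_filename = os.path.join("/covalent", "func-abc123.pkl")
local_result_filename = os.path.join("/covalent", "result-abc123.pkl")

import boto3

s3 = boto3.client("s3")
s3.download_file("my-bucket", "func-abc123.pkl", local_func_filename)

with open(local_func_filename, "rb") as f:
    function, args, kwargs = pickle.load(f)
result = function(*args, **kwargs)
with open(local_result_filename, "wb") as f:
    pickle.dump(result, f)

s3.upload_file(local_result_filename, "my-bucket", "result-abc123.pkl")
```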

@@ -282,24 +270,20 @@ def _format_dockerfile(
             dockerfile: String object containing a Dockerfile.
         """

-        dockerfile = """
+        dockerfile = f"""
 FROM {base_image}
-RUN pip install --no-cache-dir cloudpickle==2.0.0 boto3==1.24.73
+RUN pip install --no-cache-dir cloudpickle==3.0.0 boto3==1.24.73
-RUN pip install covalent
+RUN pip install "covalent>=0.232.0"
 WORKDIR {docker_working_dir}
-COPY {func_basename} {docker_working_dir}
+COPY {os.path.basename(exec_script_filename)} {docker_working_dir}
 ENTRYPOINT [ "python" ]
-CMD [ "{docker_working_dir}/{func_basename}" ]
-""".format(
-            base_image=base_image,
-            func_basename=os.path.basename(exec_script_filename),
-            docker_working_dir=docker_working_dir,
-        )
+CMD [ "{docker_working_dir}/{os.path.basename(exec_script_filename)}" ]
+"""

         return dockerfile
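Further down, `_package_and_upload` builds this Dockerfile and pushes the image to ECR. With the `docker` SDK (which the plugin already depends on), that flow looks roughly like this; the registry URI, tag, and file names are placeholders:

```python
# Sketch: building and pushing the generated image with the docker SDK.
import docker

client = docker.from_env()
image_uri = "123456789012.dkr.ecr.us-east-1.amazonaws.com/covalent-eks-task"  # placeholder

image, build_logs = client.images.build(
    path="/tmp/covalent-cache",      # directory holding the Dockerfile (placeholder)
    dockerfile="Dockerfile-abc123",  # placeholder filename
    tag=f"{image_uri}:abc123",
)
push_output = client.images.push(image_uri, tag="abc123")
print(push_output)
```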

@@ -331,7 +315,7 @@ def _package_and_upload(
         app_log.debug("Beginning package and upload.")
         func_filename = f"func-{image_tag}.pkl"

-        with tempfile.NamedTemporaryFile(dir=self.cache_dir) as function_file:
+        with tempfile.NamedTemporaryFile(dir=self.cache_dir, delete=False) as function_file:
             # Write serialized function to file
             pickle.dump((function, args, kwargs), function_file)
             function_file.flush()
@@ -347,10 +331,10 @@ def _package_and_upload(
             )

         else:
-            shutil.copyfile(function_file.name, os.path.join(self.data_store, func_filename))
+            shutil.copyfile(function_file.name, os.path.join(self.cache_dir, func_filename))

         with tempfile.NamedTemporaryFile(
-            dir=self.cache_dir, mode="w"
+            dir=self.cache_dir, mode="w", delete=False
         ) as exec_script_file, tempfile.NamedTemporaryFile(
             dir=self.cache_dir, mode="w"
         ) as dockerfile_file:
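`delete=False` keeps the temporary files on disk after their `with` blocks exit, which matters here because `docker build` must later `COPY` them into the image; the trade-off is that cleanup becomes the plugin's responsibility. In miniature:

```python
# Sketch: a NamedTemporaryFile that must outlive its context manager.
import os
import tempfile

with tempfile.NamedTemporaryFile(dir="/tmp", mode="w", delete=False) as f:
    f.write("print('hello from the task image')\n")
    path = f.name

assert os.path.exists(path)  # still present after close
# ... a later step (e.g. docker build's COPY) reads the file here ...
os.remove(path)  # manual cleanup
```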
@@ -364,12 +348,6 @@ def _package_and_upload(
             exec_script_file.write(exec_script)
             exec_script_file.flush()

-            if self.data_store.startswith("/"):
-                shutil.copyfile(
-                    exec_script_file.name,
-                    os.path.join(self.data_store, exec_script_file.name.split("/")[-1]),
-                )
-
             # Write Dockerfile to file
             dockerfile = self._format_dockerfile(
                 exec_script_file.name,
@@ -450,7 +428,6 @@ def _package_and_upload(
                 raise Exception(proc.stderr.decode("utf-8"))
         else:
             app_log.debug("Uploading image to ECR.")
-            app_log.debug(f"the image URI is {image_uri}")
             response = docker_client.images.push(image_uri, tag=image_tag)
             app_log.debug(f"Response: {response}")

@@ -473,10 +450,9 @@ def get_status(self, api_client, name: str, namespace: Optional[str] = "default"

         if job.status.succeeded:
             return "SUCCEEDED"
-        elif job.status.failed:
+        if job.status.failed:
             return "FAILED"
-        else:
-            return "RUNNING"
+        return "RUNNING"

     def _poll_task(self, api_client, name: str, namespace: Optional[str] = "default") -> None:
         """Poll a Kubernetes task until completion.
@@ -525,11 +501,6 @@ def _query_result(
                 result_filename,
                 os.path.join(self.cache_dir, result_filename),
             )
-        else:
-            shutil.copyfile(
-                os.path.join(self.data_store, result_filename),
-                os.path.join(self.cache_dir, result_filename),
-            )

         with open(os.path.join(self.cache_dir, result_filename), "rb") as f:
             result = pickle.load(f)
2 changes: 1 addition & 1 deletion hello_eks.py
@@ -11,7 +11,7 @@
 cluster_name = ""  # Corresponds to 'name' in .tfvars
 aws_s3_bucket = ""  # Corresponds to 'aws_s3_bucket' in .tfvars

-k8s_context = f"arn:aws:eks:{region}:{account}:cluster/{cluster_name}"
+k8s_context = f"arn:aws:eks:{region}:{account}:cluster/{cluster_name}-cluster"
 registry = f"{account}.dkr.ecr.{region}.amazonaws.com"
 data_store = f"s3://{aws_s3_bucket}"

4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,3 +1,3 @@
 covalent>=0.232.0
-docker>=6.0.0
-kubernetes>=24.2.0
+docker>=7.0.0
+kubernetes>=29.0.0
31 changes: 31 additions & 0 deletions sample.py
@@ -0,0 +1,31 @@
import covalent as ct

from covalent_kubernetes_plugin.k8s import KubernetesExecutor

local_k8s_executor = KubernetesExecutor(
k8s_context="minikube", vcpu="100m", memory="500Mi", data_store="/tmp"
)


# Run on a local cluster
@ct.electron(executor=local_k8s_executor)
def join_words(a, b):
return ", ".join([a, b])


# Run another task on the same local cluster (swap in an EKS executor to run in the cloud)
@ct.electron(executor=local_k8s_executor)
def excitement(a):
return f"{a}!"


# Construct a workflow
@ct.lattice
def simple_workflow(a, b):
phrase = join_words(a, b)
return excitement(phrase)


# Dispatch the workflow
dispatch_id = ct.dispatch(simple_workflow)("Hello", "World")
print(dispatch_id)
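Once dispatched, the result can be pulled back with Covalent's `get_result` API:

```python
# Fetch the workflow output once it completes.
result = ct.get_result(dispatch_id, wait=True)
print(result.result)  # expected: "Hello, World!"
```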
