Improve IR for job nodes (#1041)
nfx authored Oct 31, 2024
1 parent 10568b0 commit c70df5e
Showing 54 changed files with 491 additions and 380 deletions.
…/intermediate/workflows/JobNode.scala
@@ -2,4 +2,8 @@ package com.databricks.labs.remorph.intermediate.workflows
 
 import com.databricks.labs.remorph.intermediate.TreeNode
 
-abstract class JobNode extends TreeNode[JobNode]
+abstract class JobNode extends TreeNode[JobNode]
+
+abstract class LeafJobNode extends JobNode {
+  override def children: Seq[JobNode] = Seq()
+}
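
The new LeafJobNode base class exists so that terminal IR nodes no longer repeat an empty children override; several files below migrate to it. A minimal sketch of the pattern — GitSnapshot and SnapshotHolder are made-up nodes, not part of this commit:

import com.databricks.labs.remorph.intermediate.workflows.{JobNode, LeafJobNode}

// Hypothetical terminal IR node: it inherits the empty children list
// from LeafJobNode instead of overriding it by hand.
case class GitSnapshot(usedCommit: String) extends LeafJobNode

// A non-leaf node still enumerates its children so tree traversals see them.
case class SnapshotHolder(snapshot: GitSnapshot) extends JobNode {
  override def children: Seq[JobNode] = Seq(snapshot)
}
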
…/workflows/clusters/ClusterSpec.scala
@@ -1,5 +1,6 @@
 package com.databricks.labs.remorph.intermediate.workflows.clusters
 
+import scala.collection.JavaConverters._
 import com.databricks.labs.remorph.intermediate.workflows.JobNode
 import com.databricks.labs.remorph.intermediate.workflows.libraries.Library
 import com.databricks.sdk.service.jobs
@@ -11,8 +12,9 @@ case class ClusterSpec(
     newCluster: Option[NewClusterSpec] = None)
     extends JobNode {
   override def children: Seq[JobNode] = Seq() ++ libraries ++ newCluster
-  def toSDK: jobs.ClusterSpec = {
-    val raw = new jobs.ClusterSpec()
-    raw
-  }
+  def toSDK: jobs.ClusterSpec = new jobs.ClusterSpec()
+    .setExistingClusterId(existingClusterId.orNull)
+    .setJobClusterKey(jobClusterKey.orNull)
+    .setLibraries(libraries.map(_.toSDK).asJava)
+    .setNewCluster(newCluster.map(_.toSDK).orNull)
 }
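
The stubbed toSDK bodies throughout this commit are replaced with the same fluent-builder convention: Option fields reach the SDK's nullable setters via orNull, and Scala collections cross the Java boundary via asJava. A self-contained sketch of just that convention, with SdkCluster as a hypothetical stand-in for a generated SDK class:

import scala.collection.JavaConverters._

// Hypothetical stand-in for a generated Java SDK class with fluent setters.
class SdkCluster {
  private var id: String = _
  private var libs: java.util.List[String] = _
  def setId(v: String): SdkCluster = { id = v; this }
  def setLibs(v: java.util.List[String]): SdkCluster = { libs = v; this }
}

case class Cluster(id: Option[String] = None, libs: Seq[String] = Seq.empty) {
  def toSDK: SdkCluster = new SdkCluster()
    .setId(id.orNull)     // None becomes null, matching SDK semantics
    .setLibs(libs.asJava) // Seq[String] -> java.util.List[String]
}
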
…/workflows/clusters/NewClusterSpec.scala
@@ -1,5 +1,6 @@
 package com.databricks.labs.remorph.intermediate.workflows.clusters
 
+import scala.collection.JavaConverters._
 import com.databricks.labs.remorph.intermediate.workflows.libraries.DockerImage
 import com.databricks.labs.remorph.intermediate.workflows.JobNode
 import com.databricks.sdk.service.compute
@@ -8,12 +9,12 @@ import com.databricks.sdk.service.compute.{DataSecurityMode, RuntimeEngine}
 case class NewClusterSpec(
     applyPolicyDefaultValues: Boolean = false,
     autoscale: Option[AutoScale] = None,
-    autoterminationMinutes: Option[Int] = None,
+    autoterminationMinutes: Option[Long] = None,
     awsAttributes: Option[AwsAttributes] = None,
     azureAttributes: Option[AzureAttributes] = None,
     clusterLogConf: Option[ClusterLogConf] = None,
     clusterName: Option[String] = None,
-    customTags: Option[Map[String, String]] = None,
+    customTags: Map[String, String] = Map.empty,
     dataSecurityMode: Option[DataSecurityMode] = None,
     dockerImage: Option[DockerImage] = None,
     driverInstancePoolId: Option[String] = None,
@@ -24,20 +25,44 @@ case class NewClusterSpec(
     initScripts: Seq[InitScriptInfo] = Seq.empty,
     instancePoolId: Option[String] = None,
     nodeTypeId: Option[String] = None,
-    numWorkers: Option[Int] = None,
+    numWorkers: Option[Long] = None,
     policyId: Option[String] = None,
     runtimeEngine: Option[RuntimeEngine] = None,
     singleUserName: Option[String] = None,
-    sparkConf: Option[Map[String, String]] = None,
-    sparkEnvVars: Option[Map[String, String]] = None,
+    sparkConf: Map[String, String] = Map.empty,
+    sparkEnvVars: Map[String, String] = Map.empty,
     sparkVersion: Option[String] = None,
     sshPublicKeys: Seq[String] = Seq.empty,
     workloadType: Option[WorkloadType] = None)
     extends JobNode {
-  override def children: Seq[JobNode] = (Seq() ++ autoscale ++ awsAttributes ++ azureAttributes ++
-    clusterLogConf ++ gcpAttributes ++ workloadType)
-  def toSDK: compute.ClusterSpec = {
-    val raw = new compute.ClusterSpec()
-    raw
-  }
+  override def children: Seq[JobNode] = Seq() ++ autoscale ++ awsAttributes ++ azureAttributes ++
+    clusterLogConf ++ gcpAttributes ++ workloadType ++ dockerImage ++ initScripts
+  def toSDK: compute.ClusterSpec = new compute.ClusterSpec()
+    .setApplyPolicyDefaultValues(applyPolicyDefaultValues)
+    .setAutoscale(autoscale.map(_.toSDK).orNull)
+    // .setAutoterminationMinutes(autoterminationMinutes.getOrElse(null))
+    .setAwsAttributes(awsAttributes.map(_.toSDK).orNull)
+    .setAzureAttributes(azureAttributes.map(_.toSDK).orNull)
+    .setGcpAttributes(gcpAttributes.map(_.toSDK).orNull)
+    .setClusterLogConf(clusterLogConf.map(_.toSDK).orNull)
+    .setClusterName(clusterName.orNull)
+    .setCustomTags(customTags.asJava)
+    .setDataSecurityMode(dataSecurityMode.orNull)
+    .setDockerImage(dockerImage.map(_.toSDK).orNull)
+    .setInstancePoolId(instancePoolId.orNull)
+    .setDriverInstancePoolId(driverInstancePoolId.orNull)
+    .setDriverNodeTypeId(driverNodeTypeId.orNull)
+    .setEnableElasticDisk(enableElasticDisk)
+    .setEnableLocalDiskEncryption(enableLocalDiskEncryption)
+    .setInitScripts(initScripts.map(_.toSDK).asJava)
+    .setNodeTypeId(nodeTypeId.orNull)
+    // .setNumWorkers(numWorkers.orNull)
+    .setPolicyId(policyId.orNull)
+    .setRuntimeEngine(runtimeEngine.orNull)
+    .setSingleUserName(singleUserName.orNull)
+    .setSparkConf(sparkConf.asJava)
+    .setSparkEnvVars(sparkEnvVars.asJava)
+    .setSparkVersion(sparkVersion.orNull)
+    .setSshPublicKeys(sshPublicKeys.asJava)
+    .setWorkloadType(workloadType.map(_.toSDK).orNull)
 }
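
Two numeric setters above stay commented out even after the rewrite. A likely reason (an inference, not stated in the commit): orNull requires the Option's element type to be nullable, and scala.Long is a primitive, so Option[Long].orNull does not compile. Boxing first would make the call legal, as this sketch shows:

// Option[Long].orNull fails to compile: orNull needs evidence Null <:< A,
// which primitive Long cannot supply.
val numWorkers: Option[Long] = Some(8L)

// Boxing to java.lang.Long restores nullability, so the commented lines
// could plausibly be revived as: .setNumWorkers(numWorkers.map(Long.box).orNull)
val boxed: java.lang.Long = numWorkers.map(Long.box).orNull
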

This file was deleted.

…/workflows/jobs/JobCluster.scala
@@ -1,13 +1,12 @@
 package com.databricks.labs.remorph.intermediate.workflows.jobs
 
 import com.databricks.labs.remorph.intermediate.workflows.JobNode
-import com.databricks.labs.remorph.intermediate.workflows.clusters.ClusterSpec
+import com.databricks.labs.remorph.intermediate.workflows.clusters.NewClusterSpec
 import com.databricks.sdk.service.jobs
 
-case class JobCluster(jobClusterKey: String, newCluster: ClusterSpec) extends JobNode {
+case class JobCluster(jobClusterKey: String, newCluster: NewClusterSpec) extends JobNode {
   override def children: Seq[JobNode] = Seq(newCluster)
-  def toSDK: jobs.JobCluster = {
-    val raw = new jobs.JobCluster()
-    raw
-  }
+  def toSDK: jobs.JobCluster = new jobs.JobCluster()
+    .setJobClusterKey(jobClusterKey)
+    .setNewCluster(newCluster.toSDK)
 }
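
Retyping newCluster from ClusterSpec to NewClusterSpec lines the IR up with the SDK: jobs.JobCluster.setNewCluster expects the compute-side cluster spec, which is what NewClusterSpec.toSDK produces. A small construction sketch with invented values:

import com.databricks.sdk.service.jobs
import com.databricks.labs.remorph.intermediate.workflows.clusters.NewClusterSpec
import com.databricks.labs.remorph.intermediate.workflows.jobs.JobCluster

val cluster = JobCluster(
  jobClusterKey = "main",                    // invented key
  newCluster = NewClusterSpec(
    sparkVersion = Some("15.4.x-scala2.12"), // invented runtime version
    nodeTypeId = Some("i3.xlarge"),          // invented node type
    numWorkers = Some(2L)))

val sdk: jobs.JobCluster = cluster.toSDK
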

This file was deleted.

…/workflows/jobs/JobEmailNotifications.scala
@@ -1,6 +1,7 @@
 package com.databricks.labs.remorph.intermediate.workflows.jobs
 
-import com.databricks.labs.remorph.intermediate.workflows.JobNode
+import scala.collection.JavaConverters._
+import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode
 import com.databricks.sdk.service.jobs
 
 case class JobEmailNotifications(
@@ -10,10 +11,12 @@ case class JobEmailNotifications(
     onStart: Seq[String] = Seq.empty,
     onStreamingBacklogExceeded: Seq[String] = Seq.empty,
     onSuccess: Seq[String] = Seq.empty)
-    extends JobNode {
-  override def children: Seq[JobNode] = Seq()
-  def toSDK: jobs.JobEmailNotifications = {
-    val raw = new jobs.JobEmailNotifications()
-    raw
-  }
+    extends LeafJobNode {
+  def toSDK: jobs.JobEmailNotifications = new jobs.JobEmailNotifications()
+    .setNoAlertForSkippedRuns(noAlertForSkippedRuns)
+    .setOnDurationWarningThresholdExceeded(onDurationWarningThresholdExceeded.asJava)
+    .setOnFailure(onFailure.asJava)
+    .setOnStart(onStart.asJava)
+    .setOnStreamingBacklogExceeded(onStreamingBacklogExceeded.asJava)
+    .setOnSuccess(onSuccess.asJava)
 }
…/workflows/jobs/JobEnvironment.scala
@@ -6,8 +6,7 @@ import com.databricks.sdk.service.jobs
 
 case class JobEnvironment(environmentKey: String, spec: Option[Environment] = None) extends JobNode {
   override def children: Seq[JobNode] = Seq() ++ spec
-  def toSDK: jobs.JobEnvironment = {
-    val raw = new jobs.JobEnvironment()
-    raw
-  }
+  def toSDK: jobs.JobEnvironment = new jobs.JobEnvironment()
+    .setEnvironmentKey(environmentKey)
+    .setSpec(spec.map(_.toSDK).orNull)
 }
…/workflows/jobs/JobNotificationSettings.scala
@@ -6,8 +6,7 @@ import com.databricks.sdk.service.jobs
 case class JobNotificationSettings(noAlertForCanceledRuns: Boolean = false, noAlertForSkippedRuns: Boolean)
     extends JobNode {
   override def children: Seq[JobNode] = Seq()
-  def toSDK: jobs.JobNotificationSettings = {
-    val raw = new jobs.JobNotificationSettings()
-    raw
-  }
+  def toSDK: jobs.JobNotificationSettings = new jobs.JobNotificationSettings()
+    .setNoAlertForCanceledRuns(noAlertForCanceledRuns)
+    .setNoAlertForSkippedRuns(noAlertForSkippedRuns)
 }
…/workflows/jobs/JobParameterDefinition.scala
@@ -5,8 +5,5 @@ import com.databricks.sdk.service.jobs
 
 case class JobParameterDefinition(name: String, default: String) extends JobNode {
   override def children: Seq[JobNode] = Seq()
-  def toSDK: jobs.JobParameterDefinition = {
-    val raw = new jobs.JobParameterDefinition()
-    raw
-  }
+  def toSDK: jobs.JobParameterDefinition = new jobs.JobParameterDefinition().setName(name).setDefault(default)
 }
…/workflows/jobs/JobRunAs.scala
@@ -5,8 +5,7 @@ import com.databricks.sdk.service.jobs
 
 case class JobRunAs(servicePrincipalName: Option[String], userName: Option[String] = None) extends JobNode {
   override def children: Seq[JobNode] = Seq()
-  def toSDK: jobs.JobRunAs = {
-    val raw = new jobs.JobRunAs()
-    raw
-  }
+  def toSDK: jobs.JobRunAs = new jobs.JobRunAs()
+    .setServicePrincipalName(servicePrincipalName.orNull)
+    .setUserName(userName.orNull)
 }
…/workflows/jobs/JobSettings.scala
@@ -1,41 +1,71 @@
 package com.databricks.labs.remorph.intermediate.workflows.jobs
 
+import scala.collection.JavaConverters._
 import com.databricks.labs.remorph.intermediate.workflows.schedules.{Continuous, CronSchedule, TriggerSettings}
-import com.databricks.labs.remorph.intermediate.workflows.sources.GitSource
 import com.databricks.labs.remorph.intermediate.workflows.tasks.Task
 import com.databricks.labs.remorph.intermediate.workflows.JobNode
 import com.databricks.labs.remorph.intermediate.workflows.webhooks.WebhookNotifications
 import com.databricks.sdk.service.jobs
-import com.databricks.sdk.service.jobs.{Format, JobEditMode}
 
 case class JobSettings(
-    budgetPolicyId: Option[String] = None,
-    continuous: Option[Continuous] = None,
-    deployment: Option[JobDeployment] = None,
+    name: String,
+    tasks: Seq[Task],
+    tags: Map[String, String] = Map.empty,
     description: Option[String] = None,
-    editMode: Option[JobEditMode] = None,
-    emailNotifications: Option[JobEmailNotifications] = None,
-    environments: Seq[JobEnvironment] = Seq.empty,
-    format: Option[Format] = None,
-    gitSource: Option[GitSource] = None,
-    health: Option[JobsHealthRules] = None,
-    jobClusters: Seq[JobCluster] = Seq.empty,
-    maxConcurrentRuns: Option[Int] = None,
-    name: Option[String] = None,
-    notificationSettings: Option[JobNotificationSettings] = None,
     parameters: Seq[JobParameterDefinition] = Seq.empty,
-    runAs: Option[JobRunAs] = None,
+    jobClusters: Seq[JobCluster] = Seq.empty,
+    continuous: Option[Continuous] = None,
     schedule: Option[CronSchedule] = None,
-    tags: Option[Map[String, String]] = None,
-    tasks: Seq[Task] = Seq.empty,
-    timeoutSeconds: Option[Int] = None,
     trigger: Option[TriggerSettings] = None,
+    environments: Seq[JobEnvironment] = Seq.empty,
+    health: Option[JobsHealthRules] = None,
+    timeoutSeconds: Option[Long] = None,
+    maxConcurrentRuns: Option[Long] = None,
+    runAs: Option[JobRunAs] = None,
+    emailNotifications: Option[JobEmailNotifications] = None,
+    notificationSettings: Option[JobNotificationSettings] = None,
     webhookNotifications: Option[WebhookNotifications] = None)
     extends JobNode {
-  override def children: Seq[JobNode] = Seq() ++ continuous ++ deployment ++ emailNotifications ++
-    gitSource ++ health ++ notificationSettings ++ runAs ++ schedule ++ trigger ++ webhookNotifications
-  def toSDK: jobs.JobSettings = {
-    val raw = new jobs.JobSettings()
-    raw
-  }
+  override def children: Seq[JobNode] = Seq() ++ continuous ++ emailNotifications ++
+    health ++ notificationSettings ++ runAs ++ schedule ++ trigger ++ webhookNotifications
+
+  def toUpdate: jobs.JobSettings = new jobs.JobSettings()
+    .setContinuous(continuous.map(_.toSDK).orNull)
+    .setDescription(description.orNull)
+    .setEmailNotifications(emailNotifications.map(_.toSDK).orNull)
+    .setEnvironments(environments.map(_.toSDK).asJava)
+    .setFormat(jobs.Format.MULTI_TASK)
+    .setHealth(health.map(_.toSDK).orNull)
+    .setJobClusters(jobClusters.map(_.toSDK).asJava)
+    // .setMaxConcurrentRuns(maxConcurrentRuns.orNull)
+    .setName(name)
+    .setNotificationSettings(notificationSettings.map(_.toSDK).orNull)
+    .setParameters(parameters.map(_.toSDK).asJava)
+    .setRunAs(runAs.map(_.toSDK).orNull)
+    .setSchedule(schedule.map(_.toSDK).orNull)
+    .setTags(tags.asJava)
+    .setTasks(tasks.map(_.toSDK).asJava)
+    // .setTimeoutSeconds(timeoutSeconds.orNull)
+    .setTrigger(trigger.map(_.toSDK).orNull)
+    .setWebhookNotifications(webhookNotifications.map(_.toSDK).orNull)
+
+  def toCreate: jobs.CreateJob = new jobs.CreateJob()
+    .setContinuous(continuous.map(_.toSDK).orNull)
+    .setDescription(description.orNull)
+    .setEmailNotifications(emailNotifications.map(_.toSDK).orNull)
+    .setEnvironments(environments.map(_.toSDK).asJava)
+    .setFormat(jobs.Format.MULTI_TASK)
+    .setHealth(health.map(_.toSDK).orNull)
+    .setJobClusters(jobClusters.map(_.toSDK).asJava)
+    // .setMaxConcurrentRuns(maxConcurrentRuns.orNull)
+    .setName(name)
+    .setNotificationSettings(notificationSettings.map(_.toSDK).orNull)
+    .setParameters(parameters.map(_.toSDK).asJava)
+    .setRunAs(runAs.map(_.toSDK).orNull)
+    .setSchedule(schedule.map(_.toSDK).orNull)
+    .setTags(tags.asJava)
+    .setTasks(tasks.map(_.toSDK).asJava)
+    // .setTimeoutSeconds(timeoutSeconds.orNull)
+    .setTrigger(trigger.map(_.toSDK).orNull)
+    .setWebhookNotifications(webhookNotifications.map(_.toSDK).orNull)
 }
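
toCreate and toUpdate target the two SDK entry points for defining a job. One plausible wiring — a sketch assuming the Databricks Java SDK's WorkspaceClient, with the job name and empty task list invented for illustration:

import com.databricks.labs.remorph.intermediate.workflows.jobs.JobSettings
import com.databricks.sdk.WorkspaceClient
import com.databricks.sdk.service.jobs.ResetJob

val settings = JobSettings(name = "nightly-etl", tasks = Seq.empty) // invented job

val w = new WorkspaceClient()
val created = w.jobs().create(settings.toCreate) // define a brand-new job
w.jobs().reset(new ResetJob()                    // replace an existing job's settings wholesale
  .setJobId(created.getJobId)
  .setNewSettings(settings.toUpdate))
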
…/workflows/jobs/JobsHealthRule.scala
@@ -1,13 +1,9 @@
 package com.databricks.labs.remorph.intermediate.workflows.jobs
 
-import com.databricks.labs.remorph.intermediate.workflows.JobNode
+import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode
 import com.databricks.sdk.service.jobs
 import com.databricks.sdk.service.jobs.{JobsHealthMetric, JobsHealthOperator}
 
-case class JobsHealthRule(metric: JobsHealthMetric, op: JobsHealthOperator, value: Int) extends JobNode {
-  override def children: Seq[JobNode] = Seq()
-  def toSDK: jobs.JobsHealthRule = {
-    val raw = new jobs.JobsHealthRule()
-    raw
-  }
+case class JobsHealthRule(metric: JobsHealthMetric, op: JobsHealthOperator, value: Int) extends LeafJobNode {
+  def toSDK: jobs.JobsHealthRule = new jobs.JobsHealthRule().setMetric(metric).setOp(op).setValue(value)
 }
…/workflows/jobs/JobsHealthRules.scala
@@ -1,12 +1,10 @@
 package com.databricks.labs.remorph.intermediate.workflows.jobs
 
+import scala.collection.JavaConverters._
 import com.databricks.labs.remorph.intermediate.workflows.JobNode
 import com.databricks.sdk.service.jobs
 
-case class JobsHealthRules(rules: Seq[JobsHealthRule] = Seq.empty) extends JobNode {
+case class JobsHealthRules(rules: Seq[JobsHealthRule]) extends JobNode {
   override def children: Seq[JobNode] = rules
-  def toSDK: jobs.JobsHealthRules = {
-    val raw = new jobs.JobsHealthRules()
-    raw
-  }
+  def toSDK: jobs.JobsHealthRules = new jobs.JobsHealthRules().setRules(rules.map(_.toSDK).asJava)
 }
…/workflows/libraries/Environment.scala
@@ -1,12 +1,9 @@
 package com.databricks.labs.remorph.intermediate.workflows.libraries
 
-import com.databricks.labs.remorph.intermediate.workflows.JobNode
+import scala.collection.JavaConverters._
+import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode
 import com.databricks.sdk.service.compute
 
-case class Environment(client: String, dependencies: Seq[String] = Seq.empty) extends JobNode {
-  override def children: Seq[JobNode] = Seq()
-  def toSDK: compute.Environment = {
-    val raw = new compute.Environment()
-    raw
-  }
+case class Environment(client: String, dependencies: Seq[String] = Seq.empty) extends LeafJobNode {
+  def toSDK: compute.Environment = new compute.Environment().setClient(client).setDependencies(dependencies.asJava)
 }