From c70df5e5bf8cac8bc7e170a8b428b889e25ae766 Mon Sep 17 00:00:00 2001 From: Serge Smertin <259697+nfx@users.noreply.github.com> Date: Thu, 31 Oct 2024 18:39:17 +0100 Subject: [PATCH] Improve IR for job nodes (#1041) --- .../intermediate/workflows/JobNode.scala | 6 +- .../workflows/clusters/ClusterSpec.scala | 10 +- .../workflows/clusters/NewClusterSpec.scala | 47 +++++++--- .../intermediate/workflows/jobs/Job.scala | 19 ---- .../workflows/jobs/JobCluster.scala | 11 +-- .../workflows/jobs/JobDeployment.scala | 13 --- .../jobs/JobEmailNotifications.scala | 17 ++-- .../workflows/jobs/JobEnvironment.scala | 7 +- .../jobs/JobNotificationSettings.scala | 7 +- .../jobs/JobParameterDefinition.scala | 5 +- .../workflows/jobs/JobRunAs.scala | 7 +- .../workflows/jobs/JobSettings.scala | 80 +++++++++++----- .../workflows/jobs/JobsHealthRule.scala | 10 +- .../workflows/jobs/JobsHealthRules.scala | 8 +- .../workflows/libraries/Environment.scala | 11 +-- .../workflows/libraries/Library.scala | 16 +++- .../workflows/libraries/MavenLibrary.scala | 14 +-- .../libraries/PythonPyPiLibrary.scala | 10 +- .../workflows/libraries/RCranLibrary.scala | 10 +- .../workflows/schedules/Continuous.scala | 10 +- .../workflows/schedules/CronSchedule.scala | 8 +- .../workflows/schedules/TriggerSettings.scala | 10 +- .../workflows/sources/GitSnapshot.scala | 12 --- .../workflows/sources/GitSource.scala | 22 ----- .../workflows/sql/SqlTaskAlert.scala | 14 +-- .../workflows/sql/SqlTaskDashboard.scala | 10 +- .../workflows/sql/SqlTaskFile.scala | 12 +-- .../workflows/sql/SqlTaskQuery.scala | 10 +- .../workflows/sql/SqlTaskSubscription.scala | 12 +-- .../workflows/tasks/CodeAsset.scala | 3 + .../workflows/tasks/ConditionTask.scala | 13 ++- .../workflows/tasks/DbtTask.scala | 19 ++-- .../workflows/tasks/ForEachTask.scala | 10 +- .../workflows/tasks/NeedsWarehouse.scala | 5 + .../workflows/tasks/NotebookTask.scala | 15 +-- .../workflows/tasks/PipelineTask.scala | 12 +-- 
.../workflows/tasks/PythonWheelTask.scala | 16 ++-- .../workflows/tasks/RunConditionTask.scala | 14 +-- .../workflows/tasks/RunJobTask.scala | 31 +++++++ .../workflows/tasks/SparkJarTask.scala | 14 +-- .../workflows/tasks/SparkPythonTask.scala | 15 +-- .../workflows/tasks/SparkSubmitTask.scala | 11 +-- .../workflows/tasks/SqlTask.scala | 15 ++- .../workflows/tasks/SubmitTask.scala | 40 -------- .../intermediate/workflows/tasks/Task.scala | 91 ++++++++++++++----- .../workflows/tasks/TaskDependency.scala | 10 +- .../tasks/TaskEmailNotifications.scala | 17 ++-- .../intermediate/PythonNotebookTask.scala | 13 +++ .../intermediate/RunNotebookJobTask.scala | 10 ++ .../tasks/intermediate/SqlNotebookTask.scala | 14 +++ .../intermediate/SqlWorkspaceFileTask.scala | 23 +++++ .../tasks/intermediate/ToNotebookTask.scala | 14 +++ .../workflows/tasks/intermediate/ToTask.scala | 7 ++ .../webhooks/WebhookNotifications.scala | 11 ++- 54 files changed, 491 insertions(+), 380 deletions(-) delete mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/Job.scala delete mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobDeployment.scala delete mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sources/GitSnapshot.scala delete mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sources/GitSource.scala create mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/CodeAsset.scala create mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/NeedsWarehouse.scala create mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/RunJobTask.scala delete mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SubmitTask.scala create mode 100644 
core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/PythonNotebookTask.scala create mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/RunNotebookJobTask.scala create mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/SqlNotebookTask.scala create mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/SqlWorkspaceFileTask.scala create mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/ToNotebookTask.scala create mode 100644 core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/ToTask.scala diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/JobNode.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/JobNode.scala index 3f198edc6..69d8e2c9d 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/JobNode.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/JobNode.scala @@ -2,4 +2,8 @@ package com.databricks.labs.remorph.intermediate.workflows import com.databricks.labs.remorph.intermediate.TreeNode -abstract class JobNode extends TreeNode[JobNode] \ No newline at end of file +abstract class JobNode extends TreeNode[JobNode] + +abstract class LeafJobNode extends JobNode { + override def children: Seq[JobNode] = Seq() +} \ No newline at end of file diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/clusters/ClusterSpec.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/clusters/ClusterSpec.scala index 0a8326729..b70c90975 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/clusters/ClusterSpec.scala +++ 
b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/clusters/ClusterSpec.scala @@ -1,5 +1,6 @@ package com.databricks.labs.remorph.intermediate.workflows.clusters +import scala.collection.JavaConverters._ import com.databricks.labs.remorph.intermediate.workflows.JobNode import com.databricks.labs.remorph.intermediate.workflows.libraries.Library import com.databricks.sdk.service.jobs @@ -11,8 +12,9 @@ case class ClusterSpec( newCluster: Option[NewClusterSpec] = None) extends JobNode { override def children: Seq[JobNode] = Seq() ++ libraries ++ newCluster - def toSDK: jobs.ClusterSpec = { - val raw = new jobs.ClusterSpec() - raw - } + def toSDK: jobs.ClusterSpec = new jobs.ClusterSpec() + .setExistingClusterId(existingClusterId.orNull) + .setJobClusterKey(jobClusterKey.orNull) + .setLibraries(libraries.map(_.toSDK).asJava) + .setNewCluster(newCluster.map(_.toSDK).orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/clusters/NewClusterSpec.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/clusters/NewClusterSpec.scala index 31996602f..d45437455 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/clusters/NewClusterSpec.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/clusters/NewClusterSpec.scala @@ -1,5 +1,6 @@ package com.databricks.labs.remorph.intermediate.workflows.clusters +import scala.collection.JavaConverters._ import com.databricks.labs.remorph.intermediate.workflows.libraries.DockerImage import com.databricks.labs.remorph.intermediate.workflows.JobNode import com.databricks.sdk.service.compute @@ -8,12 +9,12 @@ import com.databricks.sdk.service.compute.{DataSecurityMode, RuntimeEngine} case class NewClusterSpec( applyPolicyDefaultValues: Boolean = false, autoscale: Option[AutoScale] = None, - autoterminationMinutes: Option[Int] = None, + autoterminationMinutes: Option[Long] = None, awsAttributes: 
Option[AwsAttributes] = None, azureAttributes: Option[AzureAttributes] = None, clusterLogConf: Option[ClusterLogConf] = None, clusterName: Option[String] = None, - customTags: Option[Map[String, String]] = None, + customTags: Map[String, String] = Map.empty, dataSecurityMode: Option[DataSecurityMode] = None, dockerImage: Option[DockerImage] = None, driverInstancePoolId: Option[String] = None, @@ -24,20 +25,44 @@ case class NewClusterSpec( initScripts: Seq[InitScriptInfo] = Seq.empty, instancePoolId: Option[String] = None, nodeTypeId: Option[String] = None, - numWorkers: Option[Int] = None, + numWorkers: Option[Long] = None, policyId: Option[String] = None, runtimeEngine: Option[RuntimeEngine] = None, singleUserName: Option[String] = None, - sparkConf: Option[Map[String, String]] = None, - sparkEnvVars: Option[Map[String, String]] = None, + sparkConf: Map[String, String] = Map.empty, + sparkEnvVars: Map[String, String] = Map.empty, sparkVersion: Option[String] = None, sshPublicKeys: Seq[String] = Seq.empty, workloadType: Option[WorkloadType] = None) extends JobNode { - override def children: Seq[JobNode] = (Seq() ++ autoscale ++ awsAttributes ++ azureAttributes ++ - clusterLogConf ++ gcpAttributes ++ workloadType) - def toSDK: compute.ClusterSpec = { - val raw = new compute.ClusterSpec() - raw - } + override def children: Seq[JobNode] = Seq() ++ autoscale ++ awsAttributes ++ azureAttributes ++ + clusterLogConf ++ gcpAttributes ++ workloadType ++ dockerImage ++ initScripts + def toSDK: compute.ClusterSpec = new compute.ClusterSpec() + .setApplyPolicyDefaultValues(applyPolicyDefaultValues) + .setAutoscale(autoscale.map(_.toSDK).orNull) + // .setAutoterminationMinutes(autoterminationMinutes.getOrElse(null)) + .setAwsAttributes(awsAttributes.map(_.toSDK).orNull) + .setAzureAttributes(azureAttributes.map(_.toSDK).orNull) + .setGcpAttributes(gcpAttributes.map(_.toSDK).orNull) + .setClusterLogConf(clusterLogConf.map(_.toSDK).orNull) + .setClusterName(clusterName.orNull) + 
.setCustomTags(customTags.asJava) + .setDataSecurityMode(dataSecurityMode.orNull) + .setDockerImage(dockerImage.map(_.toSDK).orNull) + .setInstancePoolId(instancePoolId.orNull) + .setDriverInstancePoolId(driverInstancePoolId.orNull) + .setDriverNodeTypeId(driverNodeTypeId.orNull) + .setEnableElasticDisk(enableElasticDisk) + .setEnableLocalDiskEncryption(enableLocalDiskEncryption) + .setInitScripts(initScripts.map(_.toSDK).asJava) + .setNodeTypeId(nodeTypeId.orNull) + // .setNumWorkers(numWorkers.orNull) + .setPolicyId(policyId.orNull) + .setRuntimeEngine(runtimeEngine.orNull) + .setSingleUserName(singleUserName.orNull) + .setSparkConf(sparkConf.asJava) + .setSparkEnvVars(sparkEnvVars.asJava) + .setSparkVersion(sparkVersion.orNull) + .setSshPublicKeys(sshPublicKeys.asJava) + .setWorkloadType(workloadType.map(_.toSDK).orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/Job.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/Job.scala deleted file mode 100644 index 78eccec7f..000000000 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/Job.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.databricks.labs.remorph.intermediate.workflows.jobs - -import com.databricks.labs.remorph.intermediate.workflows.JobNode -import com.databricks.sdk.service.jobs - -case class Job( - createdTime: Option[Int] = None, - creatorUserName: Option[String] = None, - effectiveBudgetPolicyId: Option[String] = None, - jobId: Option[Int] = None, - runAsUserName: Option[String] = None, - settings: Option[JobSettings] = None) - extends JobNode { - override def children: Seq[JobNode] = Seq() ++ settings - def toSDK: jobs.Job = { - val raw = new jobs.Job() - raw - } -} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobCluster.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobCluster.scala index 
9ad2e59f7..39a551f4d 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobCluster.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobCluster.scala @@ -1,13 +1,12 @@ package com.databricks.labs.remorph.intermediate.workflows.jobs import com.databricks.labs.remorph.intermediate.workflows.JobNode -import com.databricks.labs.remorph.intermediate.workflows.clusters.ClusterSpec +import com.databricks.labs.remorph.intermediate.workflows.clusters.NewClusterSpec import com.databricks.sdk.service.jobs -case class JobCluster(jobClusterKey: String, newCluster: ClusterSpec) extends JobNode { +case class JobCluster(jobClusterKey: String, newCluster: NewClusterSpec) extends JobNode { override def children: Seq[JobNode] = Seq(newCluster) - def toSDK: jobs.JobCluster = { - val raw = new jobs.JobCluster() - raw - } + def toSDK: jobs.JobCluster = new jobs.JobCluster() + .setJobClusterKey(jobClusterKey) + .setNewCluster(newCluster.toSDK) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobDeployment.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobDeployment.scala deleted file mode 100644 index 03562a98e..000000000 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobDeployment.scala +++ /dev/null @@ -1,13 +0,0 @@ -package com.databricks.labs.remorph.intermediate.workflows.jobs - -import com.databricks.labs.remorph.intermediate.workflows.JobNode -import com.databricks.sdk.service.jobs -import com.databricks.sdk.service.jobs.JobDeploymentKind - -case class JobDeployment(kind: JobDeploymentKind, metadataFilePath: Option[String] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.JobDeployment = { - val raw = new jobs.JobDeployment() - raw - } -} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobEmailNotifications.scala 
b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobEmailNotifications.scala index 93fb053d5..d488b9e2e 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobEmailNotifications.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobEmailNotifications.scala @@ -1,6 +1,7 @@ package com.databricks.labs.remorph.intermediate.workflows.jobs -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.collection.JavaConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs case class JobEmailNotifications( @@ -10,10 +11,12 @@ case class JobEmailNotifications( onStart: Seq[String] = Seq.empty, onStreamingBacklogExceeded: Seq[String] = Seq.empty, onSuccess: Seq[String] = Seq.empty) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.JobEmailNotifications = { - val raw = new jobs.JobEmailNotifications() - raw - } + extends LeafJobNode { + def toSDK: jobs.JobEmailNotifications = new jobs.JobEmailNotifications() + .setNoAlertForSkippedRuns(noAlertForSkippedRuns) + .setOnDurationWarningThresholdExceeded(onDurationWarningThresholdExceeded.asJava) + .setOnFailure(onFailure.asJava) + .setOnStart(onStart.asJava) + .setOnStreamingBacklogExceeded(onStreamingBacklogExceeded.asJava) + .setOnSuccess(onSuccess.asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobEnvironment.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobEnvironment.scala index 10a2af133..58e6096e1 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobEnvironment.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobEnvironment.scala @@ -6,8 +6,7 @@ import com.databricks.sdk.service.jobs case class JobEnvironment(environmentKey: String, spec: 
Option[Environment] = None) extends JobNode { override def children: Seq[JobNode] = Seq() ++ spec - def toSDK: jobs.JobEnvironment = { - val raw = new jobs.JobEnvironment() - raw - } + def toSDK: jobs.JobEnvironment = new jobs.JobEnvironment() + .setEnvironmentKey(environmentKey) + .setSpec(spec.map(_.toSDK).orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobNotificationSettings.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobNotificationSettings.scala index 8f8e5a5fd..55c610c7e 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobNotificationSettings.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobNotificationSettings.scala @@ -6,8 +6,7 @@ import com.databricks.sdk.service.jobs case class JobNotificationSettings(noAlertForCanceledRuns: Boolean = false, noAlertForSkippedRuns: Boolean) extends JobNode { override def children: Seq[JobNode] = Seq() - def toSDK: jobs.JobNotificationSettings = { - val raw = new jobs.JobNotificationSettings() - raw - } + def toSDK: jobs.JobNotificationSettings = new jobs.JobNotificationSettings() + .setNoAlertForCanceledRuns(noAlertForCanceledRuns) + .setNoAlertForSkippedRuns(noAlertForSkippedRuns) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobParameterDefinition.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobParameterDefinition.scala index 7b47ad9b2..650dc911c 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobParameterDefinition.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobParameterDefinition.scala @@ -5,8 +5,5 @@ import com.databricks.sdk.service.jobs case class JobParameterDefinition(name: String, default: String) extends JobNode { override def children: Seq[JobNode] = Seq() - def toSDK: 
jobs.JobParameterDefinition = { - val raw = new jobs.JobParameterDefinition() - raw - } + def toSDK: jobs.JobParameterDefinition = new jobs.JobParameterDefinition().setName(name).setDefault(default) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobRunAs.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobRunAs.scala index e387fd8a4..2cd004e7e 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobRunAs.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobRunAs.scala @@ -5,8 +5,7 @@ import com.databricks.sdk.service.jobs case class JobRunAs(servicePrincipalName: Option[String], userName: Option[String] = None) extends JobNode { override def children: Seq[JobNode] = Seq() - def toSDK: jobs.JobRunAs = { - val raw = new jobs.JobRunAs() - raw - } + def toSDK: jobs.JobRunAs = new jobs.JobRunAs() + .setServicePrincipalName(servicePrincipalName.orNull) + .setUserName(userName.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobSettings.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobSettings.scala index 2ee641a40..d0e908feb 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobSettings.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobSettings.scala @@ -1,41 +1,71 @@ package com.databricks.labs.remorph.intermediate.workflows.jobs +import scala.collection.JavaConverters._ import com.databricks.labs.remorph.intermediate.workflows.schedules.{Continuous, CronSchedule, TriggerSettings} -import com.databricks.labs.remorph.intermediate.workflows.sources.GitSource import com.databricks.labs.remorph.intermediate.workflows.tasks.Task import com.databricks.labs.remorph.intermediate.workflows.JobNode import 
com.databricks.labs.remorph.intermediate.workflows.webhooks.WebhookNotifications import com.databricks.sdk.service.jobs -import com.databricks.sdk.service.jobs.{Format, JobEditMode} case class JobSettings( - budgetPolicyId: Option[String] = None, - continuous: Option[Continuous] = None, - deployment: Option[JobDeployment] = None, + name: String, + tasks: Seq[Task], + tags: Map[String, String] = Map.empty, description: Option[String] = None, - editMode: Option[JobEditMode] = None, - emailNotifications: Option[JobEmailNotifications] = None, - environments: Seq[JobEnvironment] = Seq.empty, - format: Option[Format] = None, - gitSource: Option[GitSource] = None, - health: Option[JobsHealthRules] = None, - jobClusters: Seq[JobCluster] = Seq.empty, - maxConcurrentRuns: Option[Int] = None, - name: Option[String] = None, - notificationSettings: Option[JobNotificationSettings] = None, parameters: Seq[JobParameterDefinition] = Seq.empty, - runAs: Option[JobRunAs] = None, + jobClusters: Seq[JobCluster] = Seq.empty, + continuous: Option[Continuous] = None, schedule: Option[CronSchedule] = None, - tags: Option[Map[String, String]] = None, - tasks: Seq[Task] = Seq.empty, - timeoutSeconds: Option[Int] = None, trigger: Option[TriggerSettings] = None, + environments: Seq[JobEnvironment] = Seq.empty, + health: Option[JobsHealthRules] = None, + timeoutSeconds: Option[Long] = None, + maxConcurrentRuns: Option[Long] = None, + runAs: Option[JobRunAs] = None, + emailNotifications: Option[JobEmailNotifications] = None, + notificationSettings: Option[JobNotificationSettings] = None, webhookNotifications: Option[WebhookNotifications] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() ++ continuous ++ deployment ++ emailNotifications ++ - gitSource ++ health ++ notificationSettings ++ runAs ++ schedule ++ trigger ++ webhookNotifications - def toSDK: jobs.JobSettings = { - val raw = new jobs.JobSettings() - raw - } + override def children: Seq[JobNode] = Seq() ++ 
continuous ++ emailNotifications ++ + health ++ notificationSettings ++ runAs ++ schedule ++ trigger ++ webhookNotifications + + def toUpdate: jobs.JobSettings = new jobs.JobSettings() + .setContinuous(continuous.map(_.toSDK).orNull) + .setDescription(description.orNull) + .setEmailNotifications(emailNotifications.map(_.toSDK).orNull) + .setEnvironments(environments.map(_.toSDK).asJava) + .setFormat(jobs.Format.MULTI_TASK) + .setHealth(health.map(_.toSDK).orNull) + .setJobClusters(jobClusters.map(_.toSDK).asJava) + // .setMaxConcurrentRuns(maxConcurrentRuns.orNull) + .setName(name) + .setNotificationSettings(notificationSettings.map(_.toSDK).orNull) + .setParameters(parameters.map(_.toSDK).asJava) + .setRunAs(runAs.map(_.toSDK).orNull) + .setSchedule(schedule.map(_.toSDK).orNull) + .setTags(tags.asJava) + .setTasks(tasks.map(_.toSDK).asJava) + // .setTimeoutSeconds(timeoutSeconds.orNull) + .setTrigger(trigger.map(_.toSDK).orNull) + .setWebhookNotifications(webhookNotifications.map(_.toSDK).orNull) + + def toCreate: jobs.CreateJob = new jobs.CreateJob() + .setContinuous(continuous.map(_.toSDK).orNull) + .setDescription(description.orNull) + .setEmailNotifications(emailNotifications.map(_.toSDK).orNull) + .setEnvironments(environments.map(_.toSDK).asJava) + .setFormat(jobs.Format.MULTI_TASK) + .setHealth(health.map(_.toSDK).orNull) + .setJobClusters(jobClusters.map(_.toSDK).asJava) + // .setMaxConcurrentRuns(maxConcurrentRuns.orNull) + .setName(name) + .setNotificationSettings(notificationSettings.map(_.toSDK).orNull) + .setParameters(parameters.map(_.toSDK).asJava) + .setRunAs(runAs.map(_.toSDK).orNull) + .setSchedule(schedule.map(_.toSDK).orNull) + .setTags(tags.asJava) + .setTasks(tasks.map(_.toSDK).asJava) + // .setTimeoutSeconds(timeoutSeconds.orNull) + .setTrigger(trigger.map(_.toSDK).orNull) + .setWebhookNotifications(webhookNotifications.map(_.toSDK).orNull) } diff --git 
a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobsHealthRule.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobsHealthRule.scala index 2585f0da0..db23add0e 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobsHealthRule.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobsHealthRule.scala @@ -1,13 +1,9 @@ package com.databricks.labs.remorph.intermediate.workflows.jobs -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs import com.databricks.sdk.service.jobs.{JobsHealthMetric, JobsHealthOperator} -case class JobsHealthRule(metric: JobsHealthMetric, op: JobsHealthOperator, value: Int) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.JobsHealthRule = { - val raw = new jobs.JobsHealthRule() - raw - } +case class JobsHealthRule(metric: JobsHealthMetric, op: JobsHealthOperator, value: Int) extends LeafJobNode { + def toSDK: jobs.JobsHealthRule = new jobs.JobsHealthRule().setMetric(metric).setOp(op).setValue(value) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobsHealthRules.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobsHealthRules.scala index 410c6ed0b..0744f865f 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobsHealthRules.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/jobs/JobsHealthRules.scala @@ -1,12 +1,10 @@ package com.databricks.labs.remorph.intermediate.workflows.jobs +import scala.collection.JavaConverters._ import com.databricks.labs.remorph.intermediate.workflows.JobNode import com.databricks.sdk.service.jobs -case class JobsHealthRules(rules: Seq[JobsHealthRule] = Seq.empty) extends JobNode { 
+case class JobsHealthRules(rules: Seq[JobsHealthRule]) extends JobNode { override def children: Seq[JobNode] = rules - def toSDK: jobs.JobsHealthRules = { - val raw = new jobs.JobsHealthRules() - raw - } + def toSDK: jobs.JobsHealthRules = new jobs.JobsHealthRules().setRules(rules.map(_.toSDK).asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/Environment.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/Environment.scala index 050d8c2ef..177e05600 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/Environment.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/Environment.scala @@ -1,12 +1,9 @@ package com.databricks.labs.remorph.intermediate.workflows.libraries -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.collection.JavaConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.compute -case class Environment(client: String, dependencies: Seq[String] = Seq.empty) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: compute.Environment = { - val raw = new compute.Environment() - raw - } +case class Environment(client: String, dependencies: Seq[String] = Seq.empty) extends LeafJobNode { + def toSDK: compute.Environment = new compute.Environment().setClient(client).setDependencies(dependencies.asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/Library.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/Library.scala index 85f13cd47..9a9deff29 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/Library.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/Library.scala @@ -1,6 +1,7 @@ package 
com.databricks.labs.remorph.intermediate.workflows.libraries import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.tasks.CodeAsset import com.databricks.sdk.service.compute case class Library( @@ -11,10 +12,15 @@ case class Library( pypi: Option[PythonPyPiLibrary] = None, requirements: Option[String] = None, whl: Option[String] = None) - extends JobNode { + extends JobNode + with CodeAsset { override def children: Seq[JobNode] = Seq() ++ cran ++ maven ++ pypi - def toSDK: compute.Library = { - val raw = new compute.Library() - raw - } + def toSDK: compute.Library = new compute.Library() + .setCran(cran.map(_.toSDK).orNull) + .setEgg(egg.orNull) + .setJar(jar.orNull) + .setMaven(maven.map(_.toSDK).orNull) + .setPypi(pypi.map(_.toSDK).orNull) + .setRequirements(requirements.orNull) + .setWhl(whl.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/MavenLibrary.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/MavenLibrary.scala index 45e31c1bc..f6f72d5ee 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/MavenLibrary.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/MavenLibrary.scala @@ -1,13 +1,13 @@ package com.databricks.labs.remorph.intermediate.workflows.libraries -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.collection.JavaConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.compute case class MavenLibrary(coordinates: String, exclusions: Seq[String] = Seq.empty, repo: Option[String] = None) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: compute.MavenLibrary = { - val raw = new compute.MavenLibrary() - raw - } + extends LeafJobNode { + def toSDK: compute.MavenLibrary = new 
compute.MavenLibrary() + .setCoordinates(coordinates) + .setExclusions(exclusions.asJava) + .setRepo(repo.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/PythonPyPiLibrary.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/PythonPyPiLibrary.scala index 8cb3b38e2..ec50bf4f7 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/PythonPyPiLibrary.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/PythonPyPiLibrary.scala @@ -1,12 +1,8 @@ package com.databricks.labs.remorph.intermediate.workflows.libraries -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.compute -case class PythonPyPiLibrary(spec: String, repo: Option[String] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: compute.PythonPyPiLibrary = { - val raw = new compute.PythonPyPiLibrary() - raw - } +case class PythonPyPiLibrary(spec: String, repo: Option[String] = None) extends LeafJobNode { + def toSDK: compute.PythonPyPiLibrary = new compute.PythonPyPiLibrary().setPackage(spec).setRepo(repo.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/RCranLibrary.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/RCranLibrary.scala index b7afa3371..5850e3c7f 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/RCranLibrary.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/libraries/RCranLibrary.scala @@ -1,12 +1,8 @@ package com.databricks.labs.remorph.intermediate.workflows.libraries -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import 
com.databricks.sdk.service.compute -case class RCranLibrary(spec: String, repo: Option[String] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: compute.RCranLibrary = { - val raw = new compute.RCranLibrary() - raw - } +case class RCranLibrary(spec: String, repo: Option[String] = None) extends LeafJobNode { + def toSDK: compute.RCranLibrary = new compute.RCranLibrary().setPackage(spec).setRepo(repo.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/Continuous.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/Continuous.scala index 87b04e7cb..68c186774 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/Continuous.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/Continuous.scala @@ -1,13 +1,9 @@ package com.databricks.labs.remorph.intermediate.workflows.schedules -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs import com.databricks.sdk.service.jobs.PauseStatus -case class Continuous(pauseStatus: Option[PauseStatus] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.Continuous = { - val raw = new jobs.Continuous() - raw - } +case class Continuous(pauseStatus: Option[PauseStatus] = None) extends LeafJobNode { + def toSDK: jobs.Continuous = new jobs.Continuous().setPauseStatus(pauseStatus.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/CronSchedule.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/CronSchedule.scala index f282ee08f..ce94d3703 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/CronSchedule.scala +++ 
b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/CronSchedule.scala @@ -7,8 +7,8 @@ import com.databricks.sdk.service.jobs.PauseStatus case class CronSchedule(quartzCronExpression: String, timezoneId: String, pauseStatus: Option[PauseStatus] = None) extends JobNode { override def children: Seq[JobNode] = Seq() - def toSDK: jobs.CronSchedule = { - val raw = new jobs.CronSchedule() - raw - } + def toSDK: jobs.CronSchedule = new jobs.CronSchedule() + .setQuartzCronExpression(quartzCronExpression) + .setTimezoneId(timezoneId) + .setPauseStatus(pauseStatus.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/TriggerSettings.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/TriggerSettings.scala index d0a894178..1c4a003a3 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/TriggerSettings.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/schedules/TriggerSettings.scala @@ -12,8 +12,10 @@ case class TriggerSettings( tableUpdate: Option[TableUpdateTriggerConfiguration] = None) extends JobNode { override def children: Seq[JobNode] = Seq() ++ fileArrival ++ periodic ++ table ++ tableUpdate - def toSDK: jobs.TriggerSettings = { - val raw = new jobs.TriggerSettings() - raw - } + def toSDK: jobs.TriggerSettings = new jobs.TriggerSettings() + .setFileArrival(fileArrival.map(_.toSDK).orNull) + .setPauseStatus(pauseStatus.orNull) + .setPeriodic(periodic.map(_.toSDK).orNull) + .setTable(table.map(_.toSDK).orNull) + .setTableUpdate(tableUpdate.map(_.toSDK).orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sources/GitSnapshot.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sources/GitSnapshot.scala deleted file mode 100644 index 331cbf8a2..000000000 --- 
a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sources/GitSnapshot.scala +++ /dev/null @@ -1,12 +0,0 @@ -package com.databricks.labs.remorph.intermediate.workflows.sources - -import com.databricks.labs.remorph.intermediate.workflows.JobNode -import com.databricks.sdk.service.jobs - -case class GitSnapshot(usedCommit: Option[String] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.GitSnapshot = { - val raw = new jobs.GitSnapshot() - raw - } -} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sources/GitSource.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sources/GitSource.scala deleted file mode 100644 index 76c63cb31..000000000 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sources/GitSource.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.databricks.labs.remorph.intermediate.workflows.sources - -import com.databricks.labs.remorph.intermediate.workflows.JobNode -import com.databricks.labs.remorph.intermediate.workflows.jobs.JobSource -import com.databricks.sdk.service.jobs -import com.databricks.sdk.service.jobs.GitProvider - -case class GitSource( - gitUrl: String, - gitProvider: GitProvider, - gitBranch: Option[String] = None, - gitCommit: Option[String] = None, - gitSnapshot: Option[GitSnapshot] = None, - gitTag: Option[String] = None, - jobSource: Option[JobSource] = None) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.GitSource = { - val raw = new jobs.GitSource() - raw - } -} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskAlert.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskAlert.scala index 061b99a66..c792243ad 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskAlert.scala +++ 
b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskAlert.scala @@ -1,16 +1,16 @@ package com.databricks.labs.remorph.intermediate.workflows.sql -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.jdk.CollectionConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs case class SqlTaskAlert( alertId: String, pauseSubscriptions: Boolean = false, subscriptions: Seq[SqlTaskSubscription] = Seq.empty) - extends JobNode { - override def children: Seq[JobNode] = Seq() ++ subscriptions - def toSDK: jobs.SqlTaskAlert = { - val raw = new jobs.SqlTaskAlert() - raw - } + extends LeafJobNode { + def toSDK: jobs.SqlTaskAlert = new jobs.SqlTaskAlert() + .setAlertId(alertId) + .setPauseSubscriptions(pauseSubscriptions) + .setSubscriptions(subscriptions.map(_.toSDK).asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskDashboard.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskDashboard.scala index 0fb5bb1fa..742414df8 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskDashboard.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskDashboard.scala @@ -1,5 +1,6 @@ package com.databricks.labs.remorph.intermediate.workflows.sql +import scala.jdk.CollectionConverters._ import com.databricks.labs.remorph.intermediate.workflows.JobNode import com.databricks.sdk.service.jobs @@ -10,8 +11,9 @@ case class SqlTaskDashboard( subscriptions: Seq[SqlTaskSubscription] = Seq.empty) extends JobNode { override def children: Seq[JobNode] = Seq() ++ subscriptions - def toSDK: jobs.SqlTaskDashboard = { - val raw = new jobs.SqlTaskDashboard() - raw - } + def toSDK: jobs.SqlTaskDashboard = new jobs.SqlTaskDashboard() + .setDashboardId(dashboardId) + .setCustomSubject(customSubject.orNull) + 
.setPauseSubscriptions(pauseSubscriptions) + .setSubscriptions(subscriptions.map(_.toSDK).asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskFile.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskFile.scala index ee36b7e07..b2b6c469c 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskFile.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskFile.scala @@ -1,13 +1,11 @@ package com.databricks.labs.remorph.intermediate.workflows.sql -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs import com.databricks.sdk.service.jobs.Source -case class SqlTaskFile(path: String, source: Option[Source] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.SqlTaskFile = { - val raw = new jobs.SqlTaskFile() - raw - } +case class SqlTaskFile(path: String, source: Option[Source] = None) extends LeafJobNode { + def toSDK: jobs.SqlTaskFile = new jobs.SqlTaskFile() + .setPath(path) + .setSource(source.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskQuery.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskQuery.scala index 3ea7de822..ef785b2d8 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskQuery.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskQuery.scala @@ -1,12 +1,8 @@ package com.databricks.labs.remorph.intermediate.workflows.sql -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs -case class SqlTaskQuery(queryId: String) extends JobNode { - 
override def children: Seq[JobNode] = Seq() - def toSDK: jobs.SqlTaskQuery = { - val raw = new jobs.SqlTaskQuery() - raw - } +case class SqlTaskQuery(queryId: String) extends LeafJobNode { + def toSDK: jobs.SqlTaskQuery = new jobs.SqlTaskQuery().setQueryId(queryId) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskSubscription.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskSubscription.scala index 6b56ed286..aa23715f4 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskSubscription.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/sql/SqlTaskSubscription.scala @@ -1,12 +1,10 @@ package com.databricks.labs.remorph.intermediate.workflows.sql -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs -case class SqlTaskSubscription(destinationId: Option[String], userName: Option[String] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.SqlTaskSubscription = { - val raw = new jobs.SqlTaskSubscription() - raw - } +case class SqlTaskSubscription(destinationId: Option[String], userName: Option[String] = None) extends LeafJobNode { + def toSDK: jobs.SqlTaskSubscription = new jobs.SqlTaskSubscription() + .setUserName(userName.orNull) + .setDestinationId(destinationId.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/CodeAsset.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/CodeAsset.scala new file mode 100644 index 000000000..04347f436 --- /dev/null +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/CodeAsset.scala @@ -0,0 +1,3 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks + +trait CodeAsset diff --git 
a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/ConditionTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/ConditionTask.scala index 92d25c149..e4ca5f3c0 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/ConditionTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/ConditionTask.scala @@ -1,13 +1,12 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs import com.databricks.sdk.service.jobs.ConditionTaskOp -case class ConditionTask(op: ConditionTaskOp, left: String, right: String) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.ConditionTask = { - val raw = new jobs.ConditionTask() - raw - } +case class ConditionTask(op: ConditionTaskOp, left: String, right: String) extends LeafJobNode { + def toSDK: jobs.ConditionTask = new jobs.ConditionTask() + .setOp(op) + .setLeft(left) + .setRight(right) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/DbtTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/DbtTask.scala index 291b93951..6b14314cd 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/DbtTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/DbtTask.scala @@ -1,6 +1,7 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.jdk.CollectionConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs import com.databricks.sdk.service.jobs.Source @@ -12,10 +13,14 @@ case class DbtTask( 
schema: Option[String] = None, source: Option[Source] = None, warehouseId: Option[String] = None) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.DbtTask = { - val raw = new jobs.DbtTask() - raw - } + extends LeafJobNode + with NeedsWarehouse { + def toSDK: jobs.DbtTask = new jobs.DbtTask() + .setCommands(commands.asJava) + .setCatalog(catalog.orNull) + .setProfilesDirectory(profilesDirectory.orNull) + .setProjectDirectory(projectDirectory.orNull) + .setSchema(schema.orNull) + .setSource(source.orNull) + .setWarehouseId(warehouseId.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/ForEachTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/ForEachTask.scala index 47047c0ee..e377653e4 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/ForEachTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/ForEachTask.scala @@ -3,10 +3,10 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks import com.databricks.labs.remorph.intermediate.workflows.JobNode import com.databricks.sdk.service.jobs -case class ForEachTask(inputs: String, task: Task, concurrency: Option[Int] = None) extends JobNode { +case class ForEachTask(inputs: String, task: Task, concurrency: Long = 20) extends JobNode { override def children: Seq[JobNode] = Seq(task) - def toSDK: jobs.ForEachTask = { - val raw = new jobs.ForEachTask() - raw - } + def toSDK: jobs.ForEachTask = new jobs.ForEachTask() + .setTask(task.toSDK) + .setInputs(inputs) + .setConcurrency(concurrency) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/NeedsWarehouse.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/NeedsWarehouse.scala new file mode 100644 index 000000000..75173c194 --- /dev/null +++ 
b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/NeedsWarehouse.scala @@ -0,0 +1,5 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks + +trait NeedsWarehouse { + final val DEFAULT_WAREHOUSE_ID = sys.env.getOrElse("DATABRICKS_WAREHOUSE_ID", "__DEFAULT_WAREHOUSE_ID__") +} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/NotebookTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/NotebookTask.scala index ee9f4db80..b732a6d95 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/NotebookTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/NotebookTask.scala @@ -1,6 +1,7 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.jdk.CollectionConverters._ +import com.databricks.labs.remorph.intermediate.workflows.{JobNode, LeafJobNode} import com.databricks.sdk.service.jobs import com.databricks.sdk.service.jobs.Source @@ -9,10 +10,12 @@ case class NotebookTask( baseParameters: Option[Map[String, String]] = None, source: Option[Source] = None, warehouseId: Option[String] = None) - extends JobNode { + extends LeafJobNode + with CodeAsset { override def children: Seq[JobNode] = Seq() - def toSDK: jobs.NotebookTask = { - val raw = new jobs.NotebookTask() - raw - } + def toSDK: jobs.NotebookTask = new jobs.NotebookTask() + .setNotebookPath(notebookPath) + .setBaseParameters(baseParameters.map(_.asJava).orNull) + .setSource(source.orNull) + .setWarehouseId(warehouseId.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/PipelineTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/PipelineTask.scala index e18af6f2a..fd20598bb 100644 --- 
a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/PipelineTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/PipelineTask.scala @@ -1,12 +1,10 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs -case class PipelineTask(pipelineId: String, fullRefresh: Boolean) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.PipelineTask = { - val raw = new jobs.PipelineTask() - raw - } +case class PipelineTask(pipelineId: String, fullRefresh: Boolean) extends LeafJobNode with CodeAsset { + def toSDK: jobs.PipelineTask = new jobs.PipelineTask() + .setPipelineId(pipelineId) + .setFullRefresh(fullRefresh) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/PythonWheelTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/PythonWheelTask.scala index 3b4faf163..916138d4c 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/PythonWheelTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/PythonWheelTask.scala @@ -1,6 +1,7 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.collection.JavaConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs case class PythonWheelTask( @@ -8,10 +9,11 @@ case class PythonWheelTask( entryPoint: String, namedParameters: Option[Map[String, String]] = None, parameters: Seq[String] = Seq.empty) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.PythonWheelTask = { - val raw = new jobs.PythonWheelTask() - raw - } + extends 
LeafJobNode + with CodeAsset { + def toSDK: jobs.PythonWheelTask = new jobs.PythonWheelTask() + .setPackageName(packageName) + .setEntryPoint(entryPoint) + .setNamedParameters(namedParameters.getOrElse(Map.empty).asJava) + .setParameters(parameters.asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/RunConditionTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/RunConditionTask.scala index 8f588b58a..01f16ae47 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/RunConditionTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/RunConditionTask.scala @@ -1,14 +1,14 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs import com.databricks.sdk.service.jobs.ConditionTaskOp case class RunConditionTask(op: ConditionTaskOp, left: String, right: String, outcome: Option[String] = None) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.RunConditionTask = { - val raw = new jobs.RunConditionTask() - raw - } + extends LeafJobNode { + def toSDK: jobs.RunConditionTask = new jobs.RunConditionTask() + .setOp(op) + .setLeft(left) + .setRight(right) + .setOutcome(outcome.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/RunJobTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/RunJobTask.scala new file mode 100644 index 000000000..f4aa4da72 --- /dev/null +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/RunJobTask.scala @@ -0,0 +1,31 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks + +import scala.collection.JavaConverters._ +import 
com.databricks.labs.remorph.intermediate.workflows.LeafJobNode +import com.databricks.sdk.service.jobs +import com.databricks.sdk.service.jobs.PipelineParams + +case class RunJobTask( + jobId: Long, + jobParams: Map[String, String] = Map.empty, + notebookParams: Map[String, String] = Map.empty, + pythonNamedParams: Map[String, String] = Map.empty, + sqlParams: Map[String, String] = Map.empty, + dbtArgs: Seq[String] = Seq.empty, + jarParams: Seq[String] = Seq.empty, + pythonArgs: Seq[String] = Seq.empty, + sparkSubmitArgs: Seq[String] = Seq.empty, + fullPipelineRefresh: Boolean = false) + extends LeafJobNode { + def toSDK: jobs.RunJobTask = new jobs.RunJobTask() + .setJobId(jobId) + .setDbtCommands(dbtArgs.asJava) + .setJarParams(jarParams.asJava) + .setJobParameters(jobParams.asJava) + .setNotebookParams(notebookParams.asJava) + .setPipelineParams(if (fullPipelineRefresh) new PipelineParams().setFullRefresh(true) else null) + .setPythonNamedParams(pythonNamedParams.asJava) + .setPythonParams(pythonArgs.asJava) + .setSparkSubmitParams(sparkSubmitArgs.asJava) + .setSqlParams(sqlParams.asJava) +} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkJarTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkJarTask.scala index 1ddd82953..b66ad6b16 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkJarTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkJarTask.scala @@ -1,13 +1,13 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.collection.JavaConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs case class SparkJarTask(jarUri: Option[String], mainClassName: Option[String], parameters: Seq[String] = Seq.empty) - extends JobNode { - 
override def children: Seq[JobNode] = Seq() - def toSDK: jobs.SparkJarTask = { - val raw = new jobs.SparkJarTask() - raw - } + extends LeafJobNode { + def toSDK: jobs.SparkJarTask = new jobs.SparkJarTask() + .setJarUri(jarUri.orNull) + .setMainClassName(mainClassName.orNull) + .setParameters(parameters.asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkPythonTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkPythonTask.scala index 1a79ed021..fac7e006d 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkPythonTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkPythonTask.scala @@ -1,14 +1,15 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.collection.JavaConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs import com.databricks.sdk.service.jobs.Source case class SparkPythonTask(pythonFile: String, parameters: Seq[String] = Seq.empty, source: Option[Source] = None) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.SparkPythonTask = { - val raw = new jobs.SparkPythonTask() - raw - } + extends LeafJobNode + with CodeAsset { + def toSDK: jobs.SparkPythonTask = new jobs.SparkPythonTask() + .setPythonFile(pythonFile) + .setParameters(parameters.asJava) + .setSource(source.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkSubmitTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkSubmitTask.scala index 3027f41db..11f6fd8c8 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkSubmitTask.scala +++ 
b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SparkSubmitTask.scala @@ -1,12 +1,9 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.collection.JavaConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs -case class SparkSubmitTask(parameters: Seq[String] = Seq.empty) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.SparkSubmitTask = { - val raw = new jobs.SparkSubmitTask() - raw - } +case class SparkSubmitTask(parameters: Seq[String] = Seq.empty) extends LeafJobNode { + def toSDK: jobs.SparkSubmitTask = new jobs.SparkSubmitTask().setParameters(parameters.asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SqlTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SqlTask.scala index 742d0a2a3..f163bc67d 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SqlTask.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SqlTask.scala @@ -1,5 +1,6 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks +import scala.jdk.CollectionConverters._ import com.databricks.labs.remorph.intermediate.workflows._ import com.databricks.labs.remorph.intermediate.workflows.sql.{SqlTaskAlert, SqlTaskDashboard, SqlTaskFile, SqlTaskQuery} import com.databricks.sdk.service.jobs @@ -11,10 +12,14 @@ case class SqlTask( file: Option[SqlTaskFile] = None, parameters: Option[Map[String, String]] = None, query: Option[SqlTaskQuery] = None) - extends JobNode { + extends JobNode + with NeedsWarehouse { override def children: Seq[JobNode] = Seq() ++ alert ++ dashboard ++ file ++ query - def toSDK: jobs.SqlTask = { - val raw = new jobs.SqlTask() - raw - } + def toSDK: jobs.SqlTask = new jobs.SqlTask() + 
.setWarehouseId(warehouseId) + .setAlert(alert.map(_.toSDK).orNull) + .setDashboard(dashboard.map(_.toSDK).orNull) + .setFile(file.map(_.toSDK).orNull) + .setParameters(parameters.map(_.asJava).orNull) + .setQuery(query.map(_.toSDK).orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SubmitTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SubmitTask.scala deleted file mode 100644 index 6d578b138..000000000 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/SubmitTask.scala +++ /dev/null @@ -1,40 +0,0 @@ -package com.databricks.labs.remorph.intermediate.workflows.tasks - -import com.databricks.labs.remorph.intermediate.workflows.jobs.{JobEmailNotifications, JobsHealthRules} -import com.databricks.labs.remorph.intermediate.workflows.JobNode -import com.databricks.labs.remorph.intermediate.workflows.webhooks.WebhookNotifications -import com.databricks.sdk.service.jobs.{RunIf, RunJobTask} -import com.databricks.sdk.service.{compute, jobs} - -case class SubmitTask( - taskKey: String, - conditionTask: Option[ConditionTask] = None, - dbtTask: Option[DbtTask] = None, - dependsOn: Seq[TaskDependency] = Seq.empty, - description: Option[String] = None, - emailNotifications: Option[JobEmailNotifications] = None, - environmentKey: Option[String] = None, - existingClusterId: Option[String] = None, - forEachTask: Option[ForEachTask] = None, - health: Option[JobsHealthRules] = None, - libraries: Option[Seq[compute.Library]] = None, - newCluster: Option[compute.ClusterSpec] = None, - notebookTask: Option[NotebookTask] = None, - notificationSettings: Option[TaskNotificationSettings] = None, - pipelineTask: Option[PipelineTask] = None, - pythonWheelTask: Option[PythonWheelTask] = None, - runIf: Option[RunIf] = None, - runJobTask: Option[RunJobTask] = None, - sparkJarTask: Option[SparkJarTask] = None, - sparkPythonTask: Option[SparkPythonTask] = None, - 
sparkSubmitTask: Option[SparkSubmitTask] = None, - sqlTask: Option[SqlTask] = None, - timeoutSeconds: Option[Int] = None, - webhookNotifications: Option[WebhookNotifications] = None) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.SubmitTask = { - val raw = new jobs.SubmitTask() - raw - } -} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/Task.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/Task.scala index 8cecb80a2..efe2d58cd 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/Task.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/Task.scala @@ -1,48 +1,95 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks +import scala.collection.JavaConverters._ import com.databricks.labs.remorph.intermediate.workflows.clusters.NewClusterSpec import com.databricks.labs.remorph.intermediate.workflows.jobs.JobsHealthRules import com.databricks.labs.remorph.intermediate.workflows.libraries.Library import com.databricks.labs.remorph.intermediate.workflows._ import com.databricks.labs.remorph.intermediate.workflows.webhooks.WebhookNotifications import com.databricks.sdk.service.jobs -import com.databricks.sdk.service.jobs.{RunIf, RunJobTask} case class Task( taskKey: String, - conditionTask: Option[ConditionTask] = None, - dbtTask: Option[DbtTask] = None, - dependsOn: Seq[TaskDependency] = Seq.empty, description: Option[String] = None, - disableAutoOptimization: Boolean = false, - emailNotifications: Option[TaskEmailNotifications] = None, - environmentKey: Option[String] = None, - existingClusterId: Option[String] = None, + dependsOn: Seq[TaskDependency] = Seq.empty, + dbtTask: Option[DbtTask] = None, + conditionTask: Option[ConditionTask] = None, forEachTask: Option[ForEachTask] = None, - health: Option[JobsHealthRules] = None, - jobClusterKey: Option[String] = None, - 
libraries: Seq[Library] = Seq.empty, - maxRetries: Option[Int] = None, - minRetryIntervalMillis: Option[Int] = None, - newCluster: Option[NewClusterSpec] = None, notebookTask: Option[NotebookTask] = None, - notificationSettings: Option[TaskNotificationSettings] = None, pipelineTask: Option[PipelineTask] = None, pythonWheelTask: Option[PythonWheelTask] = None, - retryOnTimeout: Boolean = false, - runIf: Option[RunIf] = None, runJobTask: Option[RunJobTask] = None, sparkJarTask: Option[SparkJarTask] = None, sparkPythonTask: Option[SparkPythonTask] = None, sparkSubmitTask: Option[SparkSubmitTask] = None, sqlTask: Option[SqlTask] = None, + libraries: Seq[Library] = Seq.empty, + newCluster: Option[NewClusterSpec] = None, + existingClusterId: Option[String] = None, + jobClusterKey: Option[String] = None, + runIf: Option[jobs.RunIf] = None, + disableAutoOptimization: Boolean = false, + environmentKey: Option[String] = None, + maxRetries: Option[Int] = None, + minRetryIntervalMillis: Option[Int] = None, + health: Option[JobsHealthRules] = None, + retryOnTimeout: Boolean = false, timeoutSeconds: Option[Int] = None, + notificationSettings: Option[TaskNotificationSettings] = None, + emailNotifications: Option[TaskEmailNotifications] = None, webhookNotifications: Option[WebhookNotifications] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() // TODO: Add all the children - def toSDK: jobs.Task = { - val raw = new jobs.Task() - raw - } + override def children: Seq[JobNode] = Seq() ++ + conditionTask ++ + dbtTask ++ + dependsOn ++ + emailNotifications ++ + forEachTask ++ + health ++ + libraries ++ + newCluster ++ + notebookTask ++ + notificationSettings ++ + pipelineTask ++ + pythonWheelTask ++ + runJobTask ++ + sparkJarTask ++ + sparkPythonTask ++ + sparkSubmitTask ++ + sqlTask ++ + webhookNotifications + + def dependOn(task: Task): Task = copy(dependsOn = dependsOn :+ TaskDependency(task.taskKey)) + + def toSDK: jobs.Task = new jobs.Task() + 
.setTaskKey(taskKey) + .setConditionTask(conditionTask.map(_.toSDK).orNull) + .setDbtTask(dbtTask.map(_.toSDK).orNull) + .setDependsOn(dependsOn.map(_.toSDK).asJava) + .setDescription(description.orNull) + .setDisableAutoOptimization(disableAutoOptimization) + .setEmailNotifications(emailNotifications.map(_.toSDK).orNull) + .setEnvironmentKey(environmentKey.orNull) + .setExistingClusterId(existingClusterId.orNull) + .setForEachTask(forEachTask.map(_.toSDK).orNull) + .setHealth(health.map(_.toSDK).orNull) + .setJobClusterKey(jobClusterKey.orNull) + .setLibraries(libraries.map(_.toSDK).asJava) + .setMaxRetries(maxRetries.map(_.asInstanceOf[java.lang.Long]).orNull) + .setMinRetryIntervalMillis(minRetryIntervalMillis.map(_.asInstanceOf[java.lang.Long]).orNull) + .setNewCluster(newCluster.map(_.toSDK).orNull) + .setNotebookTask(notebookTask.map(_.toSDK).orNull) + .setNotificationSettings(notificationSettings.map(_.toSDK).orNull) + .setPipelineTask(pipelineTask.map(_.toSDK).orNull) + .setPythonWheelTask(pythonWheelTask.map(_.toSDK).orNull) + .setRetryOnTimeout(retryOnTimeout) + .setRunIf(runIf.orNull) + .setRunJobTask(runJobTask.map(_.toSDK).orNull) + .setSparkJarTask(sparkJarTask.map(_.toSDK).orNull) + .setSparkPythonTask(sparkPythonTask.map(_.toSDK).orNull) + .setSparkSubmitTask(sparkSubmitTask.map(_.toSDK).orNull) + .setSqlTask(sqlTask.map(_.toSDK).orNull) + .setTimeoutSeconds(timeoutSeconds.map(_.asInstanceOf[java.lang.Long]).orNull) + .setWebhookNotifications(webhookNotifications.map(_.toSDK).orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/TaskDependency.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/TaskDependency.scala index 20d4d0cca..894cbe6a7 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/TaskDependency.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/TaskDependency.scala @@ -1,12 +1,8 @@ 
package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs -case class TaskDependency(taskKey: String, outcome: Option[String] = None) extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.TaskDependency = { - val raw = new jobs.TaskDependency() - raw - } +case class TaskDependency(taskKey: String, outcome: Option[String] = None) extends LeafJobNode { + def toSDK: jobs.TaskDependency = new jobs.TaskDependency().setTaskKey(taskKey).setOutcome(outcome.orNull) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/TaskEmailNotifications.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/TaskEmailNotifications.scala index 4bb2c9e37..ee3454731 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/TaskEmailNotifications.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/TaskEmailNotifications.scala @@ -1,6 +1,7 @@ package com.databricks.labs.remorph.intermediate.workflows.tasks -import com.databricks.labs.remorph.intermediate.workflows.JobNode +import scala.collection.JavaConverters._ +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode import com.databricks.sdk.service.jobs case class TaskEmailNotifications( @@ -10,10 +11,12 @@ case class TaskEmailNotifications( onStart: Seq[String] = Seq.empty, onStreamingBacklogExceeded: Seq[String] = Seq.empty, onSuccess: Seq[String] = Seq.empty) - extends JobNode { - override def children: Seq[JobNode] = Seq() - def toSDK: jobs.TaskEmailNotifications = { - val raw = new jobs.TaskEmailNotifications() - raw - } + extends LeafJobNode { + def toSDK: jobs.TaskEmailNotifications = new jobs.TaskEmailNotifications() + .setNoAlertForSkippedRuns(noAlertForSkippedRuns) + 
.setOnDurationWarningThresholdExceeded(onDurationWarningThresholdExceeded.asJava) + .setOnFailure(onFailure.asJava) + .setOnStart(onStart.asJava) + .setOnStreamingBacklogExceeded(onStreamingBacklogExceeded.asJava) + .setOnSuccess(onSuccess.asJava) } diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/PythonNotebookTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/PythonNotebookTask.scala new file mode 100644 index 000000000..f42cc230c --- /dev/null +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/PythonNotebookTask.scala @@ -0,0 +1,13 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks.intermediate + +import com.databricks.labs.remorph.generators.py.Module +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode +import com.databricks.labs.remorph.intermediate.workflows.tasks.NotebookTask +import com.databricks.sdk.service.jobs.Source + +case class PythonNotebookTask(module: Module, baseParameters: Map[String, String] = Map.empty) + extends LeafJobNode + with ToNotebookTask { + override def toNotebookTask(notebookPath: String): NotebookTask = + NotebookTask(notebookPath, Some(baseParameters), Some(Source.WORKSPACE), None) +} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/RunNotebookJobTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/RunNotebookJobTask.scala new file mode 100644 index 000000000..d4b85fde6 --- /dev/null +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/RunNotebookJobTask.scala @@ -0,0 +1,10 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks.intermediate + +import com.databricks.labs.remorph.intermediate.workflows.JobNode +import com.databricks.labs.remorph.intermediate.workflows.jobs.JobSettings 
+import com.databricks.labs.remorph.intermediate.workflows.tasks.RunJobTask + +case class RunNotebookJobTask(job: JobSettings, params: Map[String, String] = Map.empty) extends JobNode { + override def children: Seq[JobNode] = Seq(job) + def toRunJobTask(id: Long): RunJobTask = RunJobTask(id, notebookParams = params) +} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/SqlNotebookTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/SqlNotebookTask.scala new file mode 100644 index 000000000..3d822a93a --- /dev/null +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/SqlNotebookTask.scala @@ -0,0 +1,14 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks.intermediate + +import com.databricks.labs.remorph.intermediate.LogicalPlan +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode +import com.databricks.labs.remorph.intermediate.workflows.tasks.{NeedsWarehouse, NotebookTask} +import com.databricks.sdk.service.jobs.Source + +case class SqlNotebookTask(plan: LogicalPlan, baseParameters: Map[String, String] = Map.empty) + extends LeafJobNode + with ToNotebookTask + with NeedsWarehouse { + override def toNotebookTask(notebookPath: String): NotebookTask = + NotebookTask(notebookPath, Some(baseParameters), Some(Source.WORKSPACE), Some(DEFAULT_WAREHOUSE_ID)) +} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/SqlWorkspaceFileTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/SqlWorkspaceFileTask.scala new file mode 100644 index 000000000..f5b3659fd --- /dev/null +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/SqlWorkspaceFileTask.scala @@ -0,0 +1,23 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks.intermediate + +import 
com.databricks.labs.remorph.intermediate.LogicalPlan +import com.databricks.labs.remorph.intermediate.workflows.LeafJobNode +import com.databricks.labs.remorph.intermediate.workflows.sql.SqlTaskFile +import com.databricks.labs.remorph.intermediate.workflows.tasks.{NeedsWarehouse, SqlTask, Task} +import com.databricks.sdk.service.jobs.Source + +import java.io.File + +case class SqlWorkspaceFileTask(plan: LogicalPlan, parameters: Map[String, String] = Map.empty) + extends LeafJobNode + with ToTask + with NeedsWarehouse { + override def toTask(path: String): Task = Task( + taskKey = new File(path).getName, + description = Some(s"Run SQL file $path"), + sqlTask = Some( + SqlTask( + warehouseId = DEFAULT_WAREHOUSE_ID, + parameters = Some(parameters), + file = Some(SqlTaskFile(path, Some(Source.WORKSPACE)))))) +} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/ToNotebookTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/ToNotebookTask.scala new file mode 100644 index 000000000..cd6caa05f --- /dev/null +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/ToNotebookTask.scala @@ -0,0 +1,14 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks.intermediate + +import com.databricks.labs.remorph.intermediate.workflows.tasks.{NotebookTask, Task} + +import java.io.File + +trait ToNotebookTask extends ToTask { + def toNotebookTask(notebookPath: String): NotebookTask + + def toTask(path: String): Task = Task( + taskKey = new File(path).getName, + description = Some(s"Run notebook $path"), + notebookTask = Some(toNotebookTask(path))) +} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/ToTask.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/ToTask.scala new file mode 100644 index 000000000..eedc5322d --- /dev/null +++ 
b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/tasks/intermediate/ToTask.scala @@ -0,0 +1,7 @@ +package com.databricks.labs.remorph.intermediate.workflows.tasks.intermediate + +import com.databricks.labs.remorph.intermediate.workflows.tasks.Task + +trait ToTask { + def toTask(path: String): Task +} diff --git a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/webhooks/WebhookNotifications.scala b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/webhooks/WebhookNotifications.scala index 446fc89b6..9468b7b75 100644 --- a/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/webhooks/WebhookNotifications.scala +++ b/core/src/main/scala/com/databricks/labs/remorph/intermediate/workflows/webhooks/WebhookNotifications.scala @@ -1,5 +1,6 @@ package com.databricks.labs.remorph.intermediate.workflows.webhooks +import scala.collection.JavaConverters._ import com.databricks.labs.remorph.intermediate.workflows.JobNode import com.databricks.sdk.service.jobs @@ -12,8 +13,10 @@ case class WebhookNotifications( extends JobNode { override def children: Seq[JobNode] = Seq() ++ onDurationWarningThresholdExceeded ++ onFailure ++ onStart ++ onStreamingBacklogExceeded ++ onSuccess - def toSDK: jobs.WebhookNotifications = { - val raw = new jobs.WebhookNotifications() - raw - } + def toSDK: jobs.WebhookNotifications = new jobs.WebhookNotifications() + .setOnDurationWarningThresholdExceeded(onDurationWarningThresholdExceeded.map(_.toSDK).asJava) + .setOnFailure(onFailure.map(_.toSDK).asJava) + .setOnStart(onStart.map(_.toSDK).asJava) + .setOnStreamingBacklogExceeded(onStreamingBacklogExceeded.map(_.toSDK).asJava) + .setOnSuccess(onSuccess.map(_.toSDK).asJava) }