Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DMS Serverless Operators #43988

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open

Conversation

ellisms
Copy link
Contributor

@ellisms ellisms commented Nov 13, 2024


Adding operators, waiters, and triggers to support AWS DMS Serverless replications.
closes #39954

@@ -219,3 +224,158 @@ def wait_for_task_status(self, replication_task_arn: str, status: DmsTaskWaiterS
],
WithoutSettings=True,
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I am wondering if it is worth creating all these methods, they are all just wrapper around boto3 apis. You added some error management around it but I am just questioning if it is worth creating new methods for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The describe_* methods get used in a few places, so I think those should be in the hook to avoid duplication. I'll look to consolidate some of the simpler ones directly into the Operator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some below are quite simple, but others are decent number of lines that's nice to abstract out of the Operators (start and create for example). So I'm on the fence here, I could see both ways, but might lean on keeping it how it is since you've already spent time writing and testing the code as it is?

Comment on lines +664 to +672
result = self.hook.describe_replications(
filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}]
)

try:
current_status = result[0].get("Status", "")
except Exception as ex:
self.log.error("Error while getting replication status: %s. Unable to start replication", str(ex))
raise ex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, I dont think it is necessary

Comment on lines +681 to +691
if self.deferrable:
self.log.info("Deferring until deprovisioning completes.")
self.defer(
trigger=DmsReplicationDeprovisionedTrigger(
replication_config_arn=self.replication_config_arn,
waiter_delay=self.waiter_delay,
waiter_max_attempts=self.waiter_max_attempts,
aws_conn_id=self.aws_conn_id,
),
method_name="retry_execution",
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment also apply for the above operator. I am wondering if you should not split this operator in two. One sensor would wait for the deprovisioning to complete, and this operator would fail if the status is not complete before actually doing it. I just think this operator does a lot and the user does not necessarily know that the operator could potentially wait for some time (with a cost) even though wait_for_completion is False

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. The DMS docs weren't clear that a replication must be stopped (or failed) AND deprovisioned, so I was trying to cover both cases to simplify the deletion process. System testing pointed this out. It does add a bit to the operator, but the wait time would be the same in the Sensor + Operator combo. I see meeting the pre-requisite for deleting different than wait_for_completion of the actual delete. Open to exploring creating a sensor and would like to hear what others think.

@@ -219,3 +224,158 @@ def wait_for_task_status(self, replication_task_arn: str, status: DmsTaskWaiterS
],
WithoutSettings=True,
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some below are quite simple, but others are decent number of lines that's nice to abstract out of the Operators (start and create for example). So I'm on the fence here, I could see both ways, but might lean on keeping it how it is since you've already spent time writing and testing the code as it is?

providers/src/airflow/providers/amazon/aws/triggers/dms.py Outdated Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Adding DMS serverless options to aws provider
3 participants