Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DMS Serverless Operators #43988

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/dms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,69 @@ To delete a replication task you can use
:start-after: [START howto_operator_dms_delete_task]
:end-before: [END howto_operator_dms_delete_task]


Create a serverless replication config
======================================

To create a serverless replication config use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsCreateReplicationConfigOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_create_replication_config]
:end-before: [END howto_operator_dms_create_replication_config]

Describe a serverless replication config
========================================

To describe a serverless replication config use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeReplicationConfigsOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_describe_replication_config]
:end-before: [END howto_operator_dms_describe_replication_config]

Start a serverless replication
==============================

To start a serverless replication use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsStartReplicationOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_serverless_start_replication]
:end-before: [END howto_operator_dms_serverless_start_replication]

Get the status of a serverless replication
==========================================

To get the status of a serverless replication use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeReplicationsOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_serverless_describe_replication]
:end-before: [END howto_operator_dms_serverless_describe_replication]

Delete a serverless replication configuration
=============================================

To delete a serverless replication config use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeReplicationsOperator`.
ellisms marked this conversation as resolved.
Show resolved Hide resolved

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_serverless_delete_replication_config]
:end-before: [END howto_operator_dms_serverless_delete_replication_config]



Sensors
-------

Expand Down
160 changes: 160 additions & 0 deletions providers/src/airflow/providers/amazon/aws/hooks/dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from __future__ import annotations

import json
from datetime import datetime
from enum import Enum
from typing import Any

from botocore.exceptions import ClientError
from dateutil import parser

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

Expand Down Expand Up @@ -219,3 +224,158 @@ def wait_for_task_status(self, replication_task_arn: str, status: DmsTaskWaiterS
],
WithoutSettings=True,
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I am wondering if it is worth creating all these methods, they are all just wrapper around boto3 apis. You added some error management around it but I am just questioning if it is worth creating new methods for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The describe_* methods get used in a few places, so I think those should be in the hook to avoid duplication. I'll look to consolidate some of the simpler ones directly into the Operator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some below are quite simple, but others are decent number of lines that's nice to abstract out of the Operators (start and create for example). So I'm on the fence here, I could see both ways, but might lean on keeping it how it is since you've already spent time writing and testing the code as it is?

def describe_replication_configs(self, filters: list[dict] | None = None, **kwargs) -> list[dict]:
"""
Return list of serverless replication configs.

.. seealso::
- :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replication_configs`

:param filters: List of filter objects
:return: List of replication tasks
"""
filters = filters if filters is not None else []

try:
resp = self.conn.describe_replication_configs(Filters=filters, **kwargs)
return resp.get("ReplicationConfigs", [])
except Exception as ex:
self.log.error("Error while describing replication configs: %s", str(ex))
return []
ellisms marked this conversation as resolved.
Show resolved Hide resolved

def create_replication_config(
self,
replication_config_id: str,
source_endpoint_arn: str,
target_endpoint_arn: str,
compute_config: dict[str, Any],
replication_type: str,
table_mappings: str,
additional_config_kwargs: dict[str, Any] | None = None,
**kwargs,
):
"""
Create an AWS DMS Serverless configuration that can be used to start an DMS Serverless replication.

.. seealso::
- :external+boto3:py:meth:`DatabaseMigrationService.Client.create_replication_config`

:param replicationConfigId: Unique identifier used to create a ReplicationConfigArn.
:param sourceEndpointArn: ARN of the source endpoint
:param targetEndpointArn: ARN of the target endpoint
:param computeConfig: Parameters for provisioning an DMS Serverless replication.
:param replicationType: type of DMS Serverless replication
:param tableMappings: JSON table mappings
:param tags: Key-value tag pairs
:param resourceId: Unique value or name that you set for a given resource that can be used to construct an Amazon Resource Name (ARN) for that resource.
:param supplementalSettings: JSON settings for specifying supplemental data
:param replicationSettings: JSON settings for DMS Serverless replications

:return: ReplicationConfigArn

"""
if additional_config_kwargs is None:
additional_config_kwargs = {}
try:
resp = self.conn.create_replication_config(
ReplicationConfigIdentifier=replication_config_id,
SourceEndpointArn=source_endpoint_arn,
TargetEndpointArn=target_endpoint_arn,
ComputeConfig=compute_config,
ReplicationType=replication_type,
TableMappings=table_mappings,
**additional_config_kwargs,
)
arn = resp.get("ReplicationConfig", {}).get("ReplicationConfigArn")
self.log.info("Successfully created replication config: %s", arn)
return arn

except ClientError as err:
err_str = f"Error: {err.get('Error','').get('Code','')}: {err.get('Error','').get('Message','')}"
self.log.error("Error while creating replication config: %s", err_str)
raise err

def describe_replications(self, filters: list[dict[str, Any]] | None = None, **kwargs) -> list[dict]:
"""
Return list of serverless replications.

.. seealso::
- :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replications`

:param filters: List of filter objects
:return: List of replications
"""
filters = filters if filters is not None else []
try:
resp = self.conn.describe_replications(Filters=filters, **kwargs)
return resp.get("Replications", [])
except Exception:
return []
ellisms marked this conversation as resolved.
Show resolved Hide resolved

def delete_replication_config(
self, replication_config_arn: str, delay: int = 60, max_attempts: int = 120
):
"""
Delete an AWS DMS Serverless configuration.

.. seealso::
- :external+boto3:py:meth:`DatabaseMigrationService.Client.delete_replication_config`

:param replication_config_arn: ReplicationConfigArn
"""
try:
self.log.info("Deleting replication config: %s", replication_config_arn)

self.conn.delete_replication_config(ReplicationConfigArn=replication_config_arn)

except ClientError as err:
err_str = (
f"Error: {err.get('Error', '').get('Code', '')}: {err.get('Error', '').get('Message', '')}"
)
self.log.error("Error while deleting replication config: %s", err_str)
raise err

def start_replication(
self,
replication_config_arn: str,
start_replication_type: str,
cdc_start_time: datetime | str | None = None,
cdc_start_pos: str | None = None,
cdc_stop_pos: str | None = None,
):
additional_args: dict[str, Any] = {}

if cdc_start_time:
additional_args["CdcStartTime"] = (
cdc_start_time if isinstance(cdc_start_time, datetime) else parser.parse(cdc_start_time)
)
if cdc_start_pos:
additional_args["CdcStartPosition"] = cdc_start_pos
if cdc_stop_pos:
additional_args["CdcStopPosition"] = cdc_stop_pos

try:
resp = self.conn.start_replication(
ReplicationConfigArn=replication_config_arn,
StartReplicationType=start_replication_type,
**additional_args,
)

return resp
except Exception as ex:
self.log.error("Error while starting replication: %s", str(ex))
raise ex

def stop_replication(self, replication_config_arn: str):
resp = self.conn.stop_replication(ReplicationConfigArn=replication_config_arn)
return resp

def get_provision_status(self, replication_config_arn: str) -> str:
"""Get the provisioning status for a serverless replication."""
result = self.describe_replications(
filters=[{"Name": "replication-config-arn", "Values": [replication_config_arn]}]
)

provision_status = result[0].get("ProvisionData", {}).get("ProvisionState", "")
return provision_status
Loading
Loading