AWS DataSync¶
AWS DataSync is a data-transfer service that simplifies, automates, and accelerates moving and replicating data between on-premises storage systems and AWS storage services over the internet or AWS Direct Connect.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'Detailed information is available Installation of Airflow™
Operators¶
Interact with AWS DataSync Tasks¶
You can use DataSyncOperator
to
find, create, update, execute and delete AWS DataSync tasks.
Once the DataSyncOperator
has identified
the correct TaskArn to run (either because you specified it, or because it was found), it will then be
executed. Whenever an AWS DataSync Task is executed it creates an AWS DataSync TaskExecution, identified
by a TaskExecutionArn.
The TaskExecutionArn will be monitored until completion (success / failure), and its status will be periodically written to the Airflow task log.
The DataSyncOperator
supports
optional passing of additional kwargs to the underlying boto3.start_task_execution()
API.
This is done with the task_execution_kwargs
parameter.
This is useful for example to limit bandwidth or filter included files, see the boto3 Datasync
documentation
for more details.
Execute a task¶
To execute a specific task, you can pass the task_arn
to the operator.
# Execute a specific task
execute_task_by_arn = DataSyncOperator(
task_id="execute_task_by_arn",
task_arn=created_task_arn,
)
Search and execute a task¶
To search for a task, you can specify the source_location_uri
and destination_location_uri
to the operator.
If one task is found, this one will be executed.
If more than one task is found, the operator will raise an Exception. To avoid this, you can set
allow_random_task_choice
to True
to randomly choose from candidate tasks.
# Search and execute a task
execute_task_by_locations = DataSyncOperator(
task_id="execute_task_by_locations",
source_location_uri=f"s3://{s3_bucket_source}/test",
destination_location_uri=f"s3://{s3_bucket_destination}/test",
# Only transfer files from /test/subdir folder
task_execution_kwargs={
"Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
},
)
Create and execute a task¶
When searching for a task, if no task is found you have the option to create one before executing it.
In order to do that, you need to provide the extra parameters create_task_kwargs
, create_source_location_kwargs
and create_destination_location_kwargs
.
These extra parameters provide a way for the operator to automatically create a Task and/or Locations if no suitable existing Task was found. If these are left to their default value (None) then no create will be attempted.
Also, because delete_task_after_execution
is set to True
, the task will be deleted
from AWS DataSync after it completes successfully.
# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
task_id="create_and_execute_task",
source_location_uri=f"s3://{s3_bucket_source}/test_create",
destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
create_task_kwargs={"Name": "Created by Airflow"},
create_source_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
create_destination_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
delete_task_after_execution=False,
)
When creating a Task, the
DataSyncOperator
will try to find
and use existing LocationArns rather than creating new ones. If multiple LocationArns match the
specified URIs then we need to choose one to use. In this scenario, the operator behaves similarly
to how it chooses a single Task from many Tasks:
The operator will raise an Exception. To avoid this, you can set allow_random_location_choice
to True
to randomly choose from candidate Locations.