MsSqlOperator¶
The purpose of the MsSqlOperator is to define tasks that interact with the MSSQL database.
Use the MsSqlOperator to execute SQL commands in the MSSQL database.
Common Database Operations with MsSqlOperator¶
To execute a SQL request with the MsSqlOperator, two parameters are required: sql and mssql_conn_id.
These two parameters are eventually fed to the MSSQL hook object that interacts directly with the MSSQL database.
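The hand-off can be pictured with a small, dependency-free sketch. SketchHook and SketchOperator are hypothetical stand-ins invented for illustration, not Airflow classes, and the real operator and hook do considerably more (connection lookup, templating, autocommit handling).

```python
class SketchHook:
    """Hypothetical stand-in for the MSSQL hook; it records SQL instead of running it."""

    def __init__(self, mssql_conn_id: str) -> None:
        self.mssql_conn_id = mssql_conn_id
        self.executed = []

    def run(self, sql: str) -> None:
        # A real hook would open the connection named by mssql_conn_id here.
        self.executed.append(sql)


class SketchOperator:
    """Hypothetical stand-in for the operator: it only stores both parameters."""

    def __init__(self, task_id: str, mssql_conn_id: str, sql: str) -> None:
        self.task_id = task_id
        self.mssql_conn_id = mssql_conn_id
        self.sql = sql

    def execute(self) -> SketchHook:
        # Both required parameters reach the hook at execution time.
        hook = SketchHook(mssql_conn_id=self.mssql_conn_id)
        hook.run(self.sql)
        return hook


task = SketchOperator(
    task_id="get_all_countries",
    mssql_conn_id="airflow_mssql",
    sql="SELECT * FROM Country;",
)
hook = task.execute()
print(hook.mssql_conn_id, hook.executed)
```

The point of the sketch is only the direction of the data flow: the operator holds the connection id and the SQL text, and the hook is the object that uses them.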
Creating a MSSQL database table¶
The code snippets below are based on Airflow 2.2.
An example usage of the MsSqlOperator is as follows:
# Example of creating a task to create a table in MsSql
create_table_mssql_task = MsSqlOperator(
    task_id="create_country_table",
    mssql_conn_id="airflow_mssql",
    sql=r"""
    CREATE TABLE Country (
        country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
        name TEXT,
        continent TEXT
    );
    """,
    dag=dag,
)
You can also execute SQL commands from an external file. The file must sit in the same folder as the DAG file, or in a folder listed in the DAG's template_searchpath. This keeps the SQL queries separate from the Python code and easier to maintain.
# Example of creating a task that calls an sql command from an external file.
create_table_mssql_from_external_file = MsSqlOperator(
    task_id="create_table_from_external_file",
    mssql_conn_id="airflow_mssql",
    sql="create_table.sql",
    dag=dag,
)
Your dags/create_table.sql should contain the CREATE TABLE statement for the Users table that the later tasks populate.
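As a rough illustration, the sketch below writes one possible create_table.sql and shows how a value ending in .sql is treated as a file to load rather than as SQL text. Several things here are assumptions: the username and description columns are taken from the INSERT statements in this guide, the user_id column is modelled on the Country table, and resolve_sql is a hypothetical, simplified stand-in for Airflow's template-file handling, not an Airflow function.

```python
import tempfile
from pathlib import Path

# Hypothetical contents for dags/create_table.sql. username and description
# come from the INSERT statements in this guide; user_id is an assumption
# modelled on the Country table.
CREATE_USERS_SQL = """\
CREATE TABLE Users (
    user_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
    username TEXT,
    description TEXT
);
"""


def resolve_sql(sql: str, dag_folder: Path) -> str:
    """Simplified stand-in: load the file when sql names a .sql file."""
    if sql.endswith(".sql"):
        return (dag_folder / sql).read_text()
    return sql


with tempfile.TemporaryDirectory() as tmp:
    dag_folder = Path(tmp)  # plays the role of the dags/ folder
    (dag_folder / "create_table.sql").write_text(CREATE_USERS_SQL)

    from_file = resolve_sql("create_table.sql", dag_folder)
    inline = resolve_sql("SELECT * FROM Country;", dag_folder)

print(from_file)
```

Here from_file holds the statement read from disk, while inline is returned unchanged because it does not end in .sql.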
Inserting data into a MSSQL database table¶
We can then create a MsSqlOperator task that populates the Users table.
populate_user_table = MsSqlOperator(
    task_id="populate_user_table",
    mssql_conn_id="airflow_mssql",
    sql=r"""
    INSERT INTO Users (username, description)
    VALUES ( 'Danny', 'Musician');
    INSERT INTO Users (username, description)
    VALUES ( 'Simone', 'Chef');
    INSERT INTO Users (username, description)
    VALUES ( 'Lily', 'Florist');
    INSERT INTO Users (username, description)
    VALUES ( 'Tim', 'Pet shop owner');
    """,
)
Fetching records from your MSSQL database table¶
Fetching records from your MSSQL database table can be as simple as:
get_all_countries = MsSqlOperator(
    task_id="get_all_countries",
    mssql_conn_id="airflow_mssql",
    sql=r"""SELECT * FROM Country;""",
)
Passing Parameters into MsSqlOperator¶
MsSqlOperator accepts a params argument, which makes it possible to inject values into your SQL requests dynamically at runtime. The values are substituted into the sql string through Jinja templating.
To find the countries on the Asian continent:
get_countries_from_continent = MsSqlOperator(
    task_id="get_countries_from_continent",
    mssql_conn_id="airflow_mssql",
    sql=r"""SELECT * FROM Country where {{ params.column }}='{{ params.value }}';""",
    params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
)
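Because sql is a templated field, Airflow renders the {{ params.column }} and {{ params.value }} placeholders with Jinja before the statement is sent to the database. The sketch below emulates that step with a small regular-expression substitution, a stand-in for Jinja used only to keep the example dependency-free. render_params is a hypothetical helper, not an Airflow function.

```python
import re


def render_params(sql: str, params: dict) -> str:
    """Replace each {{ params.<name> }} placeholder with str(params[name])."""
    pattern = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")
    return pattern.sub(lambda match: str(params[match.group(1)]), sql)


sql = "SELECT * FROM Country where {{ params.column }}='{{ params.value }}';"
params = {"column": "CONVERT(VARCHAR, continent)", "value": "Asia"}

rendered = render_params(sql, params)
print(rendered)
# SELECT * FROM Country where CONVERT(VARCHAR, continent)='Asia';
```

The substitution is purely textual, so values supplied through params should come from trusted sources. The CONVERT(VARCHAR, continent) expression is there because SQL Server does not allow the = operator on TEXT columns.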
The complete MSSQL Operator DAG¶
When we put everything together, our DAG should look like this:
import os
from datetime import datetime

import pytest

from airflow import DAG

try:
    from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
    from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
except ImportError:
    pytest.skip("MSSQL provider not available", allow_module_level=True)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_mssql"

with DAG(
    DAG_ID,
    schedule="@daily",
    start_date=datetime(2021, 10, 1),
    tags=["example"],
    catchup=False,
) as dag:
    # Example of creating a task to create a table in MsSql
    create_table_mssql_task = MsSqlOperator(
        task_id="create_country_table",
        mssql_conn_id="airflow_mssql",
        sql=r"""
        CREATE TABLE Country (
            country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
            name TEXT,
            continent TEXT
        );
        """,
        dag=dag,
    )

    @dag.task(task_id="insert_mssql_task")
    def insert_mssql_hook():
        mssql_hook = MsSqlHook(mssql_conn_id="airflow_mssql", schema="airflow")
        rows = [
            ("India", "Asia"),
            ("Germany", "Europe"),
            ("Argentina", "South America"),
            ("Ghana", "Africa"),
            ("Japan", "Asia"),
            ("Namibia", "Africa"),
        ]
        target_fields = ["name", "continent"]
        mssql_hook.insert_rows(table="Country", rows=rows, target_fields=target_fields)

    # Example of creating a task that calls an sql command from an external file.
    create_table_mssql_from_external_file = MsSqlOperator(
        task_id="create_table_from_external_file",
        mssql_conn_id="airflow_mssql",
        sql="create_table.sql",
        dag=dag,
    )

    populate_user_table = MsSqlOperator(
        task_id="populate_user_table",
        mssql_conn_id="airflow_mssql",
        sql=r"""
        INSERT INTO Users (username, description)
        VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
        VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
        VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
        VALUES ( 'Tim', 'Pet shop owner');
        """,
    )

    get_all_countries = MsSqlOperator(
        task_id="get_all_countries",
        mssql_conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country;""",
    )

    get_all_description = MsSqlOperator(
        task_id="get_all_description",
        mssql_conn_id="airflow_mssql",
        sql=r"""SELECT description FROM Users;""",
    )

    get_countries_from_continent = MsSqlOperator(
        task_id="get_countries_from_continent",
        mssql_conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country where {{ params.column }}='{{ params.value }}';""",
        params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
    )

    (
        create_table_mssql_task
        >> insert_mssql_hook()
        >> create_table_mssql_from_external_file
        >> populate_user_table
        >> get_all_countries
        >> get_all_description
        >> get_countries_from_continent
    )