apache_air_flow_utils

Description: This module provides automation support for Apache Airflow. Users can interact with Airflow DAGs by triggering them and tracking their progress by querying the DAG state.

ApacheAirFlow

Description

This class provides automation support for Apache Airflow. Users can interact with Airflow DAGs by triggering them and tracking their progress by querying the DAG state.

Apache Airflow is a workflow automation and scheduling system that can be used to author and manage data pipelines. Airflow workflows are modeled as directed acyclic graphs (DAGs) of tasks.

Source code in libs\cafex_core\src\cafex_core\utils\apache_air_flow_utils.py
class ApacheAirFlow:
    """
    Description:
        |  This Class provides automation support for Apache AirFlow. User can interact
        with AirFlow DAGs, by triggering the DAG and track the progress by getting state
        of DAG
        |   Apache Airflow is a workflow automation and scheduling system that can be used to
         author and manage data pipelines.
        |   Airflow uses workflows made of directed acyclic graphs (DAGs) of tasks.

    """

    def __init__(self):
        self.__ssh_handler = SshHandler()
        self.__obj_exception = CoreExceptions()
        self.logger = CoreLogger(name=__name__).get_logger()

    def trigger_dag(
            self, ssh_client, dag_id, in_buffer=2048, out_buffer=2048
    ) -> list:
        """
        Triggers the AirFlow DAG and returns a list of results from AirFlow.

        Args:
            ssh_client: SSH client object used for connection.
            dag_id (str): AirFlow DAG ID (case sensitive).
            in_buffer (int): Input buffer size.
            out_buffer (int): Output buffer size.

        Returns:
            list: Contains results from execution of AirFlow DAG.

        Examples:
            >> results = ApacheUtils.AirFlow().trigger_dag(ssh_client, 'TestDagId')
        """
        try:
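            # Note: "airflow trigger_dag" is the Airflow 1.x CLI form; in
            # Airflow 2.x the equivalent command is "airflow dags trigger".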
            commands = [f"airflow trigger_dag {dag_id}"]
            dag_results = self.__ssh_handler.execute(
                ssh_client,
                commands,
                in_buffer=in_buffer,
                out_buffer=out_buffer,
            )

            return dag_results
        except (SSHException, ValueError) as e:
            error_message = f"Error occurred while triggering DAG '{dag_id}': {str(e)}"
            self.__obj_exception.raise_generic_exception(
                message=error_message,
                insert_report=True,
                trim_log=True,
                log_local=True,
                fail_test=False,
            )
            return []

    def check_dag_state(
            self,
            ssh_client,
            dag_id: str,
            dag_result: list,
            in_buffer: int = 2048,
            out_buffer: int = 2048,
    ) -> list:
        """
        Returns the current status of the DAG based on the list provided, which is returned
        by the trigger AirFlow DAG method.

        Args:
            ssh_client: SSH client object used for connection.
            dag_id (str): AirFlow DAG ID (case sensitive).
            dag_result (list): List of results from AirFlow DAG trigger.
            in_buffer (int): Input buffer size.
            out_buffer (int): Output buffer size.

        Returns:
            list: Contains results from execution of AirFlow DAG.

        Examples:
            >> state = ApacheUtils.AirFlow().
            check_dag_state(ssh_client, 'TestDagId', lst_trigger_dag_results)
        """
        try:
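            # The execution date is parsed from the last trigger-output line, which
            # typically looks like: "Created <DagRun TestDagId @ ...:
            # manual__2024-05-01T12:00:00+00:00, externally triggered: True>".
            # "airflow dag_state" is the Airflow 1.x CLI form ("airflow dags state" in 2.x).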
            execution_date = dag_result[-1].split("__")[1].split(",")[0]
            commands = [f"airflow dag_state {dag_id} {execution_date}"]
            dag_results = self.__ssh_handler.execute(
                ssh_client,
                commands,
                in_buffer=in_buffer,
                out_buffer=out_buffer,
            )
            return dag_results
        except (SSHException, ValueError) as e:
            error_message = f"Error occurred while checking DAG state for ID {dag_id}: {str(e)}"
            self.__obj_exception.raise_generic_exception(
                message=error_message,
                insert_report=True,
                trim_log=True,
                log_local=True,
                fail_test=False,
            )
            return []

    def unpause_dag(self, ssh_client, dag_id: str) -> bool:
        """
        Unpauses a specified DAG in Airflow.

        Args:
            ssh_client: SSH client object used for connection.
            dag_id (str): The ID of the DAG to unpause.

        Returns:
            bool: True if the operation was successful; otherwise, False.

        Examples:
            >> success = ApacheUtils.AirFlow().unpause_dag(ssh_client, "TestDagId")

        """
        try:
            commands = [f"airflow dags unpause {dag_id}"]
            self.__ssh_handler.execute(
                ssh_client,
                commands,
                in_buffer=2048,
                out_buffer=2048,
            )
            self.logger.info("DAG %s unpaused successfully.", dag_id)
            return True
        except (SSHException, ValueError) as e:
            error_message = f"Error occurred while unpausing DAG '{dag_id}': {str(e)}"
            self.__obj_exception.raise_generic_exception(
                message=error_message,
                insert_report=True,
                trim_log=True,
                log_local=True,
                fail_test=False,
            )
            return False

    def pause_dag(self, ssh_client, dag_id: str) -> bool:
        """
        Pauses a specified DAG in Airflow.

        Args:
            ssh_client: SSH client object used for connection.
            dag_id (str): The ID of the DAG to pause.

        Returns:
            bool: True if the operation was successful; otherwise, False.

        Examples:
            >> success = ApacheUtils.AirFlow().pause_dag(ssh_client, "TestDagId")

        """
        try:
            commands = [f"airflow dags pause {dag_id}"]
            self.__ssh_handler.execute(
                ssh_client,
                commands,
                in_buffer=2048,
                out_buffer=2048,
            )
            self.logger.info("DAG %s paused successfully.", dag_id)
            return True
        except (SSHException, ValueError) as e:
            error_message = f"Error occurred while pausing DAG '{dag_id}': {str(e)}"
            self.__obj_exception.raise_generic_exception(
                message=error_message,
                insert_report=True,
                trim_log=True,
                log_local=True,
                fail_test=False,
            )
            return False

    def get_dag_details(self, ssh_client, dag_id: str) -> list:
        """
        Retrieves detailed information about a specific DAG.

        Args:
            ssh_client: SSH client object used for connection.
            dag_id (str): The ID of the DAG to retrieve details for.

        Returns:
            list: A list of output lines describing the specified DAG.

        Examples:
            >> details = ApacheUtils.AirFlow().get_dag_details(ssh_client, "TestDagId")

        """

        try:
            commands = [f"airflow dags show {dag_id}"]
            dag_results = self.__ssh_handler.execute(
                ssh_client,
                commands,
                in_buffer=2048,
                out_buffer=2048,
            )
            return dag_results
        except (SSHException, ValueError) as e:
            error_message = f"Error occurred while retrieving details for " \
                            f"DAG '{dag_id}': {str(e)}"
            self.__obj_exception.raise_generic_exception(
                message=error_message,
                insert_report=True,
                trim_log=True,
                log_local=True,
                fail_test=False,
            )
            return []

    def list_dags(self, ssh_client) -> list:
        """
        Lists all available DAGs in the Airflow environment.

        Args:
            ssh_client: SSH client object used for connection.

        Returns:
            list: A list of DAG IDs.

        Examples:
            >> dags = ApacheUtils.AirFlow().list_dags(ssh_client)

        """
        try:
            commands = ["airflow dags list"]
            dag_results = self.__ssh_handler.execute(
                ssh_client,
                commands,
                in_buffer=2048,
                out_buffer=2048,
            )
            return dag_results
        except (SSHException, ValueError) as e:
            error_message = f"Error occurred while listing DAGs: {str(e)}"
            self.__obj_exception.raise_generic_exception(
                message=error_message,
                insert_report=True,
                trim_log=True,
                log_local=True,
                fail_test=False,
            )
            return []
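A typical flow chains these methods together: unpause the DAG, trigger it, then poll its state until it reaches a terminal value. The sketch below is illustrative and not part of the library; it assumes an already-connected SSH client of the kind SshHandler expects, that the Airflow CLI is on the remote PATH, and that the state output lines contain the words "success" or "failed".

import time

airflow = ApacheAirFlow()
airflow.unpause_dag(ssh_client, "TestDagId")        # ensure the DAG can be scheduled
trigger_output = airflow.trigger_dag(ssh_client, "TestDagId")

for _ in range(30):                                 # poll for up to ~5 minutes
    state_lines = airflow.check_dag_state(ssh_client, "TestDagId", trigger_output)
    if any("success" in line for line in state_lines):
        print("DAG run succeeded")
        break
    if any("failed" in line for line in state_lines):
        raise RuntimeError("DAG run failed")
    time.sleep(10)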

check_dag_state(ssh_client, dag_id, dag_result, in_buffer=2048, out_buffer=2048)

Returns the current status of the DAG based on the result list returned by the trigger_dag method.

Parameters:

| Name       | Type | Description                                   | Default  |
| ---------- | ---- | --------------------------------------------- | -------- |
| ssh_client |      | SSH client object used for connection.        | required |
| dag_id     | str  | Airflow DAG ID (case sensitive).              | required |
| dag_result | list | List of results from the Airflow DAG trigger. | required |
| in_buffer  | int  | Input buffer size.                            | 2048     |
| out_buffer | int  | Output buffer size.                           | 2048     |

Returns:

| Name | Type | Description                                          |
| ---- | ---- | ---------------------------------------------------- |
| list | list | Contains results from execution of the Airflow DAG.  |

Examples:

state = ApacheUtils.AirFlow().check_dag_state(ssh_client, 'TestDagId', lst_trigger_dag_results)
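The execution date that feeds the dag_state command is recovered by string-splitting the last line of the trigger output. A worked example of that parsing, assuming the trigger output follows the classic "Created <DagRun ...>" format (the sample line below is illustrative, not captured from a real run):

sample = ("Created <DagRun TestDagId @ 2024-05-01 12:00:00+00:00: "
          "manual__2024-05-01T12:00:00+00:00, externally triggered: True>")

# Same parsing as in the method body: take the text after "__",
# then drop everything from the first comma onward.
execution_date = sample.split("__")[1].split(",")[0]
print(execution_date)  # -> 2024-05-01T12:00:00+00:00
# check_dag_state then runs: airflow dag_state TestDagId 2024-05-01T12:00:00+00:00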


get_dag_details(ssh_client, dag_id)

Retrieves detailed information about a specific DAG.

Parameters:

| Name       | Type | Description                                | Default  |
| ---------- | ---- | ------------------------------------------ | -------- |
| ssh_client |      | SSH client object used for connection.     | required |
| dag_id     | str  | The ID of the DAG to retrieve details for. | required |

Returns:

| Name | Type | Description                                           |
| ---- | ---- | ----------------------------------------------------- |
| list | list | A list of output lines describing the specified DAG.  |

Examples:

details = ApacheUtils.AirFlow().get_dag_details(ssh_client, "TestDagId")
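Under the hood this runs `airflow dags show`, which prints a Graphviz DOT description of the DAG. A small sketch of one way to use the returned lines, assuming the handler hands back the command's raw stdout lines:

airflow = ApacheAirFlow()
dot_lines = airflow.get_dag_details(ssh_client, "TestDagId")

# Persist the DOT text so it can be rendered locally with Graphviz,
# e.g.: dot -Tpng TestDagId.dot -o TestDagId.png
with open("TestDagId.dot", "w", encoding="utf-8") as handle:
    handle.write("\n".join(dot_lines))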


list_dags(ssh_client)

Lists all available DAGs in the Airflow environment.

Parameters:

| Name       | Type | Description                            | Default  |
| ---------- | ---- | -------------------------------------- | -------- |
| ssh_client |      | SSH client object used for connection. | required |

Returns:

| Name | Type | Description        |
| ---- | ---- | ------------------ |
| list | list | A list of DAG IDs. |

Examples:

dags = ApacheUtils.AirFlow().list_dags(ssh_client)
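The handler returns the raw stdout of `airflow dags list`, which in recent Airflow versions is a small text table rather than bare IDs. A hedged sketch of extracting the IDs, assuming that default table layout with `dag_id` as the first column:

airflow = ApacheAirFlow()
lines = airflow.list_dags(ssh_client)

dag_ids = []
for line in lines:
    first_col = line.split("|")[0].strip()
    # Skip the header row and the ruled separator lines of the CLI table.
    if first_col and first_col != "dag_id" and set(first_col) - set("=-+"):
        dag_ids.append(first_col)
print(dag_ids)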


pause_dag(ssh_client, dag_id)

Pauses a specified DAG in Airflow.

Parameters:

| Name       | Type | Description                            | Default  |
| ---------- | ---- | -------------------------------------- | -------- |
| ssh_client |      | SSH client object used for connection. | required |
| dag_id     | str  | The ID of the DAG to pause.            | required |

Returns:

| Name | Type | Description                                              |
| ---- | ---- | -------------------------------------------------------- |
| bool | bool | True if the operation was successful; otherwise, False.  |

Examples:

success = ApacheUtils.AirFlow().pause_dag(ssh_client, "TestDagId")
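pause_dag and unpause_dag are natural bookends around maintenance work such as redeploying a DAG file. A short illustrative pattern (`redeploy_dag_file` is a hypothetical placeholder, not part of this module):

airflow = ApacheAirFlow()
if airflow.pause_dag(ssh_client, "TestDagId"):        # stop new scheduled runs
    try:
        redeploy_dag_file()                           # hypothetical maintenance step
    finally:
        airflow.unpause_dag(ssh_client, "TestDagId")  # resume scheduling either way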


trigger_dag(ssh_client, dag_id, in_buffer=2048, out_buffer=2048)

Triggers the Airflow DAG and returns a list of results from Airflow.

Parameters:

| Name       | Type | Description                            | Default  |
| ---------- | ---- | -------------------------------------- | -------- |
| ssh_client |      | SSH client object used for connection. | required |
| dag_id     | str  | Airflow DAG ID (case sensitive).       | required |
| in_buffer  | int  | Input buffer size.                     | 2048     |
| out_buffer | int  | Output buffer size.                    | 2048     |

Returns:

| Name | Type | Description                                          |
| ---- | ---- | ---------------------------------------------------- |
| list | list | Contains results from execution of the Airflow DAG.  |

Examples:

results = ApacheUtils.AirFlow().trigger_dag(ssh_client, 'TestDagId')
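On SSHException or ValueError the method reports the failure (with fail_test=False) and returns an empty list instead of raising, so callers should treat an empty result as the failure signal. A minimal guard:

airflow = ApacheAirFlow()
results = airflow.trigger_dag(ssh_client, "TestDagId")
if not results:
    # The exception was already logged/reported inside trigger_dag;
    # an empty list is the only signal the caller sees.
    raise RuntimeError("trigger_dag returned no output; check the logs/report")
state = airflow.check_dag_state(ssh_client, "TestDagId", results)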


unpause_dag(ssh_client, dag_id)

Unpauses a specified DAG in Airflow.

Parameters:

| Name       | Type | Description                            | Default  |
| ---------- | ---- | -------------------------------------- | -------- |
| ssh_client |      | SSH client object used for connection. | required |
| dag_id     | str  | The ID of the DAG to unpause.          | required |

Returns:

| Name | Type | Description                                              |
| ---- | ---- | -------------------------------------------------------- |
| bool | bool | True if the operation was successful; otherwise, False.  |

Examples:

success = ApacheUtils.AirFlow().unpause_dag(ssh_client, "TestDagId")
