Skip to content

database_handler

This module provides the DatabaseConnection class for establishing connections to various database types.

DatabaseConnection

Bases: DatabaseOperations

Establishes database connections for various database types.

Source code in libs\cafex_db\src\cafex_db\database_handler.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class DatabaseConnection(DatabaseOperations):
    """Establishes database connections for various database types.

    Arguments are validated here; the actual connection work is delegated to a
    ``DBSecurity`` instance (one method per database type). Validation failures
    are reported through the ``DBExceptions`` ``raise_*`` helpers.
    """

    # Public list of accepted database type names. The original spelling is kept
    # for backward compatibility; create_db_connection compares against it
    # case-insensitively.
    SUPPORTED_DATABASE_TYPES = [
        "mssql",
        "mysql",
        "oracle",
        "hive",
        "PostgreSQL",
        "cassandra",
        "spark",
        "ec2_hive",
        "SQLite",
    ]

    def __init__(self):
        """Initializes the DatabaseConnection class."""
        super().__init__()
        # Builds the per-database connection objects (see the _create_* helpers).
        self.__obj_db_decrypter = DBSecurity()
        self.logger = CoreLogger(name=__name__).get_logger()
        # Provides the raise_* helpers used to report validation failures.
        self.__obj_db_exception = DBExceptions()

    def create_db_connection(self, database_type, server_name, **kwargs):
        """Creates a database connection based on the provided parameters.

        ``database_type`` is matched case-insensitively against
        ``SUPPORTED_DATABASE_TYPES``.

        Parameters:
            database_type (str): The type of database. Supported types:
                'mssql', 'mysql', 'oracle', 'hive', 'PostgreSQL',
                'cassandra', 'spark', 'ec2_hive', 'SQLite'. Only 'mssql',
                'mysql', 'oracle', 'hive', 'PostgreSQL' and 'cassandra'
                currently have a connection implementation; the others pass
                validation and then return None.
            server_name (str): The name or IP address of the database server.
            **kwargs: Additional keyword arguments specific to each database type.

        Keyword Arguments:
            database_name (str): The name of the database.
            username (str): The username for authentication.
            password (str): The password for authentication.
            port_number (int): The port number of the database server.
            is_password_encoded (bool): Whether the password is encoded.
                                        Defaults to False.
            pem_file (str): Path to the SSH key file (Hive).
            secret_key (str): Secret key for encoded passwords.
            sid (str): The Oracle SID.
            service_name (str): The Oracle service name.
            encoding (str): Encoding format (Oracle).
            control_timeout (int/float): Query timeout (Cassandra, MSSQL).

        Returns:
            object: The database connection object.
            None: If creating the connection fails, or if the type passes
                validation but has no connection implementation yet.

        Raises:
            Exception: Whatever the ``DBExceptions`` helpers raise when
                validation fails. This covers an unsupported or missing
                database type, a missing server name, a username without a
                password, an encoded password without a secret key, or
                hive/spark/ec2_hive without a password or pem file. Errors
                raised while creating the connection itself are logged and
                not propagated.

        Examples:
            >>> db_conn = DatabaseConnection()
            >>> conn = db_conn.create_db_connection(
            ...     'mssql', 'dbserver', database_name='my_db',
            ...     username='user', password='password'
            ... )
            >>> db_conn.create_db_connection(
            ...     'oracle', 'dbhost', username='user', password='password',
            ...     port_number=1521, service_name='my_service'
            ... )
            >>> db_conn.create_db_connection(
            ...     'hive', 'hiveserver', username='user',
            ...     pem_file='/path/to/key.pem'
            ... )
        """
        # Non-string input (e.g. None) is treated as an unsupported type
        # instead of crashing with AttributeError on .lower().
        database_type = database_type.lower() if isinstance(database_type, str) else None
        # SUPPORTED_DATABASE_TYPES mixes cases ("PostgreSQL", "SQLite"), so
        # compare against its lower-cased form.
        supported_types = {name.lower() for name in self.SUPPORTED_DATABASE_TYPES}

        if database_type not in supported_types:
            self.__obj_db_exception.raise_null_database_type()

        if server_name is None:
            self.__obj_db_exception.raise_null_server_name()

        if database_type in ("mssql", "mysql", "oracle", "postgresql", "cassandra"):
            if kwargs.get("username") is not None and kwargs.get("password") is None:
                self.__obj_db_exception.raise_null_password()

        # Encoded passwords need a secret key, except for the types below.
        # NOTE(review): presumably DBSecurity can decode those without one — confirm.
        if kwargs.get("is_password_encoded") and kwargs.get("secret_key") is None:
            if database_type not in ("mssql", "mysql", "oracle", "postgresql"):
                self.__obj_db_exception.raise_null_secret_key()

        if database_type in ("hive", "spark", "ec2_hive"):
            if kwargs.get("password") is None and kwargs.get("pem_file") is None:
                self.__obj_db_exception.raise_generic_exception(
                    "For hive/Spark/ec2_hive, Password and pem file cannot be null"
                )

        try:
            connectors = {
                "mssql": self._create_mssql_connection,
                "mysql": self._create_mysql_connection,
                "oracle": self._create_oracle_connection,
                "hive": self._create_hive_connection,
                "postgresql": self._create_postgresql_connection,
                "cassandra": self._create_cassandra_connection,
            }
            connector = connectors.get(database_type)
            if connector is None:
                raise ValueError(f"Database type not yet implemented: {database_type}")
            return connector(server_name, **kwargs)

        except ValueError as ve:
            self.logger.error("ValueError creating %s connection: %s", database_type, ve)
            return None
        except (ConnectionError, TimeoutError) as e:
            self.logger.error("Connection error creating %s connection: %s", database_type, e)
            return None
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error("Error creating %s connection: %s", database_type, e)
            return None

    def _create_mssql_connection(self, server_name, **kwargs):
        """Creates and returns an MSSQL connection object.

        Delegates to ``DBSecurity.mssql``; see ``create_db_connection`` for the
        accepted keyword arguments. The original notes state that autocommit is
        enabled; that is decided inside ``DBSecurity.mssql`` (NOTE(review):
        confirm there).

        Returns:
            object: The connection returned by ``DBSecurity.mssql``.
            None: Only if ``raise_generic_exception`` returns instead of raising.
        """
        try:
            mssql_connection = self.__obj_db_decrypter.mssql(server_name, **kwargs)
            return mssql_connection
        except Exception as e:  # pylint: disable=broad-except
            self.__obj_db_exception.raise_generic_exception(str(e))
            return None

    def _create_mysql_connection(self, server_name, **kwargs):
        """Creates and returns a MySQL connection object.

        Delegates to ``DBSecurity.mysql``; see ``create_db_connection`` for the
        accepted keyword arguments.

        Returns:
            object: The connection returned by ``DBSecurity.mysql``.
            None: Only if ``raise_generic_exception`` returns instead of raising.
        """
        try:
            mysql_connection = self.__obj_db_decrypter.mysql(server_name, **kwargs)
            return mysql_connection
        except Exception as e:  # pylint: disable=broad-except
            self.__obj_db_exception.raise_generic_exception(str(e))
            return None

    def _create_oracle_connection(self, server_name, **kwargs):
        """Creates and returns an Oracle connection object.

        Delegates to ``DBSecurity.oracle``; see ``create_db_connection`` for the
        accepted keyword arguments (e.g. ``sid``, ``service_name``, ``encoding``).

        Warning:
            The Oracle instant client must be available on the host where this
            method is called. See
            https://cx-oracle.readthedocs.io/en/latest/installation.html#installing-cx-oracle-on-windows
            for setup, and https://www.oracle.com/database/technologies/faq-nls-lang.html
            for Oracle encodings.

        Returns:
            object: The connection returned by ``DBSecurity.oracle``.
            None: Only if ``raise_generic_exception`` returns instead of raising.
        """
        try:
            oracle_connection = self.__obj_db_decrypter.oracle(server_name, **kwargs)
            return oracle_connection
        except Exception as e:  # pylint: disable=broad-except
            self.__obj_db_exception.raise_generic_exception(str(e))
            return None

    def _create_hive_connection(self, server_name, **kwargs):
        """Creates and returns a Hive client.

        Delegates to ``DBSecurity.hive_connection``; the original notes describe
        this as an SSH-based connection. See ``create_db_connection`` for the
        accepted keyword arguments (e.g. ``pem_file``).

        Returns:
            object: The client returned by ``DBSecurity.hive_connection``.
            None: Only if ``raise_generic_exception`` returns instead of raising.
        """
        try:
            hive_client = self.__obj_db_decrypter.hive_connection(server_name, **kwargs)
            return hive_client
        except Exception as e:  # pylint: disable=broad-except
            self.__obj_db_exception.raise_generic_exception(str(e))
            return None

    def _create_postgresql_connection(self, server_name, **kwargs):
        """Creates and returns a PostgreSQL connection object.

        Delegates to ``DBSecurity.postgres``; see ``create_db_connection`` for
        the accepted keyword arguments (e.g. ``port_number``).

        Returns:
            object: The connection returned by ``DBSecurity.postgres``.
            None: Only if ``raise_generic_exception`` returns instead of raising.
        """
        try:
            postgres_connection = self.__obj_db_decrypter.postgres(server_name, **kwargs)
            return postgres_connection
        except Exception as e:  # pylint: disable=broad-except
            self.__obj_db_exception.raise_generic_exception(str(e))
            return None

    def _create_cassandra_connection(self, server_name, **kwargs):
        """Creates and returns a Cassandra connection object.

        Delegates to ``DBSecurity.cassandra_connection``; see
        ``create_db_connection`` for the accepted keyword arguments.

        Returns:
            object: The connection returned by ``DBSecurity.cassandra_connection``.
            None: Only if ``raise_generic_exception`` returns instead of raising.
        """
        try:
            cassandra_connection = self.__obj_db_decrypter.cassandra_connection(
                server_name, **kwargs
            )
            return cassandra_connection
        except Exception as e:  # pylint: disable=broad-except
            self.__obj_db_exception.raise_generic_exception(str(e))
            return None

__init__()

Initializes the DatabaseConnection class.

Source code in libs\cafex_db\src\cafex_db\database_handler.py
25
26
27
28
29
30
def __init__(self):
    """Initializes the DatabaseConnection class.

    Runs the ``DatabaseOperations`` initializer, then creates the collaborators
    used by the connection methods: a ``DBSecurity`` instance that builds the
    per-database connections, a logger named after this module, and a
    ``DBExceptions`` instance whose ``raise_*`` helpers report validation
    failures.
    """
    super().__init__()
    # Builds the per-database connection objects (mssql, mysql, oracle, ...).
    self.__obj_db_decrypter = DBSecurity()
    self.logger = CoreLogger(name=__name__).get_logger()
    # Provides the raise_* helpers used to report validation failures.
    self.__obj_db_exception = DBExceptions()

create_db_connection(database_type, server_name, **kwargs)

Creates a database connection based on the provided parameters.

Parameters:

Name Type Description Default
database_type str

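The type of database. Supported types: 'mssql', 'mysql', 'oracle', 'hive', 'PostgreSQL', 'cassandra', 'spark', 'ec2_hive', 'SQLite'. 'spark' and 'ec2_hive' pass validation but have no connection implementation yet, so the call returns None for them.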

required
server_name str

The name or IP address of the database server.

required
**kwargs

Additional keyword arguments specific to each database type.

{}

Other Parameters:

Name Type Description
database_name str

The name of the database.

username str

The username for authentication.

password str

The password for authentication.

port_number int

The port number of the database server.

is_password_encoded bool

Whether the password is URL encoded. Defaults to False.

pem_file str

Path to the SSH key file (Hive).

secret_key str

Secret key for encoded passwords.

sid str

The Oracle SID.

service_name str

The Oracle service name.

encoding str

Encoding format (Oracle).

control_timeout int / float

Query timeout (Cassandra, MSSQL).

Returns:

Name Type Description
object

The database connection object.

None

If an error occurs while creating the connection, or if the database type passes validation but has no connection implementation yet.

Raises:

Type Description
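ValueError

For invalid database types or missing required parameters. These checks run before the connection attempt and are reported through the DBExceptions helper methods, which define the concrete exception type.

DatabaseConnectionError

Not propagated to the caller: errors raised while creating the connection are logged and the method returns None.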

Examples:

>>> db_conn = DatabaseConnection()
>>> conn = db_conn.create_db_connection(
...     'mssql', 'dbserver', database_name='my_db',
...     username='user', password='password'
... )
>>> db_conn.create_db_connection(
...     'oracle', 'dbhost', username='user', password='password',
...     port_number=1521, service_name='my_service'
... )
>>> db_conn.create_db_connection(
...     'hive', 'hiveserver', username='user',
...     pem_file='/path/to/key.pem'
... )
Source code in libs\cafex_db\src\cafex_db\database_handler.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def create_db_connection(self, database_type, server_name, **kwargs):
    """Creates a database connection based on the provided parameters.

    ``database_type`` is matched case-insensitively against
    ``SUPPORTED_DATABASE_TYPES``.

    Parameters:
        database_type (str): The type of database. Supported types:
            'mssql', 'mysql', 'oracle', 'hive', 'PostgreSQL',
            'cassandra', 'spark', 'ec2_hive', 'SQLite'. Only 'mssql',
            'mysql', 'oracle', 'hive', 'PostgreSQL' and 'cassandra'
            currently have a connection implementation; the others pass
            validation and then return None.
        server_name (str): The name or IP address of the database server.
        **kwargs: Additional keyword arguments specific to each database type.

    Keyword Arguments:
        database_name (str): The name of the database.
        username (str): The username for authentication.
        password (str): The password for authentication.
        port_number (int): The port number of the database server.
        is_password_encoded (bool): Whether the password is encoded.
                                    Defaults to False.
        pem_file (str): Path to the SSH key file (Hive).
        secret_key (str): Secret key for encoded passwords.
        sid (str): The Oracle SID.
        service_name (str): The Oracle service name.
        encoding (str): Encoding format (Oracle).
        control_timeout (int/float): Query timeout (Cassandra, MSSQL).

    Returns:
        object: The database connection object.
        None: If creating the connection fails, or if the type passes
            validation but has no connection implementation yet.

    Raises:
        Exception: Whatever the ``DBExceptions`` helpers raise when
            validation fails. This covers an unsupported or missing database
            type, a missing server name, a username without a password, an
            encoded password without a secret key, or hive/spark/ec2_hive
            without a password or pem file. Errors raised while creating the
            connection itself are logged and not propagated.

    Examples:
        >>> db_conn = DatabaseConnection()
        >>> conn = db_conn.create_db_connection(
        ...     'mssql', 'dbserver', database_name='my_db',
        ...     username='user', password='password'
        ... )
        >>> db_conn.create_db_connection(
        ...     'oracle', 'dbhost', username='user', password='password',
        ...     port_number=1521, service_name='my_service'
        ... )
        >>> db_conn.create_db_connection(
        ...     'hive', 'hiveserver', username='user',
        ...     pem_file='/path/to/key.pem'
        ... )
    """
    # Non-string input (e.g. None) is treated as an unsupported type
    # instead of crashing with AttributeError on .lower().
    database_type = database_type.lower() if isinstance(database_type, str) else None
    # SUPPORTED_DATABASE_TYPES mixes cases ("PostgreSQL", "SQLite"), so
    # compare against its lower-cased form.
    supported_types = {name.lower() for name in self.SUPPORTED_DATABASE_TYPES}

    if database_type not in supported_types:
        self.__obj_db_exception.raise_null_database_type()

    if server_name is None:
        self.__obj_db_exception.raise_null_server_name()

    if database_type in ("mssql", "mysql", "oracle", "postgresql", "cassandra"):
        if kwargs.get("username") is not None and kwargs.get("password") is None:
            self.__obj_db_exception.raise_null_password()

    # Encoded passwords need a secret key, except for the types below.
    # NOTE(review): presumably DBSecurity can decode those without one — confirm.
    if kwargs.get("is_password_encoded") and kwargs.get("secret_key") is None:
        if database_type not in ("mssql", "mysql", "oracle", "postgresql"):
            self.__obj_db_exception.raise_null_secret_key()

    if database_type in ("hive", "spark", "ec2_hive"):
        if kwargs.get("password") is None and kwargs.get("pem_file") is None:
            self.__obj_db_exception.raise_generic_exception(
                "For hive/Spark/ec2_hive, Password and pem file cannot be null"
            )

    try:
        connectors = {
            "mssql": self._create_mssql_connection,
            "mysql": self._create_mysql_connection,
            "oracle": self._create_oracle_connection,
            "hive": self._create_hive_connection,
            "postgresql": self._create_postgresql_connection,
            "cassandra": self._create_cassandra_connection,
        }
        connector = connectors.get(database_type)
        if connector is None:
            raise ValueError(f"Database type not yet implemented: {database_type}")
        return connector(server_name, **kwargs)

    except ValueError as ve:
        self.logger.error("ValueError creating %s connection: %s", database_type, ve)
        return None
    except (ConnectionError, TimeoutError) as e:
        self.logger.error("Connection error creating %s connection: %s", database_type, e)
        return None
    except Exception as e:  # pylint: disable=broad-except
        self.logger.error("Error creating %s connection: %s", database_type, e)
        return None