Skip to content

database_operations

This module provides the DatabaseOperations class, which groups common operations for querying databases and working with their results.

DatabaseOperations

This class provides a collection of methods for performing database operations.

It includes functionality for executing SQL queries, handling result sets, managing database connections, and comparing data between different sources.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
class DatabaseOperations:
    """This class provides a collection of methods for performing database
    operations.

    It includes functionalities for executing SQL queries, handling
    result sets, managing database connections, and comparing data
    between different sources.
    """

    def __init__(self):
        """Create the collaborators the other methods of this class rely on.

        Three attributes are set: a ``DBSecurity`` instance, a logger obtained
        from ``CoreLogger`` under this module's name, and a ``DBExceptions``
        instance that the methods of this class delegate error reporting to.
        """
        # NOTE(review): the decrypter is not referenced by any method visible
        # in this part of the file - confirm it is used further down.
        self.__obj_db_decrypter = DBSecurity()
        self.logger_obj = CoreLogger(name=__name__).get_logger()
        # Every ``except`` block in this class forwards to this object.
        self.__obj_db_exception = DBExceptions()

    def execute_statement(
        self,
        object_connection: Any,
        sql_query: str,
        str_return_type: str = "list",
    ) -> Union[List[Dict[str, Any]], Any, bool]:
        """Run ``sql_query`` on a connection and shape the outcome.

        Args:
            object_connection: Object exposing ``execute(sql)``.
            sql_query: SQL text handed to ``execute`` unchanged.
            str_return_type: ``"list"`` (default) converts every row with
                ``dict(row)``; ``"resultset"`` returns the object produced by
                ``execute`` untouched; any other value returns whether the
                result's ``rowcount`` is greater than zero.

        Returns:
            A list of row dictionaries, the raw result object, or a boolean,
            depending on ``str_return_type``. Any exception is forwarded as
            text to the instance's exception handler.
        """
        try:
            raw_result = object_connection.execute(sql_query)
            if str_return_type == "resultset":
                return raw_result
            if str_return_type != "list":
                # Neither of the two named modes: report only whether rows
                # were counted.
                return raw_result.rowcount > 0
            row_dicts = []
            for record in raw_result:
                row_dicts.append(dict(record))
            return row_dicts
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def hive_execute_statement(
        self, object_hiveclient: Any, p_query: str, str_return_type: str = "list"
    ) -> Union[List[Any], Any, bool]:
        """Run a query through a Hive client's ``sql`` method.

        Args:
            object_hiveclient: Client exposing ``sql(query)``.
            p_query: Query text passed to ``sql`` unchanged.
            str_return_type: ``"list"`` (default) returns ``collect()`` of the
                result, ``"resultset"`` returns the result object itself, and
                any other value returns whether ``count()`` is positive.

        Returns:
            Collected rows, the raw result object, or a boolean. Any exception
            is forwarded as text to the instance's exception handler.
        """
        try:
            query_result = object_hiveclient.sql(p_query)
            if str_return_type == "resultset":
                return query_result
            if str_return_type == "list":
                return query_result.collect()
            has_rows = query_result.count() > 0
            return has_rows
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def execute_query_from_file(self, object_connection: Any, pstr_filepath: str) -> Any:
        """Read a whole file as UTF-8 text and execute it as one statement.

        Args:
            object_connection: Object exposing ``execute(sql)``.
            pstr_filepath: Path of the file holding the SQL text.

        Returns:
            Whatever ``object_connection.execute`` returns for the file's
            content. Any exception (including file errors) is forwarded as
            text to the instance's exception handler.
        """
        try:
            with open(pstr_filepath, mode="r", encoding="utf-8") as sql_file:
                statement_text = sql_file.read()
            # The file is closed before the statement is sent.
            outcome = object_connection.execute(statement_text)
            return outcome
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def close(self, object_connection: Any) -> None:
        """Call ``close()`` on the given connection object.

        Args:
            object_connection: Object exposing ``close()``.

        Any exception raised by ``close()`` is forwarded as text to the
        instance's exception handler.
        """
        try:
            closer = object_connection.close
            closer()
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def check_table_exists(
        self,
        object_connection: Any,
        str_database_name: str,
        str_table_name: str,
        **kwargs,
    ) -> bool:
        """Check if a table is listed in ``information_schema.tables``.

        Args:
            object_connection: Object exposing ``execute(sql)`` whose result
                supports ``fetchall()``.
            str_database_name: Value matched against ``table_schema``.
            str_table_name: Value matched against ``table_name``.
            kwargs: Accepted for call compatibility; not used by this method.

        Returns:
            True when the lookup returns at least one row, False otherwise.
            Any exception is forwarded as text to the instance's exception
            handler.
        """
        try:
            # NOTE(review): both names are interpolated straight into the SQL
            # text; callers must not pass untrusted values here.
            query = (
                "SELECT * FROM information_schema.tables "
                f"WHERE table_schema = '{str_database_name}' "
                f"AND table_name = '{str_table_name}'"
            )
            result = object_connection.execute(query)
            # Decide from the fetched rows rather than ``rowcount``: for SELECT
            # statements several drivers report -1 (or 0 until rows are read),
            # which made an existing table look absent.
            return bool(result.fetchall())
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(e))

    def cassandra_resultset_to_list(self, prs_cassandra_resultset: Any) -> List[Any]:
        """Materialise an iterable Cassandra result into a plain list.

        Args:
            prs_cassandra_resultset: Iterable result object.

        Returns:
            A new list holding every item yielded by the result, in order.
            Any exception is forwarded as text to the instance's exception
            handler.
        """
        try:
            collected: List[Any] = []
            collected.extend(prs_cassandra_resultset)
            return collected
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def is_rowcount_zero(self, plist_resultlist: List[Any]) -> bool:
        """Tell whether the result list has no entries.

        Args:
            plist_resultlist: Result list to measure.

        Returns:
            True when ``len(plist_resultlist)`` is zero, False otherwise. Any
            exception is forwarded as text to the instance's exception handler.
        """
        try:
            entry_count = len(plist_resultlist)
            return not entry_count
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def get_rowcount(self, plist_resultlist: List[Any]) -> int:
        """Return the number of entries in the result list.

        Args:
            plist_resultlist: Result list to measure.

        Returns:
            ``len(plist_resultlist)``. Any exception is forwarded as text to
            the instance's exception handler.
        """
        try:
            entry_count = len(plist_resultlist)
            return entry_count
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def get_headers(self, plist_resultlist: List[Dict[str, Any]]) -> List[str]:
        """Return the keys of the first row of the result list.

        Args:
            plist_resultlist: List of row dictionaries.

        Returns:
            The first row's keys in their stored order, or an empty list when
            the result list is empty. Any exception is forwarded as text to
            the instance's exception handler.
        """
        try:
            if not plist_resultlist:
                return []
            first_row = plist_resultlist[0]
            return [*first_row.keys()]
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def resultset_to_list(
        self, prs_resultset: Any, pbool_include_headers: bool = True
    ) -> List[Any]:
        """Turn an iterable result set into a list of row dictionaries.

        Args:
            prs_resultset: Iterable whose items can be passed to ``dict()``.
            pbool_include_headers: When true and at least one row exists, the
                list of the first row's keys is placed in front of the rows.

        Returns:
            The row dictionaries, optionally preceded by one header list. Any
            exception is forwarded as text to the instance's exception handler.
        """
        try:
            rows = list(map(dict, prs_resultset))
            if rows and pbool_include_headers:
                header_row = list(rows[0])
                rows[:0] = [header_row]
            return rows
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def check_value_exists_in_column(
        self, plist_resultlist: List[Dict[str, Any]], pstr_column_header: str, pstr_value: Any
    ) -> bool:
        """Tell whether any row holds ``pstr_value`` under the given column.

        Args:
            plist_resultlist: List of row dictionaries.
            pstr_column_header: Key looked up in every row.
            pstr_value: Value compared with ``==``.

        Returns:
            True as soon as one row matches, False when none does. Any
            exception (for example a missing key) is forwarded as text to the
            instance's exception handler.
        """
        try:
            for row in plist_resultlist:
                if row[pstr_column_header] == pstr_value:
                    return True
            return False
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def check_value_not_exists_in_column(
        self, plist_resultlist: List[Dict[str, Any]], pstr_column_header: str, pstr_value: Any
    ) -> bool:
        """Tell whether no row holds ``pstr_value`` under the given column.

        Args:
            plist_resultlist: List of row dictionaries.
            pstr_column_header: Key looked up in every row.
            pstr_value: Value compared with ``!=``.

        Returns:
            False as soon as one row fails the ``!=`` comparison, True when
            every row (or no row at all) passes it. Any exception is forwarded
            as text to the instance's exception handler.
        """
        try:
            for row in plist_resultlist:
                if row[pstr_column_header] != pstr_value:
                    continue
                return False
            return True
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def get_column_data(
        self, plist_resultlist: List[Dict[str, Any]], pstr_column_header: str
    ) -> List[Any]:
        """Collect one column's value from every row.

        Args:
            plist_resultlist: List of row dictionaries.
            pstr_column_header: Key read from each row.

        Returns:
            The values in row order. Any exception (for example a missing key)
            is forwarded as text to the instance's exception handler.
        """
        try:
            column_values: List[Any] = []
            for row in plist_resultlist:
                column_values.append(row[pstr_column_header])
            return column_values
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def compare_resultlists(
        self,
        plist_resultlist1: List[Dict[str, Any]],
        plist_resultlist2: List[Dict[str, Any]],
    ) -> Dict[str, List[Dict[str, Any]]]:
        """Compare two result lists row by row, ignoring duplicates.

        Two rows are considered the same when they hold the same key/value
        pairs, regardless of the order in which the keys were inserted.

        Args:
            plist_resultlist1: First list of row dictionaries.
            plist_resultlist2: Second list of row dictionaries.

        Returns:
            A dictionary with three lists of row dictionaries:
            ``"only_in_list1"``, ``"only_in_list2"`` and ``"in_both"``. Each
            distinct row appears once, in the order it was first seen in its
            source list (``"in_both"`` follows the first list). Any exception
            (for example unhashable row values) is forwarded as text to the
            instance's exception handler.
        """
        try:
            # Key every row by the *set* of its items so that key order does
            # not matter; dicts keep first-seen order, which makes the output
            # deterministic (a plain set iterates in hash order).
            distinct1: Dict[Any, Dict[str, Any]] = {}
            for row in plist_resultlist1:
                distinct1.setdefault(frozenset(row.items()), row)
            distinct2: Dict[Any, Dict[str, Any]] = {}
            for row in plist_resultlist2:
                distinct2.setdefault(frozenset(row.items()), row)

            return {
                "only_in_list1": [
                    dict(row) for key, row in distinct1.items() if key not in distinct2
                ],
                "only_in_list2": [
                    dict(row) for key, row in distinct2.items() if key not in distinct1
                ],
                "in_both": [dict(row) for key, row in distinct1.items() if key in distinct2],
            }
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(e))

    def data_frame_diff(
        self, pdf_dataframe_source: pd.DataFrame, pdf_dataframe_destinition: pd.DataFrame, **kwargs
    ) -> pd.DataFrame:
        """Return the rows that appear in exactly one of the two dataframes.

        Args:
            pdf_dataframe_source: Source dataframe.
            pdf_dataframe_destinition: Destination dataframe.
            kwargs: Accepted for call compatibility; not used by this method.

        Returns:
            A dataframe holding each differing row once, source rows first.
            Any exception is forwarded as text to the instance's exception
            handler.
        """
        try:
            # De-duplicate each side first. Without this, a row repeated inside
            # one frame and missing from the other would be removed by
            # ``keep=False`` below and the difference would go unreported.
            distinct_source = pdf_dataframe_source.drop_duplicates()
            distinct_destination = pdf_dataframe_destinition.drop_duplicates()
            # After de-duplication a row occurs twice only when both frames
            # contain it, so dropping every duplicated row leaves the
            # symmetric difference.
            diff = pd.concat([distinct_source, distinct_destination]).drop_duplicates(
                keep=False
            )
            return diff
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(e))

    def dataframe_to_resultset(self, pdf_dataframe: pd.DataFrame) -> List[Dict[str, Any]]:
        """Convert a dataframe into a list with one dictionary per row.

        Args:
            pdf_dataframe: Dataframe to convert.

        Returns:
            The result of ``to_dict(orient="records")``. Any exception is
            forwarded as text to the instance's exception handler.
        """
        try:
            records = pdf_dataframe.to_dict(orient="records")
            return records
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def file_to_resultlist(
        self, pstr_filepath: str, pstr_filetype: str = "csv", **kwargs
    ) -> List[Dict[str, Any]]:
        """Convert data in a file to a result list.

        Args:
            pstr_filepath: File path.
            pstr_filetype: File type, either 'csv', 'excel', or 'json'.
            kwargs: Additional arguments.

        Returns:
            Result list.
        """
        try:
            if pstr_filetype == "csv":
                return pd.read_csv(pstr_filepath, **kwargs).to_dict(orient="records")
            if pstr_filetype == "excel":
                return pd.read_excel(pstr_filepath, **kwargs).to_dict(orient="records")
            if pstr_filetype == "json":
                with open(pstr_filepath, "r", encoding="utf-8") as file:
                    return json.load(file)
            raise ValueError("Unsupported file type")
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(e))

    def resultlist_to_csv(self, plist_resultlist: List[Dict[str, Any]], pstr_filepath: str) -> None:
        """Save a result list as a CSV file without an index column.

        Args:
            plist_resultlist: List of row dictionaries.
            pstr_filepath: Destination path of the CSV file.

        Any exception is forwarded as text to the instance's exception handler.
        """
        try:
            frame = pd.DataFrame(plist_resultlist)
            frame.to_csv(pstr_filepath, index=False)
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def dataframe_to_csv(self, pdf_dataframe: pd.DataFrame, pstr_path: str) -> None:
        """Save a dataframe as a CSV file without an index column.

        Args:
            pdf_dataframe: Dataframe to write.
            pstr_path: Destination path of the CSV file.

        Any exception is forwarded as text to the instance's exception handler.
        """
        try:
            write_csv = pdf_dataframe.to_csv
            write_csv(pstr_path, index=False)
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def list_to_dataframe(self, plist_data: List[Any]) -> pd.DataFrame:
        """Build a dataframe from a list.

        Args:
            plist_data: Data passed directly to the ``pd.DataFrame``
                constructor.

        Returns:
            The constructed dataframe. Any exception is forwarded as text to
            the instance's exception handler.
        """
        try:
            frame = pd.DataFrame(plist_data)
            return frame
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def compare_dataframes_distinct(
        self, pdf_dataframe_source: pd.DataFrame, pdf_dataframe_target: pd.DataFrame, **kwargs
    ) -> pd.DataFrame:
        """Return the distinct rows that appear in exactly one dataframe.

        Args:
            pdf_dataframe_source: Source dataframe.
            pdf_dataframe_target: Target dataframe.
            kwargs: Accepted for call compatibility; not used by this method.

        Returns:
            A dataframe holding each differing row once, source rows first.
            Any exception is forwarded as text to the instance's exception
            handler.
        """
        try:
            # Reduce each side to its distinct rows first. Otherwise a row
            # repeated inside one frame and absent from the other is removed
            # by ``keep=False`` below and never reported as a difference.
            distinct_source = pdf_dataframe_source.drop_duplicates()
            distinct_target = pdf_dataframe_target.drop_duplicates()
            # A row now occurs twice only when both frames contain it.
            diff = pd.concat([distinct_source, distinct_target]).drop_duplicates(keep=False)
            return diff
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(e))

    def check_data_in_column(
        self, plist_resultlist: List[Dict[str, Any]], pdata_value: Any, **kwargs
    ) -> bool:
        """Tell whether a value occurs anywhere in the result list.

        Every value of every row is considered, not one particular column.

        Args:
            plist_resultlist: List of row dictionaries.
            pdata_value: Value searched for among each row's values.
            kwargs: Accepted for call compatibility; not used by this method.

        Returns:
            True as soon as one row contains the value, False otherwise.

        Raises:
            Exception: Whatever the iteration or lookup raises propagates to
                the caller unchanged.
        """
        for row in plist_resultlist:
            if pdata_value in row.values():
                return True
        return False

    def check_headers_in_resultlist(
        self, plist_resultlist: List[Dict[str, Any]], plist_headers: List[str], **kwargs
    ) -> List[str]:
        """Check if headers are present in the result list.

        Args:
            plist_resultlist: Result list.
            plist_headers: List of headers to check for.
            kwargs: Additional arguments.

        Returns:
            List of unmatched headers.
        """
        try:
            result_headers = set(self.get_headers(plist_resultlist))
            return [header for header in plist_headers if header not in result_headers]
        except Exception as e:  # pylint: disable=broad-exception-caught
            raise e

    def spark_execute_statement(
        self,
        pobject_sparkclient: Any,
        p_query: str,
        pbool_default_spark_command: bool,
        pstr_name: str,
        str_return_type: str = "list",
        **kwargs,
    ) -> Union[List[Any], Any, bool]:
        """Run a query through a Spark client's ``sql`` method.

        Args:
            pobject_sparkclient: Client exposing ``sql(query)``.
            p_query: Query text passed to ``sql`` unchanged.
            pbool_default_spark_command: Part of the signature; not read by
                this method.
            pstr_name: Part of the signature; not read by this method.
            str_return_type: ``"list"`` (default) returns ``collect()`` of the
                result, ``"resultset"`` returns the result object itself, and
                any other value returns whether ``count()`` is positive.
            kwargs: Accepted for call compatibility; not used by this method.

        Returns:
            Collected rows, the raw result object, or a boolean. Any exception
            is forwarded as text to the instance's exception handler.
        """
        try:
            query_result = pobject_sparkclient.sql(p_query)
            if str_return_type == "resultset":
                return query_result
            if str_return_type == "list":
                return query_result.collect()
            has_rows = query_result.count() > 0
            return has_rows
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(str(error))

    def check_db_exists(self, pobject_connection: Any, pstr_database_name: str) -> bool:
        """Check if a database exists in a particular DB server.

        Args:
            pobject_connection: Connection object provided by the user.
            pstr_database_name: DB name provided by the user.

        Returns:
            bool: True if database exists, False otherwise.

        Raises:
            DBExceptions: If connection object or database name is None.
            Exception: If the database type is not supported.
        """
        try:
            if pobject_connection is None:
                self.__obj_db_exception.raise_null_database_object()
            if pstr_database_name is None:
                self.__obj_db_exception.raise_null_database_name()

            if pobject_connection.engine.name == "postgresql":
                sql_query = (
                    f"SELECT * FROM pg_catalog.pg_database WHERE datname='{pstr_database_name}'"
                )
                rs_resultset = self.execute_statement(
                    pobject_connection, sql_query, str_return_type="resultset"
                )
                return rs_resultset.rowcount != 0

            if pobject_connection.engine.name == "mssql":
                sql_query = f"SELECT * FROM sys.databases WHERE name='{pstr_database_name}'"
                rs_resultset = self.execute_statement(
                    pobject_connection, sql_query, str_return_type="resultset"
                )
                return rs_resultset.rowcount != 0

            if pobject_connection.engine.name == "oracle":
                sql_query = (
                    f"SELECT owner FROM all_tab_columns WHERE Owner ='{pstr_database_name.upper()}'"
                )
                lst_result = self.execute_statement(
                    pobject_connection, sql_query, str_return_type="list"
                )
                return len(lst_result) >= 2

            raise self.__obj_db_exception.raise_generic_exception(
                message=f"Error -> Provided parameters are not supported",
                trim_log=True,
                fail_test=False,
            )

        except Exception as e:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(
                message=f"Error -> DB not connected properly: {str(e)}",
                trim_log=True,
                fail_test=False,
            )

    def verify_column_metadata(
        self,
        pobj_connection: Any,
        pstr_db_name: str,
        str_table_name: str,
        pstr_col_name: str,
        **kwargs,
    ) -> Dict[str, Any]:
        """Verify metadata for a column in a database table.

        Args:
            pobj_connection: Database connection object.
            pstr_db_name: Database name.
            str_table_name: Table name.
            pstr_col_name: Column name.
            kwargs: Additional arguments for checking existence, data type, constraints, etc.

        Returns:
            Dictionary containing metadata information.
        """
        try:
            dict_result: Dict[str, Any] = {}
            if pobj_connection.engine.name not in ("mssql", "oracle", "postgresql"):
                raise self.__obj_db_exception.raise_generic_exception(
                    message=f"Error -> Provided parameters are not supported",
                    trim_log=True,
                    fail_test=False,
                )

            if "bln_col_existence" in kwargs:
                dict_result["bln_col_existence"] = self._check_column_existence(
                    pobj_connection, pstr_db_name, str_table_name, pstr_col_name
                )

            if "pstr_data_type" in kwargs:
                dict_result["pstr_data_type"] = self._get_column_data_type(
                    pobj_connection, pstr_db_name, str_table_name, pstr_col_name
                )

            if "pstr_max_length" in kwargs:
                dict_result["pstr_max_length"] = self._get_column_max_length(
                    pobj_connection, pstr_db_name, str_table_name, pstr_col_name
                )

            if "bln_is_null" in kwargs:
                dict_result["bln_is_null"] = self._check_column_nullable(
                    pobj_connection, pstr_db_name, str_table_name, pstr_col_name
                )

            if "bln_is_not_null" in kwargs:
                dict_result["bln_is_not_null"] = not self._check_column_nullable(
                    pobj_connection, pstr_db_name, str_table_name, pstr_col_name
                )

            if "bln_is_primary_key" in kwargs:
                dict_result["bln_is_primary_key"] = self._check_column_primary_key(
                    pobj_connection, pstr_db_name, str_table_name, pstr_col_name
                )

            if "bln_is_not_primary_key" in kwargs:
                dict_result["bln_is_not_primary_key"] = not self._check_column_primary_key(
                    pobj_connection, pstr_db_name, str_table_name, pstr_col_name
                )

            return dict_result

        except Exception as e:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(
                message=f"Error -> Column information is not given properly: {str(e)}",
                trim_log=True,
                fail_test=False,
            )

    def _check_column_existence(
        self, pobj_connection: Any, pstr_db_name: str, str_table_name: str, pstr_col_name: str
    ) -> bool:
        """Helper method to check column existence."""
        if pobj_connection.engine.name == "oracle":
            lst_result = self.execute_statement(
                pobj_connection,
                f"SELECT * FROM all_tab_columns WHERE table_name = '{str_table_name.upper()}' "
                f"AND column_name ='{pstr_col_name.upper()}' AND owner='{pstr_db_name.upper()}'",
                str_return_type="list",
            )
            return len(lst_result) >= 2

        if pobj_connection.engine.name == "postgresql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{str_table_name}' "
                f"AND COLUMN_NAME = '{pstr_col_name}' AND table_schema='{pstr_db_name}'",
                str_return_type="resultset",
            )
            return rs_resultset.rowcount != 0

        if pobj_connection.engine.name == "mssql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"USE {pstr_db_name} SELECT * FROM INFORMATION_SCHEMA.COLUMNS "
                f"WHERE TABLE_NAME = '{str_table_name}' AND COLUMN_NAME = '{pstr_col_name}'",
                str_return_type="resultset",
            )
            return rs_resultset.rowcount != 0

        return False

    def _get_column_data_type(
        self, pobj_connection: Any, pstr_db_name: str, str_table_name: str, pstr_col_name: str
    ) -> Optional[str]:
        """Helper method to get column data type."""
        if pobj_connection.engine.name == "oracle":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT data_type FROM all_tab_columns WHERE table_name = '{str_table_name.upper()}' "
                f"AND column_name ='{pstr_col_name.upper()}' AND owner='{pstr_db_name.upper()}'",
                str_return_type="resultset",
            )
            pstr_datatype = rs_resultset.fetchall()
            return pstr_datatype[0][0] if pstr_datatype[0][0] else None

        if pobj_connection.engine.name == "postgresql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{str_table_name}' "
                f"AND COLUMN_NAME = '{pstr_col_name}' AND table_schema='{pstr_db_name}'",
                str_return_type="resultset",
            )
            pstr_datatype = rs_resultset.fetchall()
            return pstr_datatype[0][0] if pstr_datatype[0][0] else None

        if pobj_connection.engine.name == "mssql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"USE {pstr_db_name} SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
                f"WHERE TABLE_NAME = '{str_table_name}' AND COLUMN_NAME = '{pstr_col_name}'",
                str_return_type="resultset",
            )
            pstr_datatype = rs_resultset.fetchall()
            return pstr_datatype[0][0] if pstr_datatype[0][0] else None

        return None

    def _get_column_max_length(
        self, pobj_connection: Any, pstr_db_name: str, str_table_name: str, pstr_col_name: str
    ) -> Optional[int]:
        """Helper method to get column max length."""
        if pobj_connection.engine.name == "postgresql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT character_maximum_length FROM INFORMATION_SCHEMA.COLUMNS "
                f"WHERE TABLE_NAME = '{str_table_name}' AND COLUMN_NAME = '{pstr_col_name}' "
                f"AND table_schema='{pstr_db_name}'",
                str_return_type="resultset",
            )
            pstr_maxlength = rs_resultset.fetchall()
            return int(pstr_maxlength[0][0]) if pstr_maxlength[0][0] else None

        if pobj_connection.engine.name == "mssql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"USE {pstr_db_name} SELECT * FROM sys.columns WHERE name=N'{pstr_col_name}' "
                f"AND OBJECT_ID = OBJECT_ID(N'{str_table_name}')",
                str_return_type="resultset",
            )
            pstr_maxlength = rs_resultset.fetchall()
            return int(pstr_maxlength[0][5]) if pstr_maxlength[0][5] else None

        if pobj_connection.engine.name == "oracle":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT data_length FROM all_tab_columns WHERE table_name = '{str_table_name.upper()}' "
                f"AND column_name ='{pstr_col_name.upper()}' AND owner='{pstr_db_name.upper()}'",
                str_return_type="resultset",
            )
            pstr_maxlength = rs_resultset.fetchall()
            return int(pstr_maxlength[0][0]) if pstr_maxlength[0][0] else None

        return None

    def _check_column_nullable(
        self, pobj_connection: Any, pstr_db_name: str, str_table_name: str, pstr_col_name: str
    ) -> bool:
        """Helper method to check column nullable constraint."""
        if pobj_connection.engine.name == "oracle":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT nullable FROM all_tab_columns WHERE table_name = '{str_table_name.upper()}' "
                f"AND column_name ='{pstr_col_name.upper()}' AND owner='{pstr_db_name.upper()}'",
                str_return_type="resultset",
            )
            bln_isnull = rs_resultset.fetchall()
            return bln_isnull[0][0] == "Y"

        if pobj_connection.engine.name == "postgresql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT is_nullable FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{str_table_name}' "
                f"AND COLUMN_NAME = '{pstr_col_name}' AND table_schema='{pstr_db_name}'",
                str_return_type="resultset",
            )
            bln_isnull = rs_resultset.fetchall()
            return bln_isnull[0][0] == "YES"

        if pobj_connection.engine.name == "mssql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"USE {pstr_db_name} SELECT is_nullable FROM INFORMATION_SCHEMA.COLUMNS "
                f"WHERE TABLE_NAME = '{str_table_name}' AND COLUMN_NAME = '{pstr_col_name}'",
                str_return_type="resultset",
            )
            bln_isnull = rs_resultset.fetchall()
            return bln_isnull[0][0] == "YES"

        return False

    def _check_column_primary_key(
        self, pobj_connection: Any, pstr_db_name: str, str_table_name: str, pstr_col_name: str
    ) -> bool:
        """Helper method to check column primary key constraint."""
        if pobj_connection.engine.name == "oracle":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT cols.table_name, cols.column_name, cols.position, cons.status, cons.owner "
                f"FROM all_constraints cons, all_cons_columns cols WHERE cols.table_name ='{str_table_name.upper()}' "
                f"AND cols.column_name ='{pstr_col_name.upper()}' AND cons.constraint_type = 'P' "
                f"AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner "
                f"ORDER BY cols.table_name, cols.position",
                str_return_type="list",
            )
            return len(rs_resultset) >= 2

        if pobj_connection.engine.name == "postgresql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"SELECT K.TABLE_NAME, C.CONSTRAINT_TYPE, K.COLUMN_NAME, K.CONSTRAINT_NAME "
                f"FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS C JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS K "
                f"ON C.TABLE_NAME = K.TABLE_NAME AND C.CONSTRAINT_CATALOG = K.CONSTRAINT_CATALOG "
                f"AND C.CONSTRAINT_SCHEMA = K.CONSTRAINT_SCHEMA AND C.CONSTRAINT_NAME = K.CONSTRAINT_NAME "
                f"WHERE C.CONSTRAINT_TYPE = 'PRIMARY KEY' AND K.Table_Name='{str_table_name}' "
                f"AND K.Column_name='{pstr_col_name}' AND K.table_schema='{pstr_db_name}' "
                f"ORDER BY K.TABLE_NAME, C.CONSTRAINT_TYPE, K.CONSTRAINT_NAME",
                str_return_type="resultset",
            )
            return rs_resultset.rowcount != 0

        if pobj_connection.engine.name == "mssql":
            rs_resultset = self.execute_statement(
                pobj_connection,
                f"USE {pstr_db_name} SELECT K.TABLE_NAME, C.CONSTRAINT_TYPE, K.COLUMN_NAME, K.CONSTRAINT_NAME "
                f"FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS C JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS K "
                f"ON C.TABLE_NAME = K.TABLE_NAME AND C.CONSTRAINT_CATALOG = K.CONSTRAINT_CATALOG "
                f"AND C.CONSTRAINT_SCHEMA = K.CONSTRAINT_SCHEMA AND C.CONSTRAINT_NAME = K.CONSTRAINT_NAME "
                f"WHERE C.CONSTRAINT_TYPE = 'PRIMARY KEY' AND K.Table_Name='{str_table_name}' "
                f"AND K.Column_name='{pstr_col_name}' ORDER BY K.TABLE_NAME, C.CONSTRAINT_TYPE, K.CONSTRAINT_NAME",
                str_return_type="resultset",
            )
            return rs_resultset.rowcount != 0

        return False

    def modify_sql_query(
        self, pfile_path: str, pdict_data_dictionary: Dict[str, str], **kwargs
    ) -> str:
        """Load a query template from a file and substitute its placeholders.

        ``.sql`` and ``.txt`` files are used whole; for ``.json`` files the
        query is the value stored under the key given as ``pstr_key``. Every
        key of the dictionary must occur in the query and is replaced by its
        value. Any failure is handed to the generic exception handler.

        Args:
            pfile_path: Path of the .sql, .txt or .json file holding the query.
            pdict_data_dictionary: Mapping in the format
                {'unique_identifiers1': 'actual_value1'}.
            kwargs: Additional arguments; ``pstr_key`` is required for JSON files.

        Returns:
            The query with unique identifiers replaced by actual values.
        """
        try:
            if pfile_path is None:
                self.__obj_db_exception.raise_null_filepath()
            if os.stat(pfile_path).st_size == 0:
                self.__obj_db_exception.raise_null_query()
            if pdict_data_dictionary is None:
                raise Exception("Dictionary passed is None. There is nothing to replace")

            is_plain_text = pfile_path.endswith((".sql", ".txt"))
            lookup_key = None
            if not is_plain_text:
                # Reject unusable input before touching the file contents.
                if not pfile_path.endswith(".json"):
                    raise Exception("Accepted file formats are .sql, .txt & .json")
                if "pstr_key" not in kwargs:
                    raise Exception("Error-->Key missing from parameters for json file")
                lookup_key = str(kwargs.get("pstr_key"))

            with open(pfile_path, "r", encoding="utf-8") as handle:
                raw_text = handle.read()

            if is_plain_text:
                query_text = raw_text
            else:
                try:
                    query_text = json.loads(raw_text)[lookup_key]
                except Exception as err:
                    raise Exception(f"Not a valid json file or invalid key-->{str(err)}") from err

            for placeholder, replacement in pdict_data_dictionary.items():
                if placeholder not in query_text:
                    raise Exception(f"Unique Identifier-->{placeholder} not found in the query")
                query_text = query_text.replace(placeholder, replacement)

            return query_text

        except Exception as e:  # pylint: disable=broad-exception-caught
            self.__obj_db_exception.raise_generic_exception(
                message=f"Exception occurred in method--> modify_sql_query-->: {str(e)}",
                trim_log=True,
                fail_test=False,
            )

__init__()

Initializes the DatabaseOperations class.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
25
26
27
28
29
def __init__(self):
    """Initializes the DatabaseOperations class.

    Creates the three collaborators kept on the instance: a ``DBSecurity``
    object, a logger obtained from ``CoreLogger`` under this module's name,
    and a ``DBExceptions`` object.
    """
    # NOTE(review): presumably used to decrypt stored credentials - confirm.
    self.__obj_db_decrypter = DBSecurity()
    # Public logger, named after this module.
    self.logger_obj = CoreLogger(name=__name__).get_logger()
    # Error-reporting helper the other methods in this class call on failure.
    self.__obj_db_exception = DBExceptions()

cassandra_resultset_to_list(prs_cassandra_resultset)

Convert a Cassandra resultset to a list.

Parameters:

Name Type Description Default
prs_cassandra_resultset Any

Cassandra resultset object.

required

Returns:

Type Description
List[Any]

List representation of the resultset.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
136
137
138
139
140
141
142
143
144
145
146
147
148
def cassandra_resultset_to_list(self, prs_cassandra_resultset: Any) -> List[Any]:
    """Materialise a Cassandra resultset as a plain list.

    Args:
        prs_cassandra_resultset: Cassandra resultset object (any iterable of rows).

    Returns:
        A list holding every item the resultset yields, in iteration order.
        Failures while iterating are passed to the generic exception handler.
    """
    try:
        materialised_rows = list(prs_cassandra_resultset)
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))
    else:
        return materialised_rows

check_data_in_column(plist_resultlist, pdata_value, **kwargs)

Check if a value is present in a column of the result list.

Parameters:

Name Type Description Default
plist_resultlist List[Dict[str, Any]]

Result list.

required
pdata_value Any

Value to check for.

required
kwargs

Additional arguments.

{}

Returns:

Type Description
bool

Boolean indicating if the value is present.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def check_data_in_column(
    self, plist_resultlist: List[Dict[str, Any]], pdata_value: Any, **kwargs
) -> bool:
    """Tell whether a value occurs anywhere in the result list.

    Every row's values are searched, whatever column they sit under; the
    scan stops at the first row that contains the value.

    Args:
        plist_resultlist: Result list (one dictionary per row).
        pdata_value: Value to look for.
        kwargs: Additional arguments (not used by this method).

    Returns:
        True if any row holds the value, False otherwise. Errors raised while
        scanning propagate unchanged to the caller.
    """
    for row in plist_resultlist:
        if pdata_value in row.values():
            return True
    return False

check_db_exists(pobject_connection, pstr_database_name)

Check if a database exists in a particular DB server.

Parameters:

Name Type Description Default
pobject_connection Any

Connection object provided by the user.

required
pstr_database_name str

DB name provided by the user.

required

Returns:

Name Type Description
bool bool

True if database exists, False otherwise.

Raises:

Type Description
DBExceptions

If connection object or database name is None.

Exception

If the database type is not supported.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def check_db_exists(self, pobject_connection: Any, pstr_database_name: str) -> bool:
    """Check if a database exists in a particular DB server.

    PostgreSQL is checked through ``pg_catalog.pg_database``, SQL Server
    through ``sys.databases`` and Oracle through the owners listed in
    ``all_tab_columns``.

    Args:
        pobject_connection: Connection object provided by the user;
            ``engine.name`` selects the dialect.
        pstr_database_name: DB name provided by the user.

    Returns:
        bool: True if database exists, False otherwise. False is also returned
        when a problem was reported and the exception helper did not raise.

    Raises:
        DBExceptions: If connection object or database name is None.
        Exception: If the database type is not supported (whatever the
            exception helper raises for the reported message).
    """
    try:
        if pobject_connection is None:
            self.__obj_db_exception.raise_null_database_object()
        if pstr_database_name is None:
            self.__obj_db_exception.raise_null_database_name()

        engine_name = pobject_connection.engine.name

        if engine_name == "postgresql":
            sql_query = (
                f"SELECT * FROM pg_catalog.pg_database WHERE datname='{pstr_database_name}'"
            )
            rs_resultset = self.execute_statement(
                pobject_connection, sql_query, str_return_type="resultset"
            )
            # rowcount is not dependable after a SELECT (drivers may report
            # -1, which made "!= 0" always true), so look for a real row.
            return rs_resultset.fetchone() is not None

        if engine_name == "mssql":
            sql_query = f"SELECT * FROM sys.databases WHERE name='{pstr_database_name}'"
            rs_resultset = self.execute_statement(
                pobject_connection, sql_query, str_return_type="resultset"
            )
            return rs_resultset.fetchone() is not None

        if engine_name == "oracle":
            sql_query = (
                f"SELECT owner FROM all_tab_columns WHERE Owner ='{pstr_database_name.upper()}'"
            )
            lst_result = self.execute_statement(
                pobject_connection, sql_query, str_return_type="list"
            )
            # NOTE(review): the >= 2 threshold presumably allows for a header
            # entry in the list form of the result - confirm against execute_statement.
            return len(lst_result) >= 2

    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(
            message=f"Error -> DB not connected properly: {str(e)}",
            trim_log=True,
            fail_test=False,
        )
        return False

    # Unsupported engine: report it once, with its own message. The old code
    # did "raise <helper return value>", which turned into a TypeError that was
    # then misreported as a connection problem.
    self.__obj_db_exception.raise_generic_exception(
        message="Error -> Provided parameters are not supported",
        trim_log=True,
        fail_test=False,
    )
    return False

check_headers_in_resultlist(plist_resultlist, plist_headers, **kwargs)

Check if headers are present in the result list.

Parameters:

Name Type Description Default
plist_resultlist List[Dict[str, Any]]

Result list.

required
plist_headers List[str]

List of headers to check for.

required
kwargs

Additional arguments.

{}

Returns:

Type Description
List[str]

List of unmatched headers.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def check_headers_in_resultlist(
    self, plist_resultlist: List[Dict[str, Any]], plist_headers: List[str], **kwargs
) -> List[str]:
    """List the requested headers that the result list does not have.

    The available headers are obtained from ``get_headers``.

    Args:
        plist_resultlist: Result list.
        plist_headers: List of headers to check for.
        kwargs: Additional arguments (not used by this method).

    Returns:
        The entries of ``plist_headers`` that are missing from the result
        list, in their original order; empty when all are present. Errors
        propagate unchanged to the caller.
    """
    known_headers = set(self.get_headers(plist_resultlist))
    missing_headers = []
    for header in plist_headers:
        if header not in known_headers:
            missing_headers.append(header)
    return missing_headers

check_table_exists(object_connection, str_database_name, str_table_name, **kwargs)

Check if a table exists in the database.

Parameters:

Name Type Description Default
object_connection Any

Database connection object.

required
str_database_name str

Database name.

required
str_table_name str

Table name.

required
kwargs

Additional arguments.

{}

Returns:

Type Description
bool

Boolean indicating if the table exists.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def check_table_exists(
    self,
    object_connection: Any,
    str_database_name: str,
    str_table_name: str,
    **kwargs,
) -> bool:
    """Check if a table exists in the database.

    Looks for a row in ``information_schema.tables`` whose ``table_schema``
    and ``table_name`` match the given names.

    Args:
        object_connection: Database connection object exposing ``execute``.
        str_database_name: Database name (matched against ``table_schema``).
        str_table_name: Table name.
        kwargs: Additional arguments (not used by this method).

    Returns:
        Boolean indicating if the table exists. Failures are passed to the
        generic exception handler.
    """
    try:
        query = (
            "SELECT * FROM information_schema.tables "
            f"WHERE table_schema = '{str_database_name}' "
            f"AND table_name = '{str_table_name}'"
        )
        result = object_connection.execute(query)
        # rowcount is not dependable after a SELECT (drivers may report -1,
        # which made the old "> 0" test always false), so look for a real row.
        return result.fetchone() is not None
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

check_value_exists_in_column(plist_resultlist, pstr_column_header, pstr_value)

Check if a value exists in a specific column of the result list.

Parameters:

Name Type Description Default
plist_resultlist List[Dict[str, Any]]

Result list.

required
pstr_column_header str

Column header to check.

required
pstr_value Any

Value to check for.

required

Returns:

Type Description
bool

Boolean indicating if the value exists in the column.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def check_value_exists_in_column(
    self, plist_resultlist: List[Dict[str, Any]], pstr_column_header: str, pstr_value: Any
) -> bool:
    """Tell whether at least one row holds a value under a given column.

    Args:
        plist_resultlist: Result list (one dictionary per row).
        pstr_column_header: Column header to check.
        pstr_value: Value to look for.

    Returns:
        True as soon as a row's entry for the column equals the value, False
        if no row matches. Failures (for example a row without that column)
        are passed to the generic exception handler.
    """
    try:
        for row in plist_resultlist:
            if row[pstr_column_header] == pstr_value:
                return True
        return False
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

check_value_not_exists_in_column(plist_resultlist, pstr_column_header, pstr_value)

Check if a value does not exist in a specific column of the result list.

Parameters:

Name Type Description Default
plist_resultlist List[Dict[str, Any]]

Result list.

required
pstr_column_header str

Column header to check.

required
pstr_value Any

Value to check for.

required

Returns:

Type Description
bool

Boolean indicating if the value does not exist in the column.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def check_value_not_exists_in_column(
    self, plist_resultlist: List[Dict[str, Any]], pstr_column_header: str, pstr_value: Any
) -> bool:
    """Tell whether a value is absent from a given column in every row.

    Args:
        plist_resultlist: Result list (one dictionary per row).
        pstr_column_header: Column header to check.
        pstr_value: Value that must not appear.

    Returns:
        True if every row's entry for the column differs from the value
        (also for an empty list), False at the first row where it does not.
        Failures are passed to the generic exception handler.
    """
    try:
        for row in plist_resultlist:
            if not (row[pstr_column_header] != pstr_value):
                return False
        return True
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

close(object_connection)

Close the database connection.

Parameters:

Name Type Description Default
object_connection Any

Database connection object.

required
Source code in libs\cafex_db\src\cafex_db\database_operations.py
 96
 97
 98
 99
100
101
102
103
104
105
def close(self, object_connection: Any) -> None:
    """Release a database connection by calling its ``close`` method.

    Args:
        object_connection: Database connection object.

    Failures while closing are passed to the generic exception handler.
    """
    try:
        release_connection = object_connection.close
        release_connection()
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

compare_dataframes_distinct(pdf_dataframe_source, pdf_dataframe_target, **kwargs)

Compare two dataframes and return distinct differences.

Parameters:

Name Type Description Default
pdf_dataframe_source DataFrame

Source dataframe.

required
pdf_dataframe_target DataFrame

Target dataframe.

required
kwargs

Additional arguments.

{}

Returns:

Type Description
DataFrame

Dataframe with distinct differences.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def compare_dataframes_distinct(
    self, pdf_dataframe_source: pd.DataFrame, pdf_dataframe_target: pd.DataFrame, **kwargs
) -> pd.DataFrame:
    """Return the rows that occur exactly once across two dataframes.

    The frames are stacked (source first) and every row that has a duplicate
    anywhere in the stacked frame is dropped, all copies included.

    Args:
        pdf_dataframe_source: Source dataframe.
        pdf_dataframe_target: Target dataframe.
        kwargs: Additional arguments (not used by this method).

    Returns:
        Dataframe with the non-duplicated rows, keeping their original index
        labels. Failures are passed to the generic exception handler.
    """
    try:
        stacked = pd.concat([pdf_dataframe_source, pdf_dataframe_target])
        return stacked.drop_duplicates(keep=False)
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

compare_resultlists(plist_resultlist1, plist_resultlist2)

Compare two result lists.

Parameters:

Name Type Description Default
plist_resultlist1 List[Dict[str, Any]]

First result list.

required
plist_resultlist2 List[Dict[str, Any]]

Second result list.

required
kwargs

Additional arguments.

required

Returns:

Type Description
Dict[str, List[Dict[str, Any]]]

Comparison result.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def compare_resultlists(
    self,
    plist_resultlist1: List[Dict[str, Any]],
    plist_resultlist2: List[Dict[str, Any]],
) -> Dict[str, List[Dict[str, Any]]]:
    """Compare two result lists row by row.

    Rows are matched on their full set of (column, value) pairs, so two rows
    with the same content are equal whatever order their keys were inserted
    in. A row repeated within one list is reported once, and each output list
    follows the order in which rows first appear in its input. Row values
    must be hashable.

    Args:
        plist_resultlist1: First result list.
        plist_resultlist2: Second result list.

    Returns:
        Dictionary with three lists of rows: "only_in_list1",
        "only_in_list2" and "in_both" (rows taken from the first list).
        Failures are passed to the generic exception handler.
    """
    try:

        def index_rows(rows):
            """Map each row's order-independent fingerprint to its first occurrence."""
            indexed = {}
            for row in rows:
                indexed.setdefault(frozenset(row.items()), row)
            return indexed

        first_rows = index_rows(plist_resultlist1)
        second_rows = index_rows(plist_resultlist2)
        return {
            "only_in_list1": [
                dict(row) for key, row in first_rows.items() if key not in second_rows
            ],
            "only_in_list2": [
                dict(row) for key, row in second_rows.items() if key not in first_rows
            ],
            "in_both": [dict(row) for key, row in first_rows.items() if key in second_rows],
        }
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

data_frame_diff(pdf_dataframe_source, pdf_dataframe_destinition, **kwargs)

Compare two dataframes and return the differences.

Parameters:

Name Type Description Default
pdf_dataframe_source DataFrame

Source dataframe.

required
pdf_dataframe_destinition DataFrame

Destination dataframe.

required
kwargs

Additional arguments.

{}

Returns:

Type Description
DataFrame

Dataframe with differences.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def data_frame_diff(
    self, pdf_dataframe_source: pd.DataFrame, pdf_dataframe_destinition: pd.DataFrame, **kwargs
) -> pd.DataFrame:
    """Return the rows that are not duplicated across two dataframes.

    Both frames are concatenated (source first); any row that appears more
    than once in the combined frame is removed together with its copies.

    Args:
        pdf_dataframe_source: Source dataframe.
        pdf_dataframe_destinition: Destination dataframe.
        kwargs: Additional arguments (not used by this method).

    Returns:
        Dataframe with the differing rows. Failures are passed to the generic
        exception handler.
    """
    try:
        frames_to_compare = [pdf_dataframe_source, pdf_dataframe_destinition]
        combined = pd.concat(frames_to_compare)
        differing_rows = combined.drop_duplicates(keep=False)
        return differing_rows
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

dataframe_to_csv(pdf_dataframe, pstr_path)

Write a dataframe to a CSV file.

Parameters:

Name Type Description Default
pdf_dataframe DataFrame

Dataframe to write.

required
pstr_path str

File path to save the CSV.

required
Source code in libs\cafex_db\src\cafex_db\database_operations.py
364
365
366
367
368
369
370
371
372
373
374
def dataframe_to_csv(self, pdf_dataframe: pd.DataFrame, pstr_path: str) -> None:
    """Save a dataframe as a CSV file without its index column.

    Args:
        pdf_dataframe: Dataframe to write.
        pstr_path: File path to save the CSV.

    Failures while writing are passed to the generic exception handler.
    """
    csv_options = {"index": False}
    try:
        pdf_dataframe.to_csv(pstr_path, **csv_options)
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

dataframe_to_resultset(pdf_dataframe)

Convert a dataframe to a resultset.

Parameters:

Name Type Description Default
pdf_dataframe DataFrame

Dataframe to convert.

required

Returns:

Type Description
List[Dict[str, Any]]

List of row dictionaries, one per dataframe row.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
313
314
315
316
317
318
319
320
321
322
323
324
325
def dataframe_to_resultset(self, pdf_dataframe: pd.DataFrame) -> List[Dict[str, Any]]:
    """Turn a dataframe into a list of row dictionaries.

    Args:
        pdf_dataframe: Dataframe to convert.

    Returns:
        One dictionary per dataframe row, mapping column name to value
        (pandas' "records" orientation). Failures are passed to the generic
        exception handler.
    """
    try:
        row_records = pdf_dataframe.to_dict(orient="records")
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))
    else:
        return row_records

execute_query_from_file(object_connection, pstr_filepath)

Execute a SQL query from a file.

Parameters:

Name Type Description Default
object_connection Any

Database connection object.

required
pstr_filepath str

File path of the SQL file.

required

Returns:

Type Description
Any

ResultSet

Source code in libs\cafex_db\src\cafex_db\database_operations.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def execute_query_from_file(self, object_connection: Any, pstr_filepath: str) -> Any:
    """Read a SQL query from a file and run it on the connection.

    The whole file (read as UTF-8) is sent as one statement.

    Args:
        object_connection: Database connection object exposing ``execute``.
        pstr_filepath: File path of the SQL file.

    Returns:
        Whatever the connection's ``execute`` returns. Failures while reading
        or executing are passed to the generic exception handler.
    """
    try:
        with open(pstr_filepath, "r", encoding="utf-8") as sql_file:
            statement_text = sql_file.read()
        execution_result = object_connection.execute(statement_text)
        return execution_result
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

execute_statement(object_connection, sql_query, str_return_type='list')

Execute a SQL statement.

Parameters:

Name Type Description Default
object_connection Any

Database connection object.

required
sql_query str

SQL query to execute.

required
str_return_type str

Return type, either 'list' or 'resultset'.

'list'

Returns:

Type Description
Union[List[Dict[str, Any]], Any, bool]

List or ResultSet or Boolean

Source code in libs\cafex_db\src\cafex_db\database_operations.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def execute_statement(
    self,
    object_connection: Any,
    sql_query: str,
    str_return_type: str = "list",
) -> Union[List[Dict[str, Any]], Any, bool]:
    """Run a SQL statement and shape its outcome.

    Args:
        object_connection: Database connection object exposing ``execute``.
        sql_query: SQL query to execute.
        str_return_type: 'list' for a list of row dictionaries, 'resultset'
            for the raw result object; any other value yields a boolean.

    Returns:
        The rows as dictionaries, the raw result, or whether the result's
        ``rowcount`` is greater than zero, depending on ``str_return_type``.
        Failures are passed to the generic exception handler.
    """
    try:
        raw_result = object_connection.execute(sql_query)
        if str_return_type == "resultset":
            return raw_result
        if str_return_type == "list":
            rows_as_dicts = []
            for row in raw_result:
                rows_as_dicts.append(dict(row))
            return rows_as_dicts
        affected = raw_result.rowcount
        return affected > 0
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

file_to_resultlist(pstr_filepath, pstr_filetype='csv', **kwargs)

Convert data in a file to a result list.

Parameters:

Name Type Description Default
pstr_filepath str

File path.

required
pstr_filetype str

File type, either 'csv', 'excel', or 'json'.

'csv'
kwargs

Additional arguments.

{}

Returns:

Type Description
List[Dict[str, Any]]

Result list.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def file_to_resultlist(
    self, pstr_filepath: str, pstr_filetype: str = "csv", **kwargs
) -> List[Dict[str, Any]]:
    """Load a data file and return its content as a result list.

    CSV and Excel files are read with pandas (``kwargs`` are forwarded to
    the reader) and converted to one dictionary per row; JSON files are
    parsed and returned as they are.

    Args:
        pstr_filepath: File path.
        pstr_filetype: File type, either 'csv', 'excel', or 'json'.
        kwargs: Additional arguments for the pandas reader (ignored for JSON).

    Returns:
        Result list. An unsupported file type, like any other failure, is
        passed to the generic exception handler.
    """
    try:
        if pstr_filetype == "json":
            with open(pstr_filepath, "r", encoding="utf-8") as json_file:
                parsed_content = json.load(json_file)
            return parsed_content
        if pstr_filetype == "csv":
            table = pd.read_csv(pstr_filepath, **kwargs)
        elif pstr_filetype == "excel":
            table = pd.read_excel(pstr_filepath, **kwargs)
        else:
            raise ValueError("Unsupported file type")
        return table.to_dict(orient="records")
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

get_column_data(plist_resultlist, pstr_column_header)

Get data from a specific column in the result list.

Parameters:

Name Type Description Default
plist_resultlist List[Dict[str, Any]]

Result list.

required
pstr_column_header str

Column header to get data from.

required

Returns:

Type Description
List[Any]

List of data from the column.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def get_column_data(
    self, plist_resultlist: List[Dict[str, Any]], pstr_column_header: str
) -> List[Any]:
    """Collect the values stored under one column across all rows.

    Args:
        plist_resultlist: Result list (one dictionary per row).
        pstr_column_header: Column header to get data from.

    Returns:
        The column's values in row order. Failures (for example a row without
        that column) are passed to the generic exception handler.
    """
    try:
        column_values = []
        for row in plist_resultlist:
            column_values.append(row[pstr_column_header])
        return column_values
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

get_headers(plist_resultlist)

Get the headers from the result list.

Parameters:

Name Type Description Default
plist_resultlist List[Dict[str, Any]]

Result list.

required

Returns:

Type Description
List[str]

List of headers.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
178
179
180
181
182
183
184
185
186
187
188
189
190
def get_headers(self, plist_resultlist: List[Dict[str, Any]]) -> List[str]:
    """Read the column headers off the first row of a result list.

    Args:
        plist_resultlist: Result list (one dictionary per row).

    Returns:
        The keys of the first row, or an empty list when the result list is
        empty. Failures are passed to the generic exception handler.
    """
    try:
        if not plist_resultlist:
            return []
        first_row = plist_resultlist[0]
        return list(first_row.keys())
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

get_rowcount(plist_resultlist)

Get the row count from the result list.

Parameters:

Name Type Description Default
plist_resultlist List[Any]

Result list containing column headers and row values.

required

Returns:

Type Description
int

Row count.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
164
165
166
167
168
169
170
171
172
173
174
175
176
def get_rowcount(self, plist_resultlist: List[Any]) -> int:
    """Count the entries of a result list.

    Args:
        plist_resultlist: Result list containing column headers and row values.

    Returns:
        The length of the list. Failures are passed to the generic exception
        handler.
    """
    try:
        row_total = len(plist_resultlist)
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))
    else:
        return row_total

hive_execute_statement(object_hiveclient, p_query, str_return_type='list')

Execute a HIVE database query statement using spark-sql.

Parameters:

Name Type Description Default
object_hiveclient Any

Hive client object.

required
p_query str

Query to execute.

required
str_return_type str

Return type, either 'list' or 'resultset'

'list'

Returns: List or ResultSet or Boolean

Source code in libs\cafex_db\src\cafex_db\database_operations.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def hive_execute_statement(
    self, object_hiveclient: Any, p_query: str, str_return_type: str = "list"
) -> Union[List[Any], Any, bool]:
    """Run a HIVE query through the client's ``sql`` method.

    Args:
        object_hiveclient: Hive client object exposing ``sql``.
        p_query: Query to execute.
        str_return_type: 'list' for the collected rows, 'resultset' for the
            raw result object; any other value yields a boolean.

    Returns:
        The result of ``collect()``, the raw result, or whether ``count()``
        is greater than zero, depending on ``str_return_type``. Failures are
        passed to the generic exception handler.
    """
    try:
        query_result = object_hiveclient.sql(p_query)
        if str_return_type == "resultset":
            return query_result
        if str_return_type == "list":
            collected_rows = query_result.collect()
            return collected_rows
        row_total = query_result.count()
        return row_total > 0
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

is_rowcount_zero(plist_resultlist)

Check if the row count in the result list is zero.

Parameters:

Name Type Description Default
plist_resultlist List[Any]

Result list containing column headers and row values.

required

Returns:

Type Description
bool

Boolean indicating if the row count is zero.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
150
151
152
153
154
155
156
157
158
159
160
161
162
def is_rowcount_zero(self, plist_resultlist: List[Any]) -> bool:
    """Tell whether a result list has no entries.

    Args:
        plist_resultlist: Result list containing column headers and row values.

    Returns:
        True when the list's length is zero, False otherwise. Failures are
        passed to the generic exception handler.
    """
    try:
        entry_total = len(plist_resultlist)
        return entry_total < 1
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

list_to_dataframe(plist_data)

Convert a list to a dataframe.

Parameters:

Name Type Description Default
plist_data List[Any]

List to convert.

required

Returns:

Type Description
DataFrame

Dataframe

Source code in libs\cafex_db\src\cafex_db\database_operations.py
376
377
378
379
380
381
382
383
384
385
386
387
388
def list_to_dataframe(self, plist_data: List[Any]) -> pd.DataFrame:
    """Build a pandas DataFrame from a list.

    Args:
        plist_data: Data handed directly to the ``pd.DataFrame`` constructor.

    Returns:
        The DataFrame constructed from ``plist_data``.

    Any exception raised during construction is passed, as text, to the
    class's generic exception handler.
    """
    try:
        frame = pd.DataFrame(plist_data)
        return frame
    except Exception as err:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(err))

modify_sql_query(pfile_path, pdict_data_dictionary, **kwargs)

Modify SQL queries residing inside .sql, .txt, or .json files at runtime.

Parameters:

Name Type Description Default
pfile_path str

Path of the .sql, .txt, or .json file that holds the query; the file is read, not modified.

required
pdict_data_dictionary Dict[str, str]

Dictionary in the format {'unique_identifiers1': 'actual_value1'}.

required
kwargs

Additional arguments (e.g., pstr_key for JSON files).

{}

Returns:

Type Description
str

The query with unique identifiers replaced by actual values.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
def modify_sql_query(
    self, pfile_path: str, pdict_data_dictionary: Dict[str, str], **kwargs
) -> str:
    """Load a query from a .sql, .txt, or .json file and fill in placeholders.

    The file is only read; it is never written back. For ``.sql``/``.txt``
    files the whole file content is the query. For ``.json`` files the query
    is the value stored under the key given by ``pstr_key``.

    Args:
        pfile_path: Path of the file holding the query. The extension is
            matched case-insensitively.
        pdict_data_dictionary: Mapping in the format
            {'unique_identifiers1': 'actual_value1'}. Every key must occur in
            the query; values are converted with ``str()`` before being
            substituted, so numeric values are accepted.
        kwargs: Additional arguments; ``pstr_key`` is required for JSON files.

    Returns:
        The query with unique identifiers replaced by actual values.

    Any failure (missing path, empty file, unsupported extension, invalid
    JSON or key, identifier not present in the query) is reported through
    the class's generic exception handler with ``fail_test=False``.
    """
    try:
        if pfile_path is None:
            self.__obj_db_exception.raise_null_filepath()
        if os.stat(pfile_path).st_size == 0:
            self.__obj_db_exception.raise_null_query()
        if pdict_data_dictionary is None:
            raise Exception("Dictionary passed is None. There is nothing to replace")

        # Compare extensions in lower case so e.g. "QUERY.SQL" is accepted.
        lowered_path = pfile_path.lower()

        if lowered_path.endswith((".sql", ".txt")):
            with open(pfile_path, "r", encoding="utf-8") as f:
                str_query = f.read()

        elif lowered_path.endswith(".json"):
            if "pstr_key" not in kwargs:
                raise Exception("Error-->Key missing from parameters for json file")

            json_key = str(kwargs.get("pstr_key"))
            with open(pfile_path, "r", encoding="utf-8") as f:
                str_json = f.read()

            try:
                pstr_json = json.loads(str_json)
                str_query = pstr_json[json_key]
            except Exception as e:
                raise Exception(f"Not a valid json file or invalid key-->{str(e)}") from e

        else:
            raise Exception("Accepted file formats are .sql, .txt & .json")

        for key, value in pdict_data_dictionary.items():
            if key not in str_query:
                raise Exception(f"Unique Identifier-->{key} not found in the query")
            # str() lets callers pass ints, floats, etc. as replacement values;
            # str.replace itself only accepts strings.
            str_query = str_query.replace(key, str(value))

        return str_query

    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(
            message=f"Exception occurred in method--> modify_sql_query-->: {str(e)}",
            trim_log=True,
            fail_test=False,
        )

resultlist_to_csv(plist_resultlist, pstr_filepath)

Write a result list to a CSV file.

Parameters:

Name Type Description Default
plist_resultlist List[Dict[str, Any]]

Result list.

required
pstr_filepath str

File path to save the CSV.

required
Source code in libs\cafex_db\src\cafex_db\database_operations.py
352
353
354
355
356
357
358
359
360
361
362
def resultlist_to_csv(self, plist_resultlist: List[Dict[str, Any]], pstr_filepath: str) -> None:
    """Dump a result list into a CSV file via pandas.

    Args:
        plist_resultlist: Result list to load into a DataFrame.
        pstr_filepath: Destination path of the CSV file.

    The DataFrame index is not written to the file. Any exception raised
    while building or writing the frame is passed, as text, to the class's
    generic exception handler.
    """
    try:
        frame = pd.DataFrame(plist_resultlist)
        frame.to_csv(pstr_filepath, index=False)
    except Exception as err:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(err))

resultset_to_list(prs_resultset, pbool_include_headers=True)

Convert a resultset to a list.

Parameters:

Name Type Description Default
prs_resultset Any

ResultSet object.

required
pbool_include_headers bool

Whether to include headers in the list.

True

Returns:

Type Description
List[Any]

List representation of the resultset.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def resultset_to_list(
    self, prs_resultset: Any, pbool_include_headers: bool = True
) -> List[Any]:
    """Convert a resultset to a list.

    Each row is turned into a dict. Rows that expose a ``_mapping``
    attribute (as SQLAlchemy 1.4+/2.x ``Row`` objects do) are converted
    through that mapping view, because such rows are tuple-like and
    ``dict(row)`` fails on them; any other row is passed to ``dict()``
    directly, as before.

    Args:
        prs_resultset: Iterable of rows.
        pbool_include_headers: When True and at least one row exists, the
            list of column names (keys of the first row) is inserted as the
            first element of the returned list.

    Returns:
        List of row dicts, optionally preceded by the header list. An empty
        resultset yields an empty list.

    Any exception raised during conversion is passed, as text, to the
    class's generic exception handler.
    """
    try:
        # getattr fallback keeps plain mappings / legacy rows working unchanged.
        result_list = [dict(getattr(row, "_mapping", row)) for row in prs_resultset]
        if pbool_include_headers and result_list:
            result_list.insert(0, list(result_list[0].keys()))
        return result_list
    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(e))

spark_execute_statement(pobject_sparkclient, p_query, pbool_default_spark_command, pstr_name, str_return_type='list', **kwargs)

Execute a query statement through the supplied spark-sql client.

Parameters:

Name Type Description Default
pobject_sparkclient Any

Spark-Sql Database Connection Object.

required
p_query str

Query to execute.

required
pbool_default_spark_command bool

If True, default spark command is used.

required
pstr_name str

Spark config name.

required
str_return_type str

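Return type: 'list' returns the collected rows and 'resultset' returns the raw result; any other value returns a boolean indicating whether the result has rows.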

'list'
kwargs

Additional arguments.

{}

Returns:

Type Description
Union[List[Any], Any, bool]

List or ResultSet or Boolean.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def spark_execute_statement(
    self,
    pobject_sparkclient: Any,
    p_query: str,
    pbool_default_spark_command: bool,
    pstr_name: str,
    str_return_type: str = "list",
    **kwargs,
) -> Union[List[Any], Any, bool]:
    """Run a query through a spark-sql client and shape the outcome.

    Args:
        pobject_sparkclient: Client object exposing ``sql(query)``.
        p_query: Query text handed to the client.
        pbool_default_spark_command: Accepted but not read by this method's body.
        pstr_name: Accepted but not read by this method's body.
        str_return_type: ``"list"`` returns ``collect()`` of the result,
            ``"resultset"`` returns the result object untouched; any other
            value returns whether ``count()`` of the result is above zero.
        kwargs: Accepted but not read by this method's body.

    Returns:
        The collected rows, the raw result object, or a boolean, depending
        on ``str_return_type``.

    Any exception raised while running or shaping the query is passed, as
    text, to the class's generic exception handler.
    """
    try:
        spark_result = pobject_sparkclient.sql(p_query)
        if str_return_type == "list":
            outcome = spark_result.collect()
        elif str_return_type == "resultset":
            outcome = spark_result
        else:
            # Fallback mode: only report whether the query produced rows.
            outcome = spark_result.count() > 0
        return outcome
    except Exception as err:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(str(err))

verify_column_metadata(pobj_connection, pstr_db_name, str_table_name, pstr_col_name, **kwargs)

Verify metadata for a column in a database table.

Parameters:

Name Type Description Default
pobj_connection Any

Database connection object.

required
pstr_db_name str

Database name.

required
str_table_name str

Table name.

required
pstr_col_name str

Column name.

required
kwargs

Additional arguments for checking existence, data type, constraints, etc.

{}

Returns:

Type Description
Dict[str, Any]

Dictionary containing metadata information.

Source code in libs\cafex_db\src\cafex_db\database_operations.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
def verify_column_metadata(
    self,
    pobj_connection: Any,
    pstr_db_name: str,
    str_table_name: str,
    pstr_col_name: str,
    **kwargs,
) -> Dict[str, Any]:
    """Verify metadata for a column in a database table.

    Only the *presence* of a keyword in ``kwargs`` selects a check; the
    keyword's value is not read here. Recognised keywords, evaluated in
    this order:

    * ``bln_col_existence`` - result of ``_check_column_existence``
    * ``pstr_data_type`` - result of ``_get_column_data_type``
    * ``pstr_max_length`` - result of ``_get_column_max_length``
    * ``bln_is_null`` - result of ``_check_column_nullable``
    * ``bln_is_not_null`` - negation of ``_check_column_nullable``
    * ``bln_is_primary_key`` - result of ``_check_column_primary_key``
    * ``bln_is_not_primary_key`` - negation of ``_check_column_primary_key``

    Args:
        pobj_connection: Database connection object; ``engine.name`` must be
            one of ``mssql``, ``oracle`` or ``postgresql``.
        pstr_db_name: Database name.
        str_table_name: Table name.
        pstr_col_name: Column name.
        kwargs: Keywords selecting which checks to run (see above).

    Returns:
        Dictionary keyed by the requested keywords with the check results.

    Any failure, including an unsupported engine, is reported once through
    the class's generic exception handler with ``fail_test=False``.
    """
    try:
        dict_result: Dict[str, Any] = {}
        engine_name = pobj_connection.engine.name
        if engine_name not in ("mssql", "oracle", "postgresql"):
            # Raise a real exception: raising the handler's return value
            # turned into "exceptions must derive from BaseException" when
            # the handler returned None, and reported the problem twice.
            raise ValueError(
                "Provided parameters are not supported: database engine "
                f"'{engine_name}' is not one of mssql, oracle, postgresql"
            )

        # Every helper takes the same four positional arguments.
        column_args = (pobj_connection, pstr_db_name, str_table_name, pstr_col_name)

        if "bln_col_existence" in kwargs:
            dict_result["bln_col_existence"] = self._check_column_existence(*column_args)

        if "pstr_data_type" in kwargs:
            dict_result["pstr_data_type"] = self._get_column_data_type(*column_args)

        if "pstr_max_length" in kwargs:
            dict_result["pstr_max_length"] = self._get_column_max_length(*column_args)

        if "bln_is_null" in kwargs:
            dict_result["bln_is_null"] = self._check_column_nullable(*column_args)

        if "bln_is_not_null" in kwargs:
            dict_result["bln_is_not_null"] = not self._check_column_nullable(*column_args)

        if "bln_is_primary_key" in kwargs:
            dict_result["bln_is_primary_key"] = self._check_column_primary_key(*column_args)

        if "bln_is_not_primary_key" in kwargs:
            dict_result["bln_is_not_primary_key"] = not self._check_column_primary_key(
                *column_args
            )

        return dict_result

    except Exception as e:  # pylint: disable=broad-exception-caught
        self.__obj_db_exception.raise_generic_exception(
            message=f"Error -> Column information is not given properly: {str(e)}",
            trim_log=True,
            fail_test=False,
        )