Skip to content

mongo_utils

MongoDBUtils

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class MongoDBUtils:
    """Helper around a MongoDB client for simple query operations.

    The client is created once in ``__init__`` through
    ``DBSecurity.establish_mongodb_connection`` and reused by every query method.
    When a query method fails, the client is closed before the error is re-raised.
    """

    def __init__(
        self, pstr_username=None, pstr_password=None, pstr_cluster_url=None, pstr_database_name=None
    ):
        """Initializes the MongoDBUtils class.

        Parameters:
        pstr_username (str): The username for MongoDB authentication.
        pstr_password (str): The password for MongoDB authentication.
        pstr_cluster_url (str): The MongoDB cluster URL.
        pstr_database_name (str): The name of the database to connect to.

        Raises:
        Exception: If an error occurs during the connection process.
        """
        self.__obj_db_decrytor = DBSecurity()
        self.logger = CoreLogger(name=__name__).get_logger()
        self.client = self.__mongo_connect(
            pstr_username, pstr_password, pstr_cluster_url, pstr_database_name
        )

    def __mongo_connect(self, pstr_username, pstr_password, pstr_cluster_url, pstr_database_name):
        """Establishes a connection to a MongoDB database.

        Parameters:
        pstr_username (str): The username for MongoDB authentication.
        pstr_password (str): The password for MongoDB authentication.
        pstr_cluster_url (str): The MongoDB cluster URL.
        pstr_database_name (str): The name of the database to connect to.

        Returns:
        MongoClient: The client returned by DBSecurity after a successful ping, or
        None when DBSecurity did not return a client.

        Raises:
        Exception: If an error occurs during the connection process.
        """
        try:
            self.client = self.__obj_db_decrytor.establish_mongodb_connection(
                pstr_username, pstr_password, pstr_cluster_url, pstr_database_name
            )
            if not self.client:
                # Keep the historical behaviour of returning None instead of raising,
                # but make the missing client visible in the logs.
                self.logger.warning(
                    "MongoDB connection was not established: no client was returned."
                )
                return None
            # Run a ping command so a connection problem is reported here, at
            # construction time, rather than on the first query.
            self.client.admin.command("ping")
            self.logger.info("MongoDB connection established successfully.")
            return self.client
        except Exception as e:
            self.logger.exception("Error occurred in mongo_connect: " + str(e))
            raise Exception(e) from e

    def _close_client_quietly(self):
        """Closes the client without letting a close failure escape.

        Used by the query error handlers so that a missing client or a failing
        ``close()`` never replaces the error that actually caused the query to fail.
        """
        client = getattr(self, "client", None)
        if client is None:
            return
        try:
            client.close()
        except Exception as close_error:
            self.logger.exception(
                "Error occurred while closing MongoDB client: " + str(close_error)
            )

    def mongo_execute(self, database_name, collection_name):
        """Retrieves the first document of a collection in a MongoDB database.

        Parameters:
        database_name (str): The name of the database to query.
        collection_name (str): The name of the collection (table) to query.

        Returns:
        dict: The first document retrieved from the collection, or None when the
        collection has no documents.

        Raises:
        Exception: If an error occurs during the query execution. The client is
        closed before the error is raised.
        """
        try:
            collection = self.client[database_name][collection_name]
            # find_one() yields the same first document as iterating find(), without
            # leaving a partially consumed cursor behind.
            return collection.find_one()
        except Exception as e:
            self._close_client_quietly()
            self.logger.exception("Error occurred in mongo_execute: " + str(e))
            raise Exception(e) from e

    def mongo_execute_parameters(self, database_name, collection_name, projection, query=None):
        """Executes a query to retrieve documents from a specified collection
        in a MongoDB database with a given projection.

        Parameters:
        database_name (str): The name of the database to query.
        collection_name (str): The name of the collection (table) to query.
        projection (dict): The fields to include or exclude in the returned documents.
        query (dict): The query to filter the documents. None matches every document.

        Returns:
        list: The documents retrieved from the collection.

        Raises:
        Exception: If an error occurs during the query execution. The client is
        closed before the error is raised.
        """
        try:
            collection = self.client[database_name][collection_name]
            filter_query = {} if query is None else query
            return list(collection.find(filter_query, projection))
        except Exception as e:
            self._close_client_quietly()
            self.logger.exception("Error occurred in mongo_execute_parameters: " + str(e))
            raise Exception(e) from e

    def get_mongo_records_based_on_aggregate_query(
        self, pstr_database_name, pstr_collection_name, aggregate_query, allowDiskUse=False
    ):
        """Retrieves the list of records produced by an aggregate query.

        Parameters:
        pstr_database_name (str): The name of the database to query.
        pstr_collection_name (str): The name of the collection (table) to query.
        aggregate_query: The aggregation pipeline, passed unchanged to the
        collection's aggregate call.
        allowDiskUse (bool): The flag to allow disk use for the query. By default, it is False.

        Returns:
        list: The list of documents produced by the aggregation.

        Raises:
        Exception: If an error occurs during the query execution. The client is
        closed before the error is raised.
        """
        try:
            collection = self.client[pstr_database_name][pstr_collection_name]
            # Only forward the option when it was requested, exactly as before.
            options = {"allowDiskUse": allowDiskUse} if allowDiskUse else {}
            return list(collection.aggregate(aggregate_query, **options))
        except Exception as e:
            self._close_client_quietly()
            self.logger.exception(
                f"Error occurred in get_mongo_records_based_on_aggregate_query: {e}"
            )
            raise Exception(e) from e

    def get_collection_count(self, database_name, collection_name):
        """Retrieves the count of documents in a specified collection in a
        MongoDB database.

        Parameters:
        database_name (str): The name of the database to query.
        collection_name (str): The name of the collection (table) to query.

        Returns:
        int: The count of documents in the specified collection.

        Raises:
        Exception: If an error occurs during the query execution. The client is
        closed before the error is raised.
        """
        try:
            return self.client[database_name][collection_name].count_documents({})
        except Exception as e:
            self._close_client_quietly()
            self.logger.exception("Error occurred in get_collection_count: " + str(e))
            raise Exception(e) from e

    def get_specific_document_from_collection(self, database_name, collection_name, query):
        """Retrieves a specific document from a collection in a MongoDB
        database.

        Parameters:
        database_name (str): The name of the database to query.
        collection_name (str): The name of the collection (table) to query.
        query (dict): The query to filter the documents.

        Returns:
        dict: The document that matches the query, as returned by find_one.

        Raises:
        Exception: If an error occurs during the query execution. The client is
        closed before the error is raised.
        """
        try:
            return self.client[database_name][collection_name].find_one(query)
        except Exception as e:
            self._close_client_quietly()
            self.logger.exception(
                "Error occurred in get_specific_document_from_collection: " + str(e)
            )
            raise Exception(e) from e

    def close_connection(self):
        """Closes the connection to the MongoDB database.

        Does nothing when no client is set.

        Returns:
        None

        Raises:
        Exception: If closing the client fails.
        """
        try:
            if self.client:
                self.client.close()
        except Exception as e:
            self.logger.exception("Error occurred in close_connection: " + str(e))
            raise Exception(e) from e

__init__(pstr_username=None, pstr_password=None, pstr_cluster_url=None, pstr_database_name=None)

Initializes the MongoDBUtils class.

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
 7
 8
 9
10
11
12
13
14
15
def __init__(
    self, pstr_username=None, pstr_password=None, pstr_cluster_url=None, pstr_database_name=None
):
    """Create the helper objects and open the MongoDB connection.

    Builds the DBSecurity helper and the logger, then stores the client returned
    by the private connect routine on ``self.client``.
    """
    self.__obj_db_decrytor = DBSecurity()
    self.logger = CoreLogger(name=__name__).get_logger()
    connection_args = (pstr_username, pstr_password, pstr_cluster_url, pstr_database_name)
    self.client = self.__mongo_connect(*connection_args)

__mongo_connect(pstr_username, pstr_password, pstr_cluster_url, pstr_database_name)

Establishes a connection to a MongoDB database.

Parameters: pstr_username (str): The username for MongoDB authentication. pstr_password (str): The password for MongoDB authentication. pstr_cluster_url (str): The MongoDB cluster URL. pstr_database_name (str): The name of the database to connect to.

Returns: MongoClient: A MongoClient instance connected to the specified database.

Raises: Exception: If an error occurs during the connection process.

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __mongo_connect(self, pstr_username, pstr_password, pstr_cluster_url, pstr_database_name):
    """Establishes a connection to a MongoDB database.

    Parameters:
    pstr_username (str): The username for MongoDB authentication.
    pstr_password (str): The password for MongoDB authentication.
    pstr_cluster_url (str): The MongoDB cluster URL.
    pstr_database_name (str): The name of the database to connect to.

    Returns:
    MongoClient: The client returned by DBSecurity after a successful ping, or
    None when DBSecurity did not return a client.

    Raises:
    Exception: If an error occurs during the connection process.
    """
    try:
        self.client = self.__obj_db_decrytor.establish_mongodb_connection(
            pstr_username, pstr_password, pstr_cluster_url, pstr_database_name
        )
        if not self.client:
            # Keep the historical behaviour of returning None instead of raising,
            # but make the missing client visible in the logs.
            self.logger.warning(
                "MongoDB connection was not established: no client was returned."
            )
            return None
        # Run a ping command so a connection problem is reported here rather
        # than on the first query.
        self.client.admin.command("ping")
        self.logger.info("MongoDB connection established successfully.")
        return self.client
    except Exception as e:
        self.logger.exception("Error occurred in mongo_connect: " + str(e))
        raise Exception(e) from e

close_connection()

Closes the connection to the MongoDB database.

Returns: None

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
178
179
180
181
182
183
184
185
186
187
188
189
def close_connection(self):
    """Closes the connection to the MongoDB database.

    Does nothing when no client is set.

    Returns:
    None

    Raises:
    Exception: If closing the client fails; the original error is attached as
    the cause.
    """
    try:
        if self.client:
            self.client.close()
    except Exception as e:
        self.logger.exception("Error occurred in close_connection: " + str(e))
        # Chain explicitly so the underlying failure stays available to callers.
        raise Exception(e) from e

get_collection_count(database_name, collection_name)

Retrieves the count of documents in a specified collection in a MongoDB database.

Parameters: database_name (str): The name of the database to query. collection_name (str): The name of the collection (table) to query.

Returns: int: The count of documents in the specified collection.

Raises: Exception: If an error occurs during the query execution.

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def get_collection_count(self, database_name, collection_name):
    """Retrieves the count of documents in a specified collection in a
    MongoDB database.

    Parameters:
    database_name (str): The name of the database to query.
    collection_name (str): The name of the collection (table) to query.

    Returns:
    int: The count of documents in the specified collection.

    Raises:
    Exception: If an error occurs during the query execution. The client is
    closed before the error is raised.
    """
    try:
        return self.client[database_name][collection_name].count_documents({})
    except Exception as e:
        # Close the client as before, but do not let a missing client or a failing
        # close() replace the error that actually caused the query to fail.
        client = getattr(self, "client", None)
        if client is not None:
            try:
                client.close()
            except Exception as close_error:
                self.logger.exception(
                    "Error occurred while closing MongoDB client: " + str(close_error)
                )
        self.logger.exception("Error occurred in get_collection_count: " + str(e))
        raise Exception(e) from e

get_mongo_records_based_on_aggregate_query(pstr_database_name, pstr_collection_name, aggregate_query, allowDiskUse=False)

Retrieves the list of records from a MongoDB view based on an aggregate query.

Parameters: pstr_database_name (str): The name of the database to query. pstr_collection_name (str): The name of the collection (table) to query. aggregate_query: The aggregation pipeline passed to the collection's aggregate call. allowDiskUse (bool): The flag to allow disk use for the query. By default, it is False.

Returns: list: The list of documents retrieved from the collection.

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def get_mongo_records_based_on_aggregate_query(
    self, pstr_database_name, pstr_collection_name, aggregate_query, allowDiskUse=False
):
    """Retrieves the list of records produced by an aggregate query.

    Parameters:
    pstr_database_name (str): The name of the database to query.
    pstr_collection_name (str): The name of the collection (table) to query.
    aggregate_query: The aggregation pipeline, passed unchanged to the
    collection's aggregate call.
    allowDiskUse (bool): The flag to allow disk use for the query. By default, it is False.

    Returns:
    list: The list of documents produced by the aggregation.

    Raises:
    Exception: If an error occurs during the query execution. The client is
    closed before the error is raised.
    """
    try:
        collection = self.client[pstr_database_name][pstr_collection_name]
        # Only forward the option when it was requested, exactly as before.
        options = {"allowDiskUse": allowDiskUse} if allowDiskUse else {}
        return list(collection.aggregate(aggregate_query, **options))
    except Exception as e:
        # Close the client as before, but do not let a missing client or a failing
        # close() replace the error that actually caused the query to fail.
        client = getattr(self, "client", None)
        if client is not None:
            try:
                client.close()
            except Exception as close_error:
                self.logger.exception(
                    "Error occurred while closing MongoDB client: " + str(close_error)
                )
        self.logger.exception(
            f"Error occurred in get_mongo_records_based_on_aggregate_query: {e}"
        )
        raise Exception(e) from e

get_specific_document_from_collection(database_name, collection_name, query)

Retrieves a specific document from a collection in a MongoDB database.

Parameters: database_name (str): The name of the database to query. collection_name (str): The name of the collection (table) to query. query (dict): The query to filter the documents.

Returns: dict: The document that matches the query.

Raises: Exception: If an error occurs during the query execution.

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def get_specific_document_from_collection(self, database_name, collection_name, query):
    """Retrieves a specific document from a collection in a MongoDB
    database.

    Parameters:
    database_name (str): The name of the database to query.
    collection_name (str): The name of the collection (table) to query.
    query (dict): The query to filter the documents.

    Returns:
    dict: The document that matches the query, as returned by find_one.

    Raises:
    Exception: If an error occurs during the query execution. The client is
    closed before the error is raised.
    """
    try:
        return self.client[database_name][collection_name].find_one(query)
    except Exception as e:
        # Close the client as before, but do not let a missing client or a failing
        # close() replace the error that actually caused the query to fail.
        client = getattr(self, "client", None)
        if client is not None:
            try:
                client.close()
            except Exception as close_error:
                self.logger.exception(
                    "Error occurred while closing MongoDB client: " + str(close_error)
                )
        self.logger.exception(
            "Error occurred in get_specific_document_from_collection: " + str(e)
        )
        raise Exception(e) from e

mongo_execute(database_name, collection_name)

Executes a query to retrieve documents from a specified collection in a MongoDB database.

Parameters: database_name (str): The name of the database to query. collection_name (str): The name of the collection (table) to query.

Returns: dict: The first document retrieved from the collection.

Raises: Exception: If an error occurs during the query execution.

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def mongo_execute(self, database_name, collection_name):
    """Retrieves the first document of a collection in a MongoDB database.

    Parameters:
    database_name (str): The name of the database to query.
    collection_name (str): The name of the collection (table) to query.

    Returns:
    dict: The first document retrieved from the collection, or None when the
    collection has no documents.

    Raises:
    Exception: If an error occurs during the query execution. The client is
    closed before the error is raised.
    """
    try:
        collection = self.client[database_name][collection_name]
        # find_one() yields the same first document as iterating find(), without
        # leaving a partially consumed cursor behind.
        return collection.find_one()
    except Exception as e:
        # Close the client as before, but do not let a missing client or a failing
        # close() replace the error that actually caused the query to fail.
        client = getattr(self, "client", None)
        if client is not None:
            try:
                client.close()
            except Exception as close_error:
                self.logger.exception(
                    "Error occurred while closing MongoDB client: " + str(close_error)
                )
        self.logger.exception("Error occurred in mongo_execute: " + str(e))
        raise Exception(e) from e

mongo_execute_parameters(database_name, collection_name, projection, query=None)

Executes a query to retrieve documents from a specified collection in a MongoDB database with a given projection.

Parameters: database_name (str): The name of the database to query. collection_name (str): The name of the collection (table) to query. projection (dict): The fields to include or exclude in the returned documents. query (dict): The query to filter the documents.

Returns: list: The documents retrieved from the collection.

Raises: Exception: If an error occurs during the query execution.

Source code in libs\cafex_db\src\cafex_db\mongo_utils.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def mongo_execute_parameters(self, database_name, collection_name, projection, query=None):
    """Executes a query to retrieve documents from a specified collection
    in a MongoDB database with a given projection.

    Parameters:
    database_name (str): The name of the database to query.
    collection_name (str): The name of the collection (table) to query.
    projection (dict): The fields to include or exclude in the returned documents.
    query (dict): The query to filter the documents. None matches every document.

    Returns:
    list: The documents retrieved from the collection.

    Raises:
    Exception: If an error occurs during the query execution. The client is
    closed before the error is raised.
    """
    try:
        collection = self.client[database_name][collection_name]
        filter_query = {} if query is None else query
        return list(collection.find(filter_query, projection))
    except Exception as e:
        # Close the client as before, but do not let a missing client or a failing
        # close() replace the error that actually caused the query to fail.
        client = getattr(self, "client", None)
        if client is not None:
            try:
                client.close()
            except Exception as close_error:
                self.logger.exception(
                    "Error occurred while closing MongoDB client: " + str(close_error)
                )
        self.logger.exception("Error occurred in mongo_execute_parameters: " + str(e))
        raise Exception(e) from e