| # Copyright (c) 2023 The Regents of the University of California |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer; |
| # redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution; |
| # neither the name of the copyright holders nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| import json |
| from bson import json_util |
| from api.client import Client |
| from pymongo.errors import ConnectionFailure, ConfigurationError |
| from pymongo import MongoClient |
| from typing import Dict, List |
| import pymongo |
| |
| |
| class DatabaseConnectionError(Exception): |
| "Raised for failure to connect to MongoDB client" |
| pass |
| |
| |
| class MongoDBClient(Client): |
| def __init__(self, mongo_uri, database_name, collection_name): |
| super().__init__() |
| self.mongo_uri = mongo_uri |
| self.collection_name = collection_name |
| self.database_name = database_name |
| self.collection = self._get_database( |
| mongo_uri, database_name, collection_name |
| ) |
| |
| def _get_database( |
| self, |
| mongo_uri: str, |
| database_name: str, |
| collection_name: str, |
| ) -> pymongo.collection.Collection: |
| """ |
| This function returns a MongoDB database object for the specified |
| collection. |
| It takes three arguments: 'mongo_uri', 'database_name', and |
| 'collection_name'. |
| |
| :param: mongo_uri: URI of the MongoDB instance |
| :param: database_name: Name of the database |
| :param: collection_name: Name of the collection |
| :return: database: MongoDB database object |
| """ |
| |
| try: |
| client = MongoClient(mongo_uri) |
| client.admin.command("ping") |
| except ConnectionFailure: |
| client.close() |
| raise DatabaseConnectionError( |
| "Could not connect to MongoClient with given URI!" |
| ) |
| except ConfigurationError as e: |
| raise DatabaseConnectionError(e) |
| |
| database = client[database_name] |
| if database.name not in client.list_database_names(): |
| raise DatabaseConnectionError("Database Does not Exist!") |
| |
| collection = database[collection_name] |
| if collection.name not in database.list_collection_names(): |
| raise DatabaseConnectionError("Collection Does not Exist!") |
| |
| return collection |
| |
| def find_resource(self, query: Dict) -> Dict: |
| """ |
| Find a resource in the database |
| |
| :param query: JSON object with id and resource_version |
| :return: json_resource: JSON object with request resource or |
| error message |
| """ |
| if "resource_version" not in query or query["resource_version"] == "": |
| resource = ( |
| self.collection.find({"id": query["id"]}, {"_id": 0}) |
| .sort("resource_version", -1) |
| .limit(1) |
| ) |
| else: |
| resource = ( |
| self.collection.find( |
| { |
| "id": query["id"], |
| "resource_version": query["resource_version"], |
| }, |
| {"_id": 0}, |
| ) |
| .sort("resource_version", -1) |
| .limit(1) |
| ) |
| json_resource = json_util.dumps(resource) |
| res = json.loads(json_resource) |
| if res == []: |
| return {"exists": False} |
| return res[0] |
| |
| def update_resource(self, query: Dict) -> Dict[str, str]: |
| """ |
| This function updates a resource in the database by first checking if |
| the resource version in the request matches the resource version |
| stored in the database. |
| If they match, the resource is updated in the database. If they do not |
| match, the update is rejected. |
| |
| :param: query: JSON object with original_resource and the |
| updated resource |
| :return: json_response: JSON object with status message |
| """ |
| original_resource = query["original_resource"] |
| modified_resource = query["resource"] |
| try: |
| self.collection.replace_one( |
| { |
| "id": original_resource["id"], |
| "resource_version": original_resource["resource_version"], |
| }, |
| modified_resource, |
| ) |
| except Exception as e: |
| print(e) |
| return {"status": "Resource does not exist"} |
| return {"status": "Updated"} |
| |
| def get_versions(self, query: Dict) -> List[Dict]: |
| """ |
| This function retrieves all versions of a resource with the given ID |
| from the database. |
| It takes two arguments, the database object and a JSON object |
| containing the 'id' key of the resource to be retrieved. |
| |
| :param: query: JSON object with id |
| :return: json_resource: JSON object with all resource versions |
| """ |
| versions = self.collection.find( |
| {"id": query["id"]}, {"resource_version": 1, "_id": 0} |
| ).sort("resource_version", -1) |
| # convert to json |
| res = json_util.dumps(versions) |
| return json_util.loads(res) |
| |
| def delete_resource(self, query: Dict) -> Dict[str, str]: |
| """ |
| This function deletes a resource from the database by first checking |
| if the resource version in the request matches the resource version |
| stored in the database. |
| If they match, the resource is deleted from the database. If they do |
| not match, the delete operation is rejected |
| |
| :param: query: JSON object with id and resource_version |
| :return: json_response: JSON object with status message |
| """ |
| self.collection.delete_one( |
| {"id": query["id"], "resource_version": query["resource_version"]} |
| ) |
| return {"status": "Deleted"} |
| |
| def insert_resource(self, query: Dict) -> Dict[str, str]: |
| """ |
| This function inserts a new resource into the database using the |
| 'insert_one' method of the MongoDB client. |
| The function takes two arguments, the database object and the JSON |
| object representing the new resource to be inserted. |
| |
| :param: json: JSON object representing the new resource to be inserted |
| :return: json_response: JSON object with status message |
| """ |
| try: |
| self.collection.insert_one(query) |
| except Exception as e: |
| return {"status": "Resource already exists"} |
| return {"status": "Inserted"} |
| |
| def check_resource_exists(self, query: Dict) -> Dict: |
| """ |
| This function checks if a resource exists in the database by searching |
| for a resource with a matching 'id' and 'resource_version' in |
| the database. |
| The function takes two arguments, the database object and a JSON object |
| containing the 'id' and 'resource_version' keys. |
| |
| :param: json: JSON object with id and resource_version |
| :return: json_response: JSON object with boolean 'exists' key |
| """ |
| resource = ( |
| self.collection.find( |
| { |
| "id": query["id"], |
| "resource_version": query["resource_version"], |
| }, |
| {"_id": 0}, |
| ) |
| .sort("resource_version", -1) |
| .limit(1) |
| ) |
| json_resource = json_util.dumps(resource) |
| res = json.loads(json_resource) |
| if res == []: |
| return {"exists": False} |
| return {"exists": True} |
| |
| def save_session(self) -> Dict: |
| """ |
| This function saves the client session to a dictionary. |
| :return: A dictionary containing the client session. |
| """ |
| session = { |
| "client": "mongodb", |
| "uri": self.mongo_uri, |
| "database": self.database_name, |
| "collection": self.collection_name, |
| } |
| return session |