# blob: 6714c9c209b088b37e3737ae89599c8ead073443 [file] [log] [blame]
# Copyright (c) 2019-2021 The Regents of the University of California
# All Rights Reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""This file defines the ArtifactDB type and some common implementations of
ArtifactDB.
The database interface defined here does not include any schema information.
The database "schema" is defined in the artifact.py file based on the types of
artifacts stored in the database.
Some common queries can be found in common_queries.py
"""
from abc import ABC, abstractmethod
import copy
import json
import os
from pathlib import Path
import shutil
from typing import Any, Dict, Iterable, Union, Type, List, Tuple
from urllib.parse import urlparse
from uuid import UUID
try:
import gridfs # type: ignore
from pymongo import MongoClient # type: ignore
MONGO_SUPPORT = True
except ModuleNotFoundError:
# If pymongo isn't installed, then disable support for it
MONGO_SUPPORT = False
class ArtifactDB(ABC):
    """
    Interface that every artifact database backend must implement.

    The six core operations below are abstract. The search helpers are
    optional: a backend that does not override them raises
    NotImplementedError when they are called.
    """

    @abstractmethod
    def __init__(self, uri: str) -> None:
        """Set up the database from the given URI."""

    @abstractmethod
    def put(self, key: UUID, artifact: Dict[str, Union[str, UUID]]) -> None:
        """Store the artifact in the database under the given key."""

    @abstractmethod
    def upload(self, key: UUID, path: Path) -> None:
        """Store the file found at path in the database with _id of key."""

    @abstractmethod
    def __contains__(self, key: Union[UUID, str]) -> bool:
        """Return True if the item is in the DB. The key may be either a
        UUID or a string."""

    @abstractmethod
    def get(self, key: Union[UUID, str]) -> Dict[str, str]:
        """Return a dictionary from which an artifact can be constructed.
        The key may be either a UUID or a string.
        """

    @abstractmethod
    def downloadFile(self, key: UUID, path: Path) -> None:
        """Fetch the file whose _id is key and write it to path. An
        existing file at path is overwritten."""

    def searchByName(self, name: str, limit: int) -> Iterable[Dict[str, Any]]:
        """Return an iterable over the artifacts in the database matching
        some name. Note: this function is optional and not every DB
        implementation provides it."""
        raise NotImplementedError

    def searchByType(self, typ: str, limit: int) -> Iterable[Dict[str, Any]]:
        """Return an iterable over the artifacts in the database matching
        some type. Note: this function is optional and not every DB
        implementation provides it."""
        raise NotImplementedError

    def searchByNameType(
        self, name: str, typ: str, limit: int
    ) -> Iterable[Dict[str, Any]]:
        """Return an iterable over the artifacts in the database matching
        some name and type. Note: this function is optional and not every
        DB implementation provides it."""
        raise NotImplementedError

    def searchByLikeNameType(
        self, name: str, typ: str, limit: int
    ) -> Iterable[Dict[str, Any]]:
        """Return an iterable over the artifacts in the database matching
        some type and a regex name. Note: this function is optional and not
        every DB implementation provides it."""
        raise NotImplementedError
class ArtifactMongoDB(ArtifactDB):
    """
    MongoDB connector used to store Artifacts (as defined in artifact.py).

    The data lives in three collections:
      - artifacts: holds the json serialized Artifact class
      - files and chunks: together these hold the large files that some
        artifacts require. Within the files collection, the _id is the
        UUID of the artifact.
    """

    def __init__(self, uri: str) -> None:
        """Create the mongodb client and keep handles to the collections.

        uri is the location of the database in a mongodb compatible form.
        http://dochub.mongodb.org/core/connections.
        """
        # Note: "connect=False" is needed so that no connection is made
        # until the first time we interact with the database. Required for
        # the gem5 running celery server
        client = MongoClient(host=uri, connect=False)
        self.db = client.artifact_database
        self.artifacts = self.db.artifacts
        self.fs = gridfs.GridFSBucket(self.db, disable_md5=True)

    def put(self, key: UUID, artifact: Dict[str, Union[str, UUID]]) -> None:
        """Insert the artifact document; its "_id" must equal key."""
        assert artifact["_id"] == key
        self.artifacts.insert_one(artifact)

    def upload(self, key: UUID, path: Path) -> None:
        """Stream the file at path into the file store with _id of key."""
        with open(path, "rb") as stream:
            self.fs.upload_from_stream_with_id(key, str(path), stream)

    def __contains__(self, key: Union[UUID, str]) -> bool:
        """Return True if the item is in the DB. A UUID key is matched
        against "_id"; any other key is treated as a hash."""
        field = "_id" if isinstance(key, UUID) else "hash"
        matches = self.artifacts.count_documents({field: key}, limit=1)
        return bool(matches > 0)

    def get(self, key: Union[UUID, str]) -> Dict[str, str]:
        """Return a dictionary from which an artifact can be constructed.
        A UUID key is matched against "_id"; any other key is treated as
        a hash.
        """
        field = "_id" if isinstance(key, UUID) else "hash"
        return self.artifacts.find_one({field: key}, limit=1)

    def downloadFile(self, key: UUID, path: Path) -> None:
        """Write the stored file with the _id key to path. The destination
        is opened in "wb" mode, so an existing file is overwritten."""
        with open(path, "wb") as stream:
            self.fs.download_to_stream(key, stream)

    def searchByName(self, name: str, limit: int) -> Iterable[Dict[str, Any]]:
        """Yield the artifacts in the database that match some name."""
        yield from self.artifacts.find({"name": name}, limit=limit)

    def searchByType(self, typ: str, limit: int) -> Iterable[Dict[str, Any]]:
        """Yield the artifacts in the database that match some type."""
        yield from self.artifacts.find({"type": typ}, limit=limit)

    def searchByNameType(
        self, name: str, typ: str, limit: int
    ) -> Iterable[Dict[str, Any]]:
        """Yield the artifacts in the database that match some name and
        type."""
        query = {"type": typ, "name": name}
        yield from self.artifacts.find(query, limit=limit)

    def searchByLikeNameType(
        self, name: str, typ: str, limit: int
    ) -> Iterable[Dict[str, Any]]:
        """Yield the artifacts in the database that match some type and a
        regex name."""
        query = {"type": typ, "name": {"$regex": f"{name}"}}
        yield from self.artifacts.find(query, limit=limit)
class ArtifactFileDB(ArtifactDB):
    """
    This is a file-based database where Artifacts (as defined in artifact.py)
    are stored in a JSON file.

    This database stores a list of serialized artifacts in a JSON file.
    This database is not thread-safe.

    If the user specifies a valid path in the environment variable
    GEM5ART_STORAGE then this database will copy all artifacts to that
    directory named with their UUIDs.
    """

    class ArtifactEncoder(json.JSONEncoder):
        """JSON encoder that writes UUID objects as their string form."""

        def default(self, obj):
            """Return a JSON-serializable stand-in for obj.

            UUIDs become strings. Anything else is handed to the base
            encoder, which raises TypeError for unsupported types.
            """
            if isinstance(obj, UUID):
                return str(obj)
            return super().default(obj)

    # Location of the JSON file backing this database.
    _json_file: Path
    # Maps str(uuid) -> serialized artifact.
    _uuid_artifact_map: Dict[str, Dict[str, str]]
    # Maps hash -> list of str(uuid) of the artifacts with that hash.
    _hash_uuid_map: Dict[str, List[str]]
    # True when GEM5ART_STORAGE is set to a non-empty value.
    _storage_enabled: bool
    # Directory that upload()/downloadFile() copy files to/from.
    _storage_path: Path

    def __init__(self, uri: str) -> None:
        """Initialize the file-driven database from a JSON file.

        If the file doesn't exist, the database starts out empty and the
        file is written on the first successful insert.

        Raises Exception if GEM5ART_STORAGE names an existing path that is
        not a directory.
        """
        parsed_uri = urlparse(uri)
        # using urlparse to parse relative/absolute file path
        # abs path: urlparse("file:///path/to/file") ->
        #           (netloc='', path='/path/to/file')
        # rel path: urlparse("file://path/to/file") ->
        #           (netloc='path', path='/to/file')
        # so, the filepath would be netloc+path for both cases
        self._json_file = Path(parsed_uri.netloc) / Path(parsed_uri.path)

        storage_path = os.environ.get("GEM5ART_STORAGE", "")
        self._storage_enabled = bool(storage_path)
        self._storage_path = Path(storage_path)
        if (
            self._storage_enabled
            and self._storage_path.exists()
            and not self._storage_path.is_dir()
        ):
            raise Exception(
                f"GEM5ART_STORAGE={storage_path} exists and is not a directory"
            )
        if self._storage_enabled:
            os.makedirs(self._storage_path, exist_ok=True)

        self._uuid_artifact_map, self._hash_uuid_map = self._load_from_file(
            self._json_file
        )

    def put(self, key: UUID, artifact: Dict[str, Union[str, UUID]]) -> None:
        """Insert the artifact into the database with the key."""
        assert artifact["_id"] == key
        assert isinstance(artifact["hash"], str)
        self.insert_artifact(key, artifact["hash"], artifact)

    def upload(self, key: UUID, path: Path) -> None:
        """Copy the artifact to the folder specified by GEM5ART_STORAGE.

        Does nothing when storage is disabled or when a file named after
        the key is already present in the storage folder.
        """
        if not self._storage_enabled:
            return
        dst_path = self._storage_path / str(key)
        if not dst_path.exists():
            shutil.copy2(path, dst_path)

    def __contains__(self, key: Union[UUID, str]) -> bool:
        """Key can be a UUID or a string. Returns true if item in DB"""
        if isinstance(key, UUID):
            return self.has_uuid(key)
        return self.has_hash(key)

    def get(self, key: Union[UUID, str]) -> Dict[str, str]:
        """Key can be a UUID or a string. Returns a dictionary to construct
        an artifact.

        A string key is treated as a hash; when several artifacts share the
        hash, the first one recorded for it is returned. Raises IndexError
        if nothing matches the key.
        """
        if isinstance(key, UUID):
            matches = list(self.get_artifact_by_uuid(key))
        else:
            # This is a hash.
            matches = list(self.get_artifact_by_hash(key))
        return matches[0]

    def downloadFile(self, key: UUID, path: Path) -> None:
        """Copy the file from the storage to specified path.

        Does nothing (beyond the assertion) when storage is disabled.
        """
        # NOTE(review): this requires the destination to exist already,
        # which looks stricter than "overwrite if it exists" -- confirm
        # that this is intended before relaxing it.
        assert path.exists()
        if not self._storage_enabled:
            return
        src_path = self._storage_path / str(key)
        shutil.copy2(src_path, path)

    def _load_from_file(
        self, json_file: Path
    ) -> Tuple[Dict[str, Dict[str, str]], Dict[str, List[str]]]:
        """Read json_file and build the uuid->artifact and hash->uuids maps.

        Both maps are empty when the file does not exist.
        """
        uuid_mapping: Dict[str, Dict[str, str]] = {}
        hash_mapping: Dict[str, List[str]] = {}
        if json_file.exists():
            with open(json_file, "r") as f:
                artifacts = json.load(f)
            for an_artifact in artifacts:
                the_uuid = an_artifact["_id"]
                the_hash = an_artifact["hash"]
                uuid_mapping[the_uuid] = an_artifact
                hash_mapping.setdefault(the_hash, []).append(the_uuid)
        return uuid_mapping, hash_mapping

    def _save_to_file(self, json_file: Path) -> None:
        """Write every known artifact to json_file as a JSON list."""
        content = list(self._uuid_artifact_map.values())
        with open(json_file, "w") as f:
            json.dump(content, f, indent=4, cls=ArtifactFileDB.ArtifactEncoder)

    def has_uuid(self, the_uuid: UUID) -> bool:
        """Return True if an artifact with this UUID is in the database."""
        return str(the_uuid) in self._uuid_artifact_map

    def has_hash(self, the_hash: str) -> bool:
        """Return True if some artifact with this hash is in the database."""
        return the_hash in self._hash_uuid_map

    def get_artifact_by_uuid(self, the_uuid: UUID) -> Iterable[Dict[str, str]]:
        """Yield the artifact with this UUID, or nothing if it is unknown."""
        uuid_str = str(the_uuid)
        if uuid_str in self._uuid_artifact_map:
            yield self._uuid_artifact_map[uuid_str]

    def get_artifact_by_hash(self, the_hash: str) -> Iterable[Dict[str, str]]:
        """Yield every artifact recorded under this hash, in insertion
        order; yields nothing if the hash is unknown."""
        for the_uuid in self._hash_uuid_map.get(the_hash, []):
            yield self._uuid_artifact_map[the_uuid]

    def insert_artifact(
        self,
        the_uuid: UUID,
        the_hash: str,
        the_artifact: Dict[str, Union[str, UUID]],
    ) -> bool:
        """
        Put the artifact to the database.

        Return True if the artifact uuid does not exist in the database prior
        to calling this function; return False otherwise.

        A deep copy of the artifact is stored, with its "_id" converted to a
        string, and the JSON file is rewritten after every successful insert.
        """
        uuid_str = str(the_uuid)
        if uuid_str in self._uuid_artifact_map:
            return False
        artifact_copy = copy.deepcopy(the_artifact)
        artifact_copy["_id"] = str(artifact_copy["_id"])
        self._uuid_artifact_map[uuid_str] = artifact_copy  # type: ignore
        self._hash_uuid_map.setdefault(the_hash, []).append(uuid_str)
        self._save_to_file(self._json_file)
        return True

    def find_exact(
        self, attr: Dict[str, str], limit: int
    ) -> Iterable[Dict[str, Any]]:
        """
        Return all artifacts such that, for every yielded artifact,
        and for every (k,v) in attr, the attribute `k` of the artifact has
        the value of `v`.

        At most `limit` artifacts are yielded; a `limit` of zero or less
        yields nothing.
        """
        if limit <= 0:
            return
        count = 0
        for artifact in self._uuid_artifact_map.values():
            # https://docs.python.org/3/library/stdtypes.html#frozenset.issubset
            if attr.items() <= artifact.items():
                yield artifact
                count += 1
                if count >= limit:
                    return
# Module-wide connection handle; stays None until getDBConnection() sets it.
_db = None

# URI used when neither the caller nor GEM5ART_DB supplies one: mongodb when
# pymongo could be imported, otherwise a JSON file in the working directory.
_default_uri = (
    "mongodb://localhost:27017" if MONGO_SUPPORT else "file://db.json"
)

# Registry mapping a URI scheme to the ArtifactDB class that handles it.
_db_schemes: Dict[str, Type[ArtifactDB]] = {"file": ArtifactFileDB}
if MONGO_SUPPORT:
    _db_schemes.update(mongodb=ArtifactMongoDB)
def _getDBType(uri: str) -> Type[ArtifactDB]:
    """Internal helper that maps a URI to a class which can be constructed
    with that URI. For instance "mongodb://localhost" maps to
    ArtifactMongoDB. More types will be added in the future.

    Raises Exception when the URI scheme has no registered class.

    Supported types:
        **ArtifactMongoDB**: mongodb://...
            See http://dochub.mongodb.org/core/connections for details.
        **ArtifactFileDB**: file://...
            A simple flat file database with optional storage for the binary
            artifacts. The filepath is where the json file is stored and the
            data storage can be specified with GEM5ART_STORAGE
    """
    scheme = urlparse(uri).scheme
    db_class = _db_schemes.get(scheme)
    if db_class is None:
        raise Exception(f"Cannot find DB type for {uri}")
    return db_class
def getDBConnection(uri: str = "") -> ArtifactDB:
    """Return the database connection.

    uri: a string representing the URI of the database. See _getDBType for
         details. If no URI is given, the value of the GEM5ART_DB
         environment variable is used, falling back to the module default
         (mongodb://localhost:27017 when pymongo is available,
         file://db.json otherwise).

    If no connection has been established yet, a new one is created. If a
    connection already exists, it is returned as-is when uri is empty and
    replaced by a new connection when uri is non-empty.
    """
    global _db

    if not uri:
        # mypy bug: https://github.com/python/mypy/issues/5423
        if _db is not None:
            # A connection was established earlier; hand it back
            return _db  # type: ignore[unreachable]
        uri = os.environ.get("GEM5ART_DB", _default_uri)

    db_class = _getDBType(uri)
    _db = db_class(uri)
    return _db