From 03d06549bcee7a441a6c11a1bcd2ecacc2b08368 Mon Sep 17 00:00:00 2001 From: Paul Tikken Date: Tue, 10 Oct 2023 13:49:55 +0000 Subject: [PATCH] added specific collection update / populate handles --- CveXplore/VERSION | 2 +- .../database/maintenance/Sources_process.py | 8 -- .../database/maintenance/main_updater.py | 93 +++++++++++++++++-- CveXplore/errors/database.py | 4 + 4 files changed, 91 insertions(+), 16 deletions(-) diff --git a/CveXplore/VERSION b/CveXplore/VERSION index 9325c3cc..a2268e2d 100644 --- a/CveXplore/VERSION +++ b/CveXplore/VERSION @@ -1 +1 @@ -0.3.0 \ No newline at end of file +0.3.1 \ No newline at end of file diff --git a/CveXplore/database/maintenance/Sources_process.py b/CveXplore/database/maintenance/Sources_process.py index 501b4c9c..5c4bf1c9 100644 --- a/CveXplore/database/maintenance/Sources_process.py +++ b/CveXplore/database/maintenance/Sources_process.py @@ -27,11 +27,6 @@ from CveXplore.database.maintenance.file_handlers import XMLFileHandler, JSONFileHandler from CveXplore.errors.apis import ApiDataRetrievalFailed -file_prefix = "nvdcve-1.1-" -file_suffix = ".json.gz" -file_mod = "modified" -file_rec = "recent" - date = datetime.datetime.now() year = date.year + 1 @@ -77,9 +72,6 @@ def process_the_item(item=None): "title": title, "CveSearchtitle": generate_title(item["cpeName"]), "cpe_2_2": item["cpeName"], - # "cpe_name": [ - # {"cpe23Uri": f"{item['cpeName']}"} - # ], "vendor": item["cpeName"].split(":")[3], "product": item["cpeName"].split(":")[4], "cpeNameId": item["cpeNameId"], diff --git a/CveXplore/database/maintenance/main_updater.py b/CveXplore/database/maintenance/main_updater.py index b9c32d1b..a490cd7b 100644 --- a/CveXplore/database/maintenance/main_updater.py +++ b/CveXplore/database/maintenance/main_updater.py @@ -15,6 +15,7 @@ VIADownloads, DatabaseIndexer, ) +from CveXplore.errors import UpdateSourceNotFound logging.setLoggerClass(UpdateHandler) @@ -49,18 +50,96 @@ def __init__(self, datasource): self.logger = logging.getLogger("MainUpdater") - def update(self): + def update(self, update_source: str | list = None): """ Method used for updating the database """ + if not isinstance(update_source, str | list): + raise ValueError + try: + if update_source is None: + + for source in self.sources: + up = source["updater"]() + up.update() + + elif isinstance(update_source, list): + for source in update_source: + try: + update_this_source = [ + x for x in self.sources if x["name"] == source + ][0] + up = update_this_source["updater"]() + up.update() + except IndexError: + raise UpdateSourceNotFound( + f"Provided source: {source} could not be found...." + ) + else: + # single string then.... + try: + update_this_source = [ + x for x in self.sources if x["name"] == update_source + ][0] + up = update_this_source["updater"]() + up.update() + except IndexError: + raise UpdateSourceNotFound( + f"Provided source: {update_source} could not be found...." + ) + except UpdateSourceNotFound: + raise + else: + for post in self.posts: + indexer = post["updater"]() + indexer.create_indexes() - for source in self.sources: - up = source["updater"]() - up.update() + self.datasource.set_handlers_for_collections() + + self.logger.info(f"Database update / initialization complete!") - for post in self.posts: - indexer = post["updater"]() - indexer.create_indexes() + def populate(self, populate_source: str | list = None): + """ + Method used for updating the database + """ + if not isinstance(populate_source, str | list): + raise ValueError + try: + if populate_source is None: + for source in self.sources: + up = source["updater"]() + up.populate() + + elif isinstance(populate_source, list): + for source in populate_source: + try: + update_this_source = [ + x for x in self.sources if x["name"] == source + ][0] + up = update_this_source["updater"]() + up.populate() + except IndexError: + raise UpdateSourceNotFound( + f"Provided source: {source} could not be found...." + ) + else: + # single string then.... + try: + update_this_source = [ + x for x in self.sources if x["name"] == populate_source + ][0] + up = update_this_source["updater"]() + up.populate() + except IndexError: + raise UpdateSourceNotFound( + f"Provided source: {populate_source} could not be found...." + ) + except UpdateSourceNotFound: + raise + else: + for post in self.posts: + indexer = post["updater"]() + indexer.create_indexes() self.datasource.set_handlers_for_collections() diff --git a/CveXplore/errors/database.py b/CveXplore/errors/database.py index 0f6d473a..aa2854f4 100644 --- a/CveXplore/errors/database.py +++ b/CveXplore/errors/database.py @@ -12,3 +12,7 @@ class DatabaseConnectionException(DatabaseException): class DatabaseIllegalCollection(DatabaseException): pass + + +class UpdateSourceNotFound(DatabaseException): + pass