diff --git a/src/pymedx/__init__.py b/src/pymedx/__init__.py index 94dc0d68..57d7f817 100644 --- a/src/pymedx/__init__.py +++ b/src/pymedx/__init__.py @@ -2,7 +2,7 @@ """PyMedX package.""" from importlib import metadata as importlib_metadata -from .api import PubMed +from .api import PubMed, PubMedCentral def get_version(): @@ -18,4 +18,4 @@ def get_version(): __version__ = version -__all__ = ["PubMed", "__version__"] +__all__ = ["PubMed", "PubMedCentral", "__version__"] diff --git a/src/pymedx/api.py b/src/pymedx/api.py index 0f58515b..ca68aa27 100644 --- a/src/pymedx/api.py +++ b/src/pymedx/api.py @@ -8,7 +8,7 @@ from lxml import etree as xml -from .article import PubMedArticle +from .article import PubMedArticle, PubMedCentralArticle from .book import PubMedBookArticle from .helpers import batches @@ -62,10 +62,10 @@ def __init__( def query( self, query: str, - min_date: str, - max_date: str, max_results: int = 100, - ) -> Iterable[Union[PubMedArticle, PubMedBookArticle]]: + ) -> Iterable[ + Union[PubMedArticle, PubMedBookArticle, PubMedCentralArticle] + ]: """ Execute a query agains the GraphQL schema. @@ -84,8 +84,6 @@ def query( # Retrieve the article IDs for the query article_ids = self._getArticleIds( query=query, - min_date=min_date, - max_date=max_date, max_results=max_results, ) @@ -188,8 +186,7 @@ def _get( # Set the response mode - if parameters: - parameters["retmode"] = output + parameters["retmode"] = output # Make the request to PubMed response = requests.get(f"{BASE_URL}{url}", params=parameters) @@ -207,7 +204,9 @@ def _get( def _getArticles( self, article_ids: List[str] - ) -> Iterable[Union[PubMedArticle, PubMedBookArticle]]: + ) -> Iterable[ + Union[PubMedArticle, PubMedBookArticle, PubMedCentralArticle] + ]: """Batch a list of article IDs and retrieves the content. Parameters @@ -241,8 +240,6 @@ def _getArticles( def _getArticleIds( self, query: str, - min_date: str, - max_date: str, max_results: int, ) -> List[str]: """Retrieve the article IDs for a query. @@ -267,29 +264,19 @@ def _getArticleIds( # Add specific query parameters parameters["term"] = query - parameters["retmax"] = 50000 + parameters["retmax"] = 500000 parameters["datetype"] = "edat" - parameters["mindate"] = min_date - parameters["maxdate"] = max_date retmax: int = cast(int, parameters["retmax"]) + # Calculate a cut off point based on the max_results parameter if max_results < retmax: parameters["retmax"] = max_results - new_url = ( - "/entrez/eutils/esearch.fcgi?" - f"db={parameters['db']}&" - f"term={parameters['term']}&" - f"retmax={parameters['retmax']}&" - f"datetype={parameters['datetype']}&" - f"mindate={parameters['mindate']}&" - f"maxdate={parameters['maxdate']}&" - f"retmode=json" - ) - # Make the first request to PubMed - response: requests.models.Response = self._get(url=new_url) + response: requests.models.Response = self._get( + url="/entrez/eutils/esearch.fcgi", parameters=parameters + ) # Add the retrieved IDs to the list article_ids += response.get("esearchresult", {}).get("idlist", []) @@ -335,3 +322,74 @@ def _getArticleIds( # Return the response return article_ids + + +class PubMedCentral(PubMed): + """Warp around the PubMedCentral API.""" + + def __init__( + self, + tool: str = "my_tool", + email: str = "my_email@example.com", + api_key: str = "", + ) -> None: + """ + Initialize the PubMedCentral object. + + Parameters + ---------- + tool: String + name of the tool that is executing the query. + This parameter is not required but kindly requested by + PMC (PubMed Central). + email: String + email of the user of the tool. This parameter + is not required but kindly requested by PMC (PubMed Central). + api_key: str + the NCBI API KEY + + Returns + ------- + None + """ + # Inherits from PubMed object and initialize. + super().__init__(tool, email, api_key) + # Changes database source to pmc (PubMedCentral) + self.parameters["db"] = "pmc" + + def _getArticles( + self, article_ids: List[str] + ) -> Iterable[ + Union[PubMedArticle, PubMedBookArticle, PubMedCentralArticle] + ]: + """Batch a list of article IDs and retrieves the content. + + Parameters + ---------- + - article_ids List, article IDs. + + Returns + ------- + - articles List, article objects. + """ + # Get the default parameters + parameters = self.parameters.copy() + parameters["id"] = article_ids + + # Make the request + response = self._get( + url="/entrez/eutils/efetch.fcgi", + parameters=parameters, + output="xml", + ) + + # Parse as XML + root = xml.fromstring(response) + + # Loop over the articles and construct article objects + for article in root.iter("article"): + yield PubMedCentralArticle(xml_element=article) + + # TODO: Adapt to PubMed Central API + # for book in root.iter("PubmedBookArticle"): + # yield PubMedBookArticle(xml_element=book) diff --git a/src/pymedx/article.py b/src/pymedx/article.py index 3a66f750..5614acf7 100644 --- a/src/pymedx/article.py +++ b/src/pymedx/article.py @@ -6,7 +6,7 @@ from lxml.etree import _Element -from .helpers import getAllContent, getContent, getContentUnique +from .helpers import getAbstract, getAllContent, getContent, getContentUnique class PubMedArticle: @@ -69,7 +69,7 @@ def _extractJournal(self, xml_element: _Element) -> Union[str, None, int]: def _extractAbstract(self, xml_element: _Element) -> Union[str, None, int]: path = ".//AbstractText" - return getAllContent(element=xml_element, path=path) + return getAbstract(element=xml_element, path=path) def _extractConclusions( self, xml_element: _Element @@ -171,3 +171,194 @@ def toJSON(self) -> str: sort_keys=True, indent=4, ) + + +class PubMedCentralArticle: + """Data class that contains a PubMedCentral article.""" + + # Full slots + """ + __slots__ = ( + "pmc_id", + "title", + "abstract", + "keywords", + "journal", + "publication_date", + "authors", + "methods", + "conclusions", + "results", + "copyrights", + "doi", + "xml", + ) + """ + + # slots which have been implemented + __slots__ = ( + "pmc_id", + "title", + "abstract", + "publication_date", + "authors", + "doi", + ) + + def __init__( + self, + xml_element: Optional[_Element] = None, + *args: List[Any], + **kwargs: Dict[Any, Any], + ) -> None: + """Initialize of the object from XML or from parameters.""" + if args: + # keep it for resolving problems with linter + pass + # If an XML element is provided, use it for initialization + if xml_element is not None: + self._initializeFromXML(xml_element=xml_element) + + # If no XML element was provided, try to parse the input parameters + else: + for field in self.__slots__: + self.__setattr__(field, kwargs.get(field, None)) + + def _extractPMCId(self, xml_element: _Element) -> Union[str, None, int]: + path = ".//article-meta/article-id[@pub-id-type='pmc']" + return getContentUnique(element=xml_element, path=path) + + def _extractTitle(self, xml_element: _Element) -> Union[str, None, int]: + path = ".//title-group" + return getAllContent(element=xml_element, path=path) + + # TODO: adapt the function for PubMed Central + # def _extractKeywords(self, xml_element: _Element) -> List[Any]: + # path = ".//Keyword" + # return [ + # keyword.text + # for keyword in xml_element.findall(path) + # if keyword is not None + # ] + # TODO: adapt the function for PubMed Central + # def _extractJournal + # (self, xml_element: _Element) -> Union[str, None, int]: + # path = ".//Journal/Title" + # return getContent(element=xml_element, path=path) + + def _extractAbstract(self, xml_element: _Element) -> Union[str, None, int]: + path = ".//abstract" + return getAllContent(element=xml_element, path=path) + + # TODO: adapt the function for PubMed Central + # def _extractConclusions( + # self, xml_element: _Element + # ) -> Union[str, None, int]: + # path = ".//AbstractText[@Label='CONCLUSION']" + # return getContent(element=xml_element, path=path) + # TODO: adapt the function for PubMed Central + # def _extractMethods(self, xml_element: _Element) + # -> Union[str, None, int]: + # path = ".//AbstractText[@Label='METHOD']" + # return getContent(element=xml_element, path=path) + # TODO: adapt the function for PubMed Central + # def _extractResults(self, xml_element: _Element) + # -> Union[str, None, int]: + # path = ".//AbstractText[@Label='RESULTS']" + # return getContent(element=xml_element, path=path) + # TODO: adapt the function for PubMed Central + # def _extractCopyrights( + # self, xml_element: _Element + # ) -> Union[str, None, int]: + # path = ".//CopyrightInformation" + # return getContent(element=xml_element, path=path) + + def _extractDoi(self, xml_element: _Element) -> Union[str, None, int]: + path = ".//article-meta/article-id[@pub-id-type='doi']" + return getContentUnique(element=xml_element, path=path) + + def _extractPublicationDate( + self, xml_element: _Element + ) -> Optional[datetime.date]: + # Get the publication date + + # Get the publication elements + publication_date = xml_element.find(".//pub-date[@pub-type='epub']") + + if not publication_date: # Check this part + publication_date = xml_element.find(".//pub-date") + + if publication_date is not None: + publication_year = getContent(publication_date, ".//year", None) + + publication_month = getContent(publication_date, ".//month", "1") + + publication_day = getContent(publication_date, ".//day", "1") + + # Construct a datetime object from the info + date_str: str = ( + f"{publication_year}/{publication_month}/{publication_day}" + ) + + return datetime.datetime.strptime(date_str, "%Y/%m/%d") + + # Unable to parse the datetime + return None + + def _extractAuthors( + self, xml_element: _Element + ) -> List[dict[str, Union[str, None, int]]]: + contrib_group = xml_element.findall(".//contrib-group") + if contrib_group: + return [ + { + "lastname": getContent(author, ".//surname", None), + "firstname": getContent(author, ".//given-names", None), + # TODO: adapt the function for PubMed Central + # "initials": getContent(author, ".//Initials", None), + # "affiliation": getContent( + # author, ".//AffiliationInfo/Affiliation", None + # ), + } + for author in contrib_group[0].findall( + ".//contrib[@contrib-type='author']" + ) + ] + return [] + + def _initializeFromXML(self, xml_element: _Element) -> None: + """Parse an XML element into an article object.""" + # Parse the different fields of the article + self.pmc_id = self._extractPMCId(xml_element) + self.title = self._extractTitle(xml_element) + self.abstract = self._extractAbstract(xml_element) + self.doi = self._extractDoi(xml_element) + self.publication_date = self._extractPublicationDate(xml_element) + self.authors = self._extractAuthors(xml_element) + # TODO: adapt the function for PubMed Central + # self.xml = xml_element + # self.keywords = self._extractKeywords(xml_element) + # self.journal = self._extractJournal(xml_element) + # self.conclusions = self._extractConclusions(xml_element) + # self.methods = self._extractMethods(xml_element) + # self.results = self._extractResults(xml_element) + # self.copyrights = self._extractCopyrights(xml_element) + + def toDict(self) -> Dict[Any, Any]: + """Convert the parsed information to a Python dict.""" + return {key: self.__getattribute__(key) for key in self.__slots__} + + def toJSON(self) -> str: + """Dump the object as JSON string.""" + return json.dumps( + { + key: ( + value + if not isinstance(value, (datetime.date, _Element)) + else str(value) + ) + for key, value in self.toDict().items() + }, + sort_keys=True, + indent=4, + ) diff --git a/src/pymedx/helpers.py b/src/pymedx/helpers.py index 60b050d5..950d1888 100644 --- a/src/pymedx/helpers.py +++ b/src/pymedx/helpers.py @@ -140,4 +140,56 @@ def getAllContent( ) # Extract the text and return it - return result + return " ".join(result.split()) + + +def getAbstract( + element: _Element, + path: str, + default: Optional[str] = None, +) -> Optional[Union[str, int]]: + """ + Retrieve text content of an XML element. + + Return all the text inside the path and omit XML tags inside. + and omits abstract-type == scanned-figures + + Parameters + ---------- + element: Element + the XML element to parse. + path: Str + Nested path in the XML element. + default: Str + default value to return when no text is found. + + Returns + ------- + text: str + text in the XML node. + """ + # Find the path in the element + raw_result = element.findall(path) + + # Return the default if there is no such element + if not raw_result: + return default + + if raw_result[0].attrib.get("abstract-type", None) == "scanned-figures": + return default + + for fig in raw_result[0].iter("fig"): + parent = fig.getparent() + if parent is not None: + parent.remove(fig) + + # Get all text avoiding the tags + result = cast( + str, + lxml.etree.tostring( + raw_result[0], method="text", encoding="utf-8" + ).decode("utf-8"), + ) + + # Extract the text and return it + return " ".join(result.split()) diff --git a/tests/conftest.py b/tests/conftest.py index b0725d96..3e73dd7b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,7 @@ import pytest import requests_cache -from pymedx.api import PubMed +from pymedx.api import PubMed, PubMedCentral @pytest.fixture(scope="session", autouse=True) @@ -38,3 +38,15 @@ def pubmed() -> PubMed: params["api_key"] = api_key return PubMed(**params) + + +@pytest.fixture(scope="session", autouse=True) +def pmc() -> PubMedCentral: + """Fixture to create a PubMed instance.""" + params = dict(tool="TestTool", email="test@example.com") + + api_key = os.getenv("NCBI_API_KEY", "") + if api_key: + params["api_key"] = api_key + + return PubMedCentral(**params) diff --git a/tests/test_api.py b/tests/test_api.py index b321face..d0174e5e 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -23,8 +23,6 @@ def test_query(self, pubmed: PubMed): # Use a very specific query to limit results articles = pubmed.query( query="COVID-19 vaccines", - min_date="2021-01-01", - max_date="2021-01-31", max_results=10, ) articles = list(articles) # Convert from generator to list diff --git a/tests/test_article.py b/tests/test_article.py index 49aa8a8f..188662c6 100644 --- a/tests/test_article.py +++ b/tests/test_article.py @@ -10,9 +10,7 @@ @pytest.fixture(scope="module") def sample_article(pubmed): """Fixture to create a PubMedArticle instance from dynamic XML.""" - article_ids = pubmed.query( - "COVID-19 vaccines", "2021-01-01", "2021-01-31", max_results=10 - ) + article_ids = pubmed.query("COVID-19 vaccines", max_results=10) article_ids = list(article_ids) articles = pubmed._getArticles(article_ids[:1]) diff --git a/tests/test_pmc.py b/tests/test_pmc.py new file mode 100644 index 00000000..0a013658 --- /dev/null +++ b/tests/test_pmc.py @@ -0,0 +1,23 @@ +"""Test PubMedCentral class.""" + + +from pymedx.api import PubMedCentral + + +class TestPMC: + """Test the helpers module.""" + + def test_query_results(self): + """Test the batches function.""" + email = "email@email.com" + tool = "testing" + collector = PubMedCentral(tool=tool, email=email) + query = ( + 'chile AND wastewater ("2000/01/01"[Publication Date]' + ' : "3000"[Publication Date])' + ) + results = collector.query(query=query, max_results=10) + listed = list(results) + assert len(listed) > 0 + assert len(listed[0].title) > 0 + assert len(listed[0].pmc_id) > 0 diff --git a/tests/test_pmc_api.py b/tests/test_pmc_api.py new file mode 100644 index 00000000..d6a60ecd --- /dev/null +++ b/tests/test_pmc_api.py @@ -0,0 +1,33 @@ +"""Tests for the api module.""" + + +from pymedx.api import PubMedCentral + + +class TestPubMedCentral: + """Tests for PubMedCentral.""" + + def test_initialization(self, pmc: PubMedCentral): + """Test the initialization of the PubMed class.""" + assert pmc.tool == "TestTool" + assert pmc.email == "test@example.com" + + def test_rate_limit_not_exceeded(self, pmc: PubMedCentral): + """Test that the rate limit is not exceeded initially.""" + assert not pmc._exceededRateLimit() + + def test_query(self, pmc: PubMedCentral): + """Test a simple query. This will hit the live PubMed API.""" + # Use a very specific query to limit results + articles = pmc.query( + query="COVID-19 vaccines", + max_results=10, + ) + articles = list(articles) # Convert from generator to list + assert len(articles) > 0 # Assert that we got some results + + def test_get_total_results_count(self, pmc: PubMedCentral): + """Test getting the total results count for a query.""" + count = pmc.getTotalResultsCount(query="COVID-19 vaccines") + assert isinstance(count, int) + assert count > 0 # Assert that the query matches some results diff --git a/tests/test_pmc_article.py b/tests/test_pmc_article.py new file mode 100644 index 00000000..2dc5bb2c --- /dev/null +++ b/tests/test_pmc_article.py @@ -0,0 +1,57 @@ +"""Test for the article module.""" +import datetime + +import pytest + +DOI_LEN_MIN = 5 +DOI_LEN_MAX = 255 + + +@pytest.fixture(scope="module") +def sample_pmc_article(pmc): + """Fixture to create a PubMedArticle instance from dynamic XML.""" + article_ids = pmc.query("'machine learning'", max_results=10) + article_ids = list(article_ids) + article = article_ids[0] + + return article + + +class TestArticle: + """Test PubMed Central article module.""" + + def test_doi_length(self, sample_pmc_article): + """Test that the DOI attribute has an expected length range.""" + doi = sample_pmc_article.doi + # although, by definition, doi size could be infinite + # it seems in the applications it is limited to 255 + # for example: + assert DOI_LEN_MIN <= len(doi) <= DOI_LEN_MAX + + def test_title_exists(self, sample_pmc_article): + """Test that the article title exists and is not empty.""" + title = sample_pmc_article.title + assert title and len(title) > 0 + + @pytest.mark.skip(reason="verify source") + def test_abstract_structure(self, sample_pmc_article): + """Test that the abstract exists and meets basic expectations.""" + # note: is some cases the abstract is not available + abstract = sample_pmc_article.abstract or "" + # assert abstract and len(abstract) > 20 + assert abstract == "" + + def test_publication_date_type(self, sample_pmc_article): + """Test that the publication date is a datetime.date object.""" + pub_date = sample_pmc_article.publication_date + assert isinstance(pub_date, datetime.date) + + def test_toDict(self, sample_pmc_article): + """Test toDict method.""" + article_dict = sample_pmc_article.toDict() + assert article_dict + + def test_toJSON(self, sample_pmc_article): + """Test toJSON method.""" + article_json = sample_pmc_article.toJSON() + assert article_json