Webhook communication (#4)
* Emit events via ADS webhook

- Fetch metadata from doi.org and verify whether it is a software record
- Verify that ASCL and non-ASCL URLs are valid and alive
- Emit event to ADS webhook

* Better control of DOI responses

* Refactored citation change protobuf

- Added URL regex validation
- Reduced protobuf and database size

* Different strategy to delete duplicates (prioritize resolved ones)

* Include the cited bibcode in the protobuf only if resolved

* Corrected comment to show how to select only Zenodo entries

* Better handling of HTTP errors from doi.org

* Bug fix for counting changes in the database
marblestation authored Oct 24, 2017
1 parent 98f43bd commit d2fc80f
Showing 16 changed files with 941 additions and 95 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -102,3 +102,4 @@ ENV/

local_config.py
logs/*
python/*
7 changes: 6 additions & 1 deletion adscc/app.py
@@ -2,4 +2,9 @@
from models import *

class ADSCitationCaptureCelery(ADSCelery):
pass
def attempt_recovery(self, task, args=None, kwargs=None, einfo=None, retval=None):
"""
If the task fails after 3 attempts...
"""
#task.apply_async(args=args, kwargs=kwargs)
pass
119 changes: 70 additions & 49 deletions adscc/delta_computation.py
@@ -81,6 +81,17 @@ def _execute_sql(self, sql_template, *args):
self.logger.debug("Executing SQL: %s", sql_command)
return self.connection.execute(sql_command)

def _citation_changes_query(self):
if self.joint_table_name in Inspector.from_engine(self.engine).get_table_names(schema=self.schema_name):
CitationChanges.__table__.schema = self.schema_name
## Only consider Zenodo and ASCL records
#sqlalchemy_query = self.session.query(CitationChanges).filter((CitationChanges.new_content.like('%zenodo%')) | (CitationChanges.new_pid.is_(True)))
## Only consider Zenodo
#sqlalchemy_query = self.session.query(CitationChanges).filter(CitationChanges.new_content.like('%zenodo%'))
# Consider Zenodo, ASCL and URL records (all of them)
sqlalchemy_query = self.session.query(CitationChanges)
return sqlalchemy_query

def __iter__(self):
return self

@@ -90,19 +101,27 @@ def next(self): # Python 3: def __next__(self)
raise StopIteration
else:
citation_changes = adsmsg.CitationChanges()
CitationChanges.__table__.schema = self.schema_name
# Get citation changes from DB
for instance in self.session.query(CitationChanges).offset(self.offset).limit(self.group_changes_in_chunks_of).yield_per(100):
for instance in self._citation_changes_query().offset(self.offset).limit(self.group_changes_in_chunks_of).yield_per(100):
## Build protobuf message
citation_change = citation_changes.changes.add()
# Use new_ or previous_ fields depending on whether the status is NEW/UPDATED or DELETED
prefix = "previous_" if instance.status == "DELETED" else "new_"
citation_change.citing = getattr(instance, prefix+"citing")
citation_change.cited = getattr(instance, prefix+"cited")
citation_change.doi = '' if getattr(instance, prefix+"doi") is None else getattr(instance, prefix+"doi")
citation_change.pid = '' if getattr(instance, prefix+"pid") is None else getattr(instance, prefix+"pid")
citation_change.url = '' if getattr(instance, prefix+"url") is None else getattr(instance, prefix+"url")
citation_change.score = getattr(instance, prefix+"score")
resolved = getattr(instance, prefix+"resolved")
if resolved:
# Only accept cited bibcode if score is 1 (resolved)
citation_change.cited = getattr(instance, prefix+"cited")
else:
citation_change.cited = '...................'
if getattr(instance, prefix+"doi"):
citation_change.content_type = adsmsg.CitationChangeContentType.doi
elif getattr(instance, prefix+"pid"):
citation_change.content_type = adsmsg.CitationChangeContentType.pid
elif getattr(instance, prefix+"url"):
citation_change.content_type = adsmsg.CitationChangeContentType.url
citation_change.content = getattr(instance, prefix+"content")
citation_change.resolved = getattr(instance, prefix+"resolved")
citation_change.status = getattr(adsmsg.Status, instance.status.lower())
self.session.commit()
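
To make the resulting message shape concrete, here is a minimal sketch of a hypothetical downstream consumer (not part of this commit) reading a batch built by the loop above; it uses only the fields set there:

import adsmsg

def summarize(citation_changes):
    """Print a one-line summary per change in a CitationChanges batch."""
    for change in citation_changes.changes:
        if change.content_type == adsmsg.CitationChangeContentType.doi:
            kind = "doi"
        elif change.content_type == adsmsg.CitationChangeContentType.pid:
            kind = "pid"
        else:
            kind = "url"
        # Unresolved changes carry the '...................' placeholder bibcode
        print("%s cites %s (%s: %s, resolved: %s)" % (
            change.citing, change.cited, kind, change.content, change.resolved))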

@@ -181,18 +200,23 @@ def _expand_json(self):
elif table_already_exists:
return

# Expand ignoring the source field, keeping only a boolean 'resolved' flag
# (score == "1", i.e. the cited field holds a resolved bibcode), and order
# by citing, content and resolved (descending) to guarantee that, among
# duplicates where one entry is resolved and the others are not, the
# resolved one is the one kept (the rest are removed; see _delete_dups,
# which keeps the first row of each (citing, content) partition)
create_expanded_table = \
"create table {0}.{2} as \
select id, \
payload->>'citing' as citing, \
payload->>'cited' as cited, \
payload->>'doi' as doi, \
payload->>'pid' as pid, \
payload->>'url' as url, \
concat(payload->>'doi'::text, payload->>'pid'::text, payload->>'url'::text) as data, \
payload->>'score' as score \
from {0}.{1};"
(payload->>'doi' is not null) as doi, \
(payload->>'pid' is not null) as pid, \
(payload->>'url' is not null) as url, \
concat(payload->>'doi'::text, payload->>'pid'::text, payload->>'url'::text) as content, \
(payload->>'score' is not null and payload->>'score' = '1') as resolved \
from {0}.{1} order by citing asc, content asc, resolved desc;"
self._execute_sql(create_expanded_table, self.schema_name, self.table_name, self.expanded_table_name)


@@ -203,26 +227,23 @@ def _delete_dups(self):
2011arXiv1112.0312C {"cited":"2012ascl.soft03003C","citing":"2011arXiv1112.0312C","pid":"ascl:1203.003","score":"1","source":"/proj/ads/references/resolved/arXiv/1112/0312.raw.result:10"}
2011arXiv1112.0312C {"cited":"2012ascl.soft03003C","citing":"2011arXiv1112.0312C","pid":"ascl:1203.003","score":"1","source":"/proj/ads/references/resolved/AUTHOR/2012/0605.pairs.result:89"}
Because the same citation was identified in more than one source.
We can safely ignore them, but when some of these duplicates are
unresolved, the resolved one should be prioritized.
"""
delete_duplicates_sql = \
"DELETE FROM {0}.{1} a USING ( \
SELECT MIN(id) as id, citing, data \
FROM {0}.{1} \
GROUP BY citing, data \
HAVING COUNT(*) > 1 \
) b \
WHERE a.citing = b.citing \
AND a.data = a.data \
AND a.id <> b.id"
"DELETE FROM {0}.{1} WHERE id IN ( \
SELECT id FROM \
(SELECT id, row_number() over(partition by citing, content order by resolved desc) AS dup_id FROM {0}.{1}) t \
WHERE t.dup_id > 1 \
)"
self._execute_sql(delete_duplicates_sql, self.schema_name, self.expanded_table_name)
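
As a toy illustration of this strategy (hypothetical rows): for two expanded entries sharing the same (citing, content) pair, row_number() ordered by resolved descending within the partition gives the resolved row dup_id = 1 and the unresolved row dup_id = 2, so only the resolved row survives the delete:

 id | citing              | content       | resolved | dup_id
----+---------------------+---------------+----------+--------
  7 | 2011arXiv1112.0312C | ascl:1203.003 | true     | 1   (kept)
  9 | 2011arXiv1112.0312C | ascl:1203.003 | false    | 2   (deleted)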

def _compute_n_changes(self):
"""Count how many citation changes were identified"""
if self.joint_table_name in Inspector.from_engine(self.engine).get_table_names(schema=self.schema_name):
count_all_fields_null_sql = "select count(*) from {0}.{1};"
n_changes = self._execute_sql(count_all_fields_null_sql, self.schema_name, self.joint_table_name)
return n_changes.scalar()
n_changes = self._citation_changes_query().count()
return n_changes
else:
return 0

@@ -241,9 +262,9 @@ def _verify_input_data(self):
"select count(*) \
from {0}.{1} \
where (\
doi is null \
and pid is null \
and url is null \
not doi \
and not pid \
and not url \
);"
n_all_fields_null = self._execute_sql(count_all_fields_null_sql, self.schema_name, self.expanded_table_name).scalar()
if n_all_fields_null > 0:
@@ -254,10 +275,10 @@ def _verify_input_data(self):
"select count(*) \
from {0}.{1} \
where (\
(doi is not null and pid is not null and url is null) \
or (doi is not null and pid is null and url is not null) \
or (doi is null and pid is not null and url is not null) \
or (doi is not null and pid is not null and url is not null) \
(doi and pid and not url) \
or (doi and not pid and url) \
or (not doi and pid and url) \
or (doi and pid and url) \
);"
n_too_many_fields_not_null = self._execute_sql(count_too_many_fields_not_null_sql, self.schema_name, self.expanded_table_name).scalar()
if n_too_many_fields_not_null > 0:
@@ -267,7 +288,7 @@ def _verify_input_data(self):
count_duplicates_sql = \
"select count(*) \
from {0}.{1} \
group by citing, data \
group by citing, content \
having count(*) > 1;"
n_duplicates = self._execute_sql(count_duplicates_sql, self.schema_name, self.expanded_table_name).scalar()
if n_duplicates > 0:
@@ -302,16 +323,16 @@ def _join_tables(self):
{0}.{1}.doi as new_doi, \
{0}.{1}.pid as new_pid, \
{0}.{1}.url as new_url, \
{0}.{1}.data as new_data, \
{0}.{1}.score as new_score, \
{0}.{1}.content as new_content, \
{0}.{1}.resolved as new_resolved, \
cast(null as text) as previous_id, \
cast(null as text) as previous_citing, \
cast(null as text) as previous_cited, \
cast(null as text) as previous_doi, \
cast(null as text) as previous_pid, \
cast(null as text) as previous_url, \
cast(null as text) as previous_data, \
cast(null as text) as previous_score \
cast(null as boolean) as previous_doi, \
cast(null as boolean) as previous_pid, \
cast(null as boolean) as previous_url, \
cast(null as text) as previous_content, \
cast(null as boolean) as previous_resolved \
from {0}.{1};"
self._execute_sql(joint_table_sql, self.schema_name, self.expanded_table_name, self.joint_table_name)
else:
@@ -324,24 +345,24 @@ def _join_tables(self):
{0}.{2}.doi as new_doi, \
{0}.{2}.pid as new_pid, \
{0}.{2}.url as new_url, \
{0}.{2}.data as new_data, \
{0}.{2}.score as new_score, \
{0}.{2}.content as new_content, \
{0}.{2}.resolved as new_resolved, \
{1}.{2}.id as previous_id, \
{1}.{2}.citing as previous_citing, \
{1}.{2}.cited as previous_cited, \
{1}.{2}.doi as previous_doi, \
{1}.{2}.pid as previous_pid, \
{1}.{2}.url as previous_url, \
{1}.{2}.data as previous_data, \
{1}.{2}.score as previous_score \
{1}.{2}.content as previous_content, \
{1}.{2}.resolved as previous_resolved \
from {1}.{2} full join {0}.{2} \
on \
{0}.{2}.citing={1}.{2}.citing \
and {0}.{2}.data={1}.{2}.data \
and {0}.{2}.content={1}.{2}.content \
where \
({0}.{2}.id is not null and {1}.{2}.id is null) \
or ({0}.{2}.id is null and {1}.{2}.id is not null) \
or ({0}.{2}.id is not null and {1}.{2}.id is not null and ({0}.{2}.cited<>{1}.{2}.cited or {0}.{2}.score<>{1}.{2}.score)) \
or ({0}.{2}.id is not null and {1}.{2}.id is not null and ({0}.{2}.cited<>{1}.{2}.cited or {0}.{2}.resolved<>{1}.{2}.resolved)) \
;"
self._execute_sql(joint_table_sql, self.schema_name, self.previous_schema_name, self.expanded_table_name, self.joint_table_name)
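
The full join pairs each (citing, content) row of the new expanded table with its counterpart in the previous one; the where clause keeps only actual changes, which _calculate_delta then labels. Schematically (toy rows; statuses are assigned below in _calculate_delta):

 new table             previous table        outcome
 (A, X) present        missing               NEW
 missing               (A, X) present        DELETED
 (A, X) cited=B1       (A, X) cited=B2       UPDATED (cited changed)
 (A, X) resolved=t     (A, X) resolved=f     UPDATED (resolved changed)
 (A, X) identical      (A, X) identical      filtered out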

@@ -371,7 +392,7 @@ def _calculate_delta(self):
and {0}.{1}.new_id is not null \
and {0}.{1}.previous_id is not null \
and ({0}.{1}.new_cited<>{0}.{1}.previous_cited \
or {0}.{1}.new_score<>{0}.{1}.previous_score);"
or {0}.{1}.new_resolved<>{0}.{1}.previous_resolved);"
self._execute_sql(update_status_updated_sql, self.schema_name, self.joint_table_name)

update_status_new_sql = \
54 changes: 54 additions & 0 deletions adscc/doi.py
@@ -0,0 +1,54 @@
import requests
import json
from lxml import etree
from adsputils import setup_logging

logger = setup_logging(__name__)

def is_software(base_doi_url, doi):
is_software = False
record_found = False
try_later = False
doi_endpoint = base_doi_url + doi
headers = {}
## Supported content types: https://citation.crosscite.org/docs.html#sec-4
#headers["Accept"] = "application/vnd.datacite.datacite+xml;q=1, application/vnd.crossref.unixref+xml;q=1"
#headers["Accept"] = "application/vnd.crossref.unixref+xml;q=1" # This format does not contain software type tag
headers["Accept"] = "application/vnd.datacite.datacite+xml;q=1"
data = {}
timeout = 30
try:
r = requests.get(doi_endpoint, data=json.dumps(data), headers=headers, timeout=timeout)
except requests.exceptions.RequestException:
logger.exception("HTTP request to DOI service failed: %s", doi_endpoint)
try_later = True
else:
if r.status_code == 406:
logger.error("No answer from doi.org with the requested format (%s) for: %s", headers["Accept"], doi_endpoint)
#raise Exception("No answer from doi.org with the requested format ({}) for: {}".format(headers["Accept"], doi_endpoint))
elif r.status_code == 404:
logger.error("Entry not found (404 HTTP error code): %s", doi_endpoint)
#raise Exception("Entry not found (404 HTTP error code): {}".format(doi_endpoint))
elif not r.ok:
# Rest of bad status codes
try_later = True
logger.error("HTTP request with error code '%s' for: %s", r.status_code, doi_endpoint)
else:
record_found = True

if try_later:
# Exceptions make the task fail, and the framework will automatically retry it later on
logger.error("HTTP request to DOI service failed: %s", doi_endpoint)
raise Exception("HTTP request to DOI service failed: {}".format(doi_endpoint))

if record_found:
try:
root = etree.fromstring(r.content)
except etree.XMLSyntaxError:
# Malformed XML response: leave is_software as False
pass
else:
resource_type = root.find("{http://datacite.org/schema/kernel-3}resourceType")
if resource_type is not None:
resource_type_general = resource_type.get('resourceTypeGeneral')
is_software = resource_type_general is not None and resource_type_general.lower() == "software"
return is_software
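
A minimal usage sketch (the base URL and DOI below are illustrative assumptions; in this commit the base URL actually comes from the DOI_URL setting, see tasks.py):

# Hypothetical example: check whether a DOI resolves to a DataCite software record
if is_software("https://doi.org/", "10.5281/zenodo.11813"):
    print("software record")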
22 changes: 11 additions & 11 deletions adscc/models.py
@@ -1,4 +1,4 @@
from sqlalchemy import Column, DateTime, String, Text, Integer, func
from sqlalchemy import Column, Boolean, DateTime, String, Text, Integer, func
from sqlalchemy.dialects.postgresql import ENUM, JSON, JSONB
from sqlalchemy.ext.declarative import declarative_base

@@ -18,17 +18,17 @@ class CitationChanges(Base):
new_id = Column(Integer)
new_citing = Column(Text())
new_cited = Column(Text())
new_doi = Column(Text())
new_pid = Column(Text())
new_url = Column(Text())
new_data = Column(Text())
new_score = Column(Text())
new_doi = Column(Boolean())
new_pid = Column(Boolean())
new_url = Column(Boolean())
new_content = Column(Text())
new_resolved = Column(Boolean())
previous_citing = Column(Text())
previous_cited = Column(Text())
previous_doi = Column(Text())
previous_pid = Column(Text())
previous_url = Column(Text())
previous_data = Column(Text())
previous_score = Column(Text())
previous_doi = Column(Boolean())
previous_pid = Column(Boolean())
previous_url = Column(Boolean())
previous_content = Column(Text())
previous_resolved = Column(Boolean())
status = Column(ENUM('NEW', 'DELETED', 'UPDATED', name='status_type'))

42 changes: 31 additions & 11 deletions adscc/tasks.py
@@ -2,7 +2,10 @@
import os
from kombu import Queue
import adscc.app as app_module
#from adsmsg import CitationUpdate
import adscc.webhook as webhook
import adscc.doi as doi
import adscc.url as url
import adsmsg

# ============================= INITIALIZATION ==================================== #

@@ -26,17 +29,34 @@ def task_process_citation_changes(citation_changes):
"""
logger.debug('Checking content: %s', citation_changes)
for citation_change in citation_changes.changes:
if citation_change.doi != "":
# TODO: Fetch DOI metadata
pass
elif citation_change.pid != "":
# TODO: Fetch ASCL metadata?
pass
elif citation_change.url != "":
# TODO: Check is a valid and alive URL
pass
if citation_change.content_type == adsmsg.CitationChangeContentType.doi \
and citation_change.content not in ["", None]:
# Fetch DOI metadata (if HTTP request fails, an exception is raised
# and the task will be re-queued (see app.py and adsputils))
is_software = doi.is_software(app.conf['DOI_URL'], citation_change.content)
is_link_alive = True
elif citation_change.content_type == adsmsg.CitationChangeContentType.pid \
and citation_change.content not in ["", None]:
is_software = True
is_link_alive = url.is_alive(app.conf['ASCL_URL'] + citation_change.content)
elif citation_change.content_type == adsmsg.CitationChangeContentType.url \
and citation_change.content not in ["", None]:
is_software = False
is_link_alive = url.is_alive(citation_change.content)
else:
raise Exception("Citation change should have doi, pid or url informed: %s", citation_change)
is_software = False
is_link_alive = False
logger.error("Citation change should have doi, pid or url informed: {}", citation_change)
#raise Exception("Citation change should have doi, pid or url informed: {}".format(citation_change))

emitted = False
if is_software and is_link_alive:
emitted = webhook.emit_event(app.conf['ADS_WEBHOOK_URL'], app.conf['ADS_WEBHOOK_AUTH_TOKEN'], citation_change)

if emitted:
logger.debug("Emitted '%s'", citation_change)
else:
logger.debug("Not emitted '%s'", citation_change)

#logger.debug("Calling 'task_output_results' with '%s'", citation_change)
##task_output_results.delay(citation_change)
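
The adscc/url.py and adscc/webhook.py modules imported above are among the changed files not expanded on this page. Judging only from how they are called in tasks.py — url.is_alive(target_url) returns a boolean, and webhook.emit_event(webhook_url, auth_token, citation_change) returns whether the event was accepted — a rough sketch of their shape might be (hypothetical; the actual implementations in this commit may differ):

import requests
from adsputils import setup_logging

logger = setup_logging(__name__)

def is_alive(target_url, timeout=30):
    """Return True if the URL responds with a non-error status code."""
    try:
        r = requests.get(target_url, timeout=timeout)
    except requests.exceptions.RequestException:
        logger.exception("HTTP request failed: %s", target_url)
        return False
    return r.ok

def emit_event(ads_webhook_url, auth_token, citation_change, timeout=30):
    """POST a citation change event to the ADS webhook; return True if accepted."""
    headers = {"Authorization": "Bearer {}".format(auth_token)}
    payload = {"citing": citation_change.citing, "content": citation_change.content}
    try:
        r = requests.post(ads_webhook_url, json=payload, headers=headers, timeout=timeout)
    except requests.exceptions.RequestException:
        logger.exception("Webhook request failed: %s", ads_webhook_url)
        return False
    return r.ok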
