Skip to content

Commit

Permalink
Merge pull request #208 from catalyst-cooperative/github-load-failure
Browse files Browse the repository at this point in the history
Fix `load_metrics` failures due to Github and Zenodo ETLs
  • Loading branch information
e-belfer authored Oct 25, 2024
2 parents 8697211 + 3a244bb commit 1bd4092
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Create core Zenodo log table
"""Create core zenodo table
Revision ID: 324a4c7b342c
Revision ID: b4fee22b4a4d
Revises: e8435a653eb2
Create Date: 2024-10-01 13:10:01.924963
Create Date: 2024-10-18 17:39:14.494677
"""
from typing import Sequence, Union
Expand All @@ -12,7 +12,7 @@


# revision identifiers, used by Alembic.
revision: str = '324a4c7b342c'
revision: str = 'b4fee22b4a4d'
down_revision: Union[str, None] = 'e8435a653eb2'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
Expand Down Expand Up @@ -46,7 +46,7 @@ def upgrade() -> None:
sa.Column('version_status', sa.String(), nullable=True, comment='The status of the Zenodo version.'),
sa.Column('version_state', sa.String(), nullable=True, comment='The state of the Zenodo version.'),
sa.Column('version_submitted', sa.Boolean(), nullable=True, comment='Is the version submitted?'),
sa.Column('version_description', sa.Boolean(), nullable=True, comment='The description of the version.'),
sa.Column('version_description', sa.String(), nullable=True, comment='The description of the version.'),
sa.Column('partition_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('metrics_date', 'version_id')
)
Expand Down
7 changes: 4 additions & 3 deletions src/usage_metrics/core/github_nonpartitioned.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ def core_github_stargazers(
"""Transform the stargazers to the PUDL Github repository."""
df = raw_github_stargazers

# Drop 'fork' column and all URLs other than the main one for the repository
# Also drop "open_issues_count" which is identical to "open_issues"
df = df.rename({"size": "size_kb"}).drop(columns=["gravatar_id", "avatar_url"])
# Drop fields that don't pertain to usage metrics
df = df.rename({"size": "size_kb"}).drop(
columns=["gravatar_id", "avatar_url", "user_view_type"]
)

# Convert string to datetime using Pandas
df["starred_at"] = pd.to_datetime(df["starred_at"])
Expand Down
133 changes: 43 additions & 90 deletions src/usage_metrics/core/zenodo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

@asset(
partitions_def=WeeklyPartitionsDefinition(start_date="2023-08-16"),
io_manager_key="database_manager",
# io_manager_key="database_manager",
tags={"source": "zenodo"},
)
def core_zenodo_logs(
Expand All @@ -26,49 +26,8 @@ def core_zenodo_logs(
context.log.warn(f"No data found for the week of {context.partition_key}")
return raw_zenodo_logs

# Drop columns
df = raw_zenodo_logs.drop(
columns=[
"files",
"owners",
"revision",
"metadata.access_right",
"metadata.creators",
"metadata.language",
"metadata.resource_type.title",
"metadata.resource_type.type",
"metadata.license.id",
"links.self_html",
"links.doi",
"links.self_doi",
"links.self_doi_html",
"links.parent",
"links.parent_html",
"links.parent_doi",
"links.parent_doi_html",
"links.self_iiif_manifest",
"links.self_iiif_sequence",
"links.files",
"links.media_files",
"links.archive",
"links.archive_media",
"links.latest",
"links.latest_html",
"links.versions",
"links.draft",
"links.reserve_doi",
"links.access_links",
"links.access_grants",
"links.access_users",
"links.access_request",
"links.access",
"links.communities",
"links.communities-suggestions",
"links.requests",
]
)

df = df.rename(
# Rename columns
df = raw_zenodo_logs.rename(
columns={
"created": "version_creation_date", # Datetime
"modified": "version_last_modified_date", # Datetime
Expand Down Expand Up @@ -96,6 +55,11 @@ def core_zenodo_logs(
"stats.version_unique_views": "version_unique_views",
}
)
# Drop columns
df = df.drop(columns=["files", "owners", "revision"]).drop(
columns=[col for col in df.columns if col.startswith(("metadata.", "links."))]
)
# Column names vary by Zenodo archive type, so we drop any remaining metadata and link columns

# Convert string to date using Pandas
for col in ["metrics_date", "version_publication_date"]:
Expand All @@ -115,56 +79,45 @@ def core_zenodo_logs(
assert df.index.is_unique

# Add a column with the dataset slug
dataset_slugs = (
{
"Open Data for an Open Energy Transition": "ipi_presentation",
"PUDL Raw EIA Annual Energy Outlook (AEO)": "eiaaeo",
"PUDL Raw EIA Bulk Electricity API Data": "eia_bulk_elec",
"PUDL Raw EIA Form 191 -- Monthly Underground Natural Gas Storage Report": "eia191",
"PUDL Raw EIA Form 860": "eia860",
"PUDL Raw EIA Form 860 -- Annual Electric Generator Report": "eia860",
"PUDL Raw EIA Form 860M": "eia860m",
"PUDL Raw EIA Form 861": "eia861",
"PUDL Raw EIA Form 923": "eia923",
"PUDL Raw EIA Form 923 -- Power Plant Operations Report": "eia923",
"PUDL Raw EIA Form 930 -- Hourly and Daily Balancing Authority Operations Report": "eia930",
"PUDL Raw EIA Thermoelectric Cooling Water": "eiawater",
"PUDL Raw EPA CAMD to EIA Data Crosswalk": "epacamd_eia",
"PUDL Raw EPA CEMS unitid to EIA Plant Crosswalk": "epacamd_eia",
"PUDL Raw EPA Hourly Continuous Emission Monitoring System (CEMS)": "epacems",
"PUDL Raw FERC Form 1": "ferc1",
"PUDL Raw FERC Form 2": "ferc2",
"PUDL Raw FERC Form 6": "ferc6",
"PUDL Raw FERC Form 60": "ferc60",
"PUDL Raw FERC Form 714": "ferc714",
"PUDL Raw GridPath Resource Adequacy Toolkit Data": "gridpathatk",
"PUDL Raw GridPath Resource Adequacy Toolkit Renewable Generation Profiles": "gridpathatk",
"PUDL Raw Mine Safety and Health Administration (MSHA) Mines": "mshamines",
"PUDL Raw NREL Annual Technology Baseline (ATB) for Electricity": "nrelatb",
"PUDL Raw Pipelines and Hazardous Materials Safety Administration (PHMSA) Annual Natural Gas Report": "phmsagas",
"Public Utility Data Liberation Project (PUDL) Data Release": "pudl",
"The Public Utility Data Liberation (PUDL) Project": "pudl",
"Workplace Democracy, Open Data, and Open Source": "csv_conf_presentation",
}
| {
col: "pudl"
for col in df.version_title.unique()
if "catalyst-cooperative/pudl" in col or "PUDL Data Release" in col
}
| {
col: "ferc_xbrl_extractor"
for col in df.version_title.unique()
if "catalyst-cooperative/ferc-xbrl-extractor" in col
}
)
dataset_slugs = {
"10723220": "ipi_presentation",
"10838487": "eiaaeo",
"7067366": "eia_bulk_elec",
"10607836": "eia191",
"4127026": "eia860",
"4281336": "eia860m",
"4127028": "eia861",
"4127039": "eia923",
"10840077": "eia930",
"7683135": "eiawater",
"6633769": "epacamd_eia",
"10233185": "epacems",
"4127043": "ferc1",
"5879542": "ferc2",
"7126395": "ferc6",
"7126434": "ferc60",
"4127100": "ferc714",
"10844661": "gridpathatk",
"7683517": "mshamines",
"10839267": "nrelatb",
"7683351": "phmsagas",
"11402753": "csv_conf_2024_coops",
"13937522": "vcerare",
"13948331": "naps2024",
"11455506": "csv_conf_2024_pudl",
"13919959": "vcerare",
"3653158": "pudl_data_release",
"3404014": "pudl_code",
"10020145": "ferc_xbrl_extractor",
}

missed_mapping = df[
~df.version_title.isin(dataset_slugs.keys())
].version_title.unique()
assert not missed_mapping, f"Missed mapping slugs for {missed_mapping}"
~df.concept_record_id.isin(dataset_slugs.keys())
].concept_record_id
assert missed_mapping.empty, f"Missed mapping slugs for {missed_mapping.unique()}"

# Map each record to its dataset slug and assert that none were left unmapped
df["dataset_slug"] = df["version_title"].map(dataset_slugs)
df["dataset_slug"] = df["concept_record_id"].map(dataset_slugs)
assert not df["dataset_slug"].isnull().to_numpy().any()

context.log.info(f"Saving to {os.getenv("METRICS_PROD_ENV", "local")} environment.")
Expand Down
2 changes: 1 addition & 1 deletion src/usage_metrics/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@
),
Column(
"version_description",
Boolean,
String,
comment="The description of the version.",
),
Column("partition_key", String),
Expand Down
7 changes: 5 additions & 2 deletions src/usage_metrics/raw/zenodo.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,21 @@ def filter_blobs(
# (ignoring older CSV archives)
# and only search for files in date range
file_name_prefixes = tuple(f"zenodo/{date}-" for date in partition_dates)
pattern = re.compile(r"\d{4}-\d{2}-\d{2}-\d+\.json$")

blobs = [
blob
for blob in blobs
if re.search(r"zenodo\/\d{4}-\d{2}-\d{2}-\d+\.json$", blob.name)
and blob.name in file_name_prefixes
if pattern.search(str(blob.name))
and blob.name.startswith(file_name_prefixes)
]
return blobs

def load_file(self, file_path: Path) -> pd.DataFrame:
"""Read in file as dataframe."""
with Path.open(file_path) as data_file:
data_json = json.load(data_file)

df = pd.json_normalize(data_json["hits"]["hits"])
# Add in date of metrics column from file name
df["metrics_date"] = re.search(r"\d{4}-\d{2}-\d{2}", str(file_path)).group()
Expand Down

0 comments on commit 1bd4092

Please sign in to comment.