
Commit 7d04a70

Merge branch 'main' of https://github.com/mysociety/data_common into main

ajparsons committed Sep 6, 2023
2 parents b84627d + a3b8e93
Showing 17 changed files with 2,437 additions and 1,334 deletions.
3,066 changes: 1,799 additions & 1,267 deletions poetry.lock (large diff not rendered)

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -9,7 +9,6 @@ python = ">=3.10,<3.11"
numpy = "1.21.0"
openpyxl = "3.0.7"
pandas = "1.4.2"
-PyYAML = "5.4.1"
scikit-learn = "^1.0.2"
unicodecsv = "0.14.1"
xlrd = "2.0.1"
@@ -46,6 +45,8 @@ lxml = "^4.9.1"
pyarrow = "^11.0.0"
duckdb = "^0.6.1"
sqlfluff = "^1.4.5"
+pydantic = "^2.3.0"
+geopandas = "^0.13.2"

[build-system]
requires = ["poetry-core>=1.0.0"]
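Note that the pydantic pin jumps straight to ^2.3.0, a breaking major version: code written against the 1.x API (.dict(), parse_obj) will not run unchanged. A minimal sketch of the v2-style calls this pin implies (the model below is illustrative, not from this repo):

import pydantic

class DatasetInfo(pydantic.BaseModel):
    name: str
    version: str = "0.1.0"

info = DatasetInfo.model_validate({"name": "example"})  # v1 spelling: parse_obj(...)
print(info.model_dump())  # v1 spelling: info.dict()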
4 changes: 2 additions & 2 deletions src/data_common/charting/download.py
@@ -39,12 +39,12 @@ def json_to_chart(json_spec: str) -> alt.Chart:
del di_copy["datasets"]
del di_copy["width"]
c = Chart.from_dict(di_copy)
-chart += c
+chart += c  # type: ignore
else:
del di["width"]
del di["config"]["view"]
chart = Chart.from_dict(di)
-return chart
+return chart  # type: ignore


def get_chart_from_url(url: str, n: int = 0) -> alt.Chart:
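The two # type: ignore comments here paper over altair's type stubs rather than any runtime behaviour: layering charts with + (or +=) returns an alt.LayerChart, so assigning the result back to a name annotated as alt.Chart trips strict type checkers. A minimal sketch of the pattern, with illustrative data:

import altair as alt
import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
base = alt.Chart(df).mark_line().encode(x="x", y="y")
points = alt.Chart(df).mark_point().encode(x="x", y="y")

chart = base + points  # runtime type is alt.LayerChart, not alt.Chart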
2 changes: 1 addition & 1 deletion src/data_common/charting/sw_theme.py
@@ -118,7 +118,7 @@ def color_scale(
use_palette = palette[: len(domain)]
if reverse:
use_palette = use_palette[::-1]
-return alt.Scale(domain=domain, range=use_palette)
+return alt.Scale(domain=domain, range=use_palette)  # type: ignore


font = "Lato"
2 changes: 1 addition & 1 deletion src/data_common/charting/theme.py
@@ -131,7 +131,7 @@ def color_scale(
use_palette = palette[: len(domain)]
if reverse:
use_palette = use_palette[::-1]
-return alt.Scale(domain=domain, range=use_palette)
+return alt.Scale(domain=domain, range=use_palette)  # type: ignore


font = "Source Sans Pro"
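color_scale is the same helper in both theme modules, and the added # type: ignore covers alt.Scale's stub in the same way. A hedged usage sketch — the import path is assumed from the file layout; domain and reverse are the parameters visible in the hunks above:

import altair as alt
import pandas as pd
from data_common.charting.theme import color_scale  # path assumed

df = pd.DataFrame({"party": ["A", "B"], "votes": [10, 20]})
scale = color_scale(domain=["A", "B"], reverse=True)  # palette order flipped
chart = alt.Chart(df).mark_bar().encode(
    x="party", y="votes", color=alt.Color("party", scale=scale)
)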
5 changes: 1 addition & 4 deletions src/data_common/dataset/jekyll_management.py
@@ -12,7 +12,6 @@
def markdown_with_frontmatter(
data: dict[str, Any], dest: Path, content: str = "", from_file: Path | None = None
):
-
if content and from_file:
raise ValueError("Trying to use contents and from_file arguments")

@@ -31,7 +30,6 @@ def markdown_with_frontmatter(


def render_download_format_to_dir(items: list[dict[str, Any]], output_dir: Path):
-
if output_dir.exists() is False:
output_dir.mkdir()
# remove existing files
@@ -52,7 +50,6 @@ def render_download_format_to_dir(items: list[dict[str, Any]], output_dir: Path)


def render_sources_to_dir(items: list[dict[str, Any]], output_dir: Path):
-
if output_dir.exists() is False:
output_dir.mkdir()
# remove existing files
@@ -103,7 +100,7 @@ def make_version_info_page(items: list[dict[str, Any]], output_dir: Path):
df = pd.DataFrame(items)[["name", "title", "version", "full_version"]]

for name, d in df.groupby("name"):
-safe_name = name.replace("-", "_")
+safe_name = str(name).replace("-", "_")
data_dict = {
"name": name,
"title": d["title"].iloc[0],
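The str(name) cast fixes a typing wrinkle: pandas groupby keys are typed as Hashable, so calling .replace on one upsets type checkers even when the key is a string at runtime. For orientation, a hedged sketch of how markdown_with_frontmatter appears intended to be called (argument values are illustrative):

from pathlib import Path

# dest becomes a Jekyll page: YAML front matter built from data,
# followed by content (or the text of from_file, but not both)
markdown_with_frontmatter(
    data={"name": "example_dataset", "title": "Example dataset"},
    dest=Path("docs/_data/example_dataset.md"),
    content="Version history for this dataset.",
)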
110 changes: 75 additions & 35 deletions src/data_common/dataset/resource_management.py
@@ -3,6 +3,7 @@
import io
import json
import os
+import re
import shutil
import sqlite3
import subprocess
@@ -11,18 +12,15 @@
from dataclasses import dataclass
from pathlib import Path
from shutil import copyfile
-from typing import Any, Callable, Dict, Literal, TypedDict, TypeVar, cast
+from typing import Any, Callable, Literal, TypedDict, TypeVar, cast
from urllib.parse import urlencode

+import geopandas as gpd
import pandas as pd
import pytest
import rich
import xlsxwriter
-import re

-from frictionless import Schema, describe, validate
-from pyparsing import any_open_tag
-from rich.markdown import Markdown
+from frictionless import describe, validate
from rich.table import Table
from ruamel.yaml import YAML

@@ -168,16 +166,18 @@ def get_df(self) -> pd.DataFrame:
else:
raise ValueError(f"Unhandled file type {self.path.suffix}")

-def get_resource(self, inline_data: bool = False) -> dict[str, Any]:
-
+def get_resource(
+    self, inline_data: bool = False, is_geodata: bool = False
+) -> dict[str, Any]:
if self.has_resource_yaml:
yaml = YAML(typ="safe")
-with open(self.resource_path, "r") as f:
+with self.resource_path.open("r") as f:
resource = yaml.load(f)
if inline_data:
resource["data"] = (
self.get_df().fillna(value="").to_dict(orient="records")
)
df = self.get_df()
if is_geodata and "geometry" in df.columns:
df = df.drop(columns=["geometry"])
resource["data"] = df.fillna(value="").to_dict(orient="records")
resource["format"] = "json"
del resource["scheme"]
del resource["path"]
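Dropping geometry before inlining is what keeps the composite JSON serialisable: shapely geometry objects are not JSON-encodable, and they would dominate the payload anyway. The same move outside this class, as a sketch (file name illustrative):

import geopandas as gpd

gdf = gpd.read_parquet("resource.parquet")
records = gdf.drop(columns=["geometry"]).fillna(value="").to_dict(orient="records")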
@@ -205,19 +205,26 @@ def get_schema_from_file(
) -> SchemaValidator:
return update_table_schema(self.path, existing_schema)

-def rebuild_yaml(self):
+def rebuild_yaml(self, is_geodata: bool = False):
"""
Recreate yaml file from source file, preserving any custom values from previously existing yaml file
"""
from frictionless.resource.resource import Resource

existing_desc = self.get_resource()
desc = describe(self.path)
desc.update(existing_desc)

desc["schema"] = self.get_schema_from_file(existing_desc.get("schema", None))
desc["path"] = self.path.name

+# if geodata - drop geometry example from schema
+if is_geodata:
+    new_fields = []
+    for f in desc["schema"]["fields"]:
+        if f["name"] == "geometry":
+            f["example"] = ""
+        new_fields.append(f)
+    desc["schema"]["fields"] = new_fields

# ensure a blank title and description
new_dict = {"title": None, "description": None, "custom": {}}

@@ -262,7 +269,7 @@ def rebuild_yaml(self):
yaml_str = yaml_str.replace("- no\n", "- 'no'\n")
yaml_str = yaml_str.replace("- yes\n", "- 'yes'\n")

-with open(self.resource_path, "w") as f:
+with self.resource_path.open("w") as f:
f.write(yaml_str)
print(f"Updated config for {self.slug} to {self.resource_path}")

@@ -337,7 +344,6 @@ def build_from_function(self):
)
return None
if ":" in build_module and " " not in build_module:

module, function = build_module.split(":")
module = importlib.import_module(module)
function = getattr(module, function)
@@ -680,7 +686,6 @@ def derive_bump_rule_from_change(self) -> tuple[version_rules, str] | None:
)

if current_data != previous_data:
-
dict_diff = diff_dicts(previous_data, current_data)
rich.print(dict_diff)

@@ -809,8 +814,13 @@ def rebuild_resource(self, slug: str):
resource.rebuild_yaml()

def rebuild_all_resources(self):
+is_geodata = self.is_geodata()
for resource in self.resources().values():
-resource.rebuild_yaml()
+resource.rebuild_yaml(is_geodata=is_geodata)

+def is_geodata(self) -> bool:
+    desc = self.get_datapackage()
+    return desc["custom"].get("is_geodata", False)

def get_datapackage(self) -> dict[str, Any]:
yaml = YAML(typ="safe")
@@ -897,15 +907,24 @@ def copy_resources(self):
"""

desc = self.get_datapackage()
-csv_value = desc.get("custom", {}).get("formats", {}).get("csv", True)
-parquet_value = desc.get("custom", {}).get("formats", {}).get("parquet", True)
+formats = desc.get("custom", {}).get("formats", {})
+csv_value = formats.get("csv", True)
+parquet_value = formats.get("parquet", True)
+geojson_value = formats.get("geojson", False)
+geopackage_value = formats.get("gpkg", False)

csv_copy_query = """
copy (select * from {{ source }}) to {{ dest }} (format PARQUET);
"""

+# __index_level_0__ is an internal parquet column that duckdb has access to
+# but we don't want to export
+exclude = ""
+if desc["custom"].get("is_geodata", False):
+    exclude = "EXCLUDE (__index_level_0__, geometry)"

parquet_copy_query = """
copy (select * from {{ source }}) to {{ dest }} (HEADER, DELIMITER ',');
copy (select * {{ exclude }} from {{ source }}) to {{ dest }} (HEADER, DELIMITER ',');
"""

for r in self.resources().values():
@@ -915,13 +934,30 @@
copyfile(r.path, self.build_path() / r.path.name)
if parquet_value:
parquet_file = self.build_path() / (r.path.stem + ".parquet")
-duck_query(csv_copy_query, source=r.path, dest=parquet_file)
+duck_query(csv_copy_query, source=r.path, dest=parquet_file).run()
+if geojson_value or geopackage_value:
+    raise ValueError(
+        "Writing to geojson/geopackage from csv source not supported. Use parquet internally."
+    )
elif r.path.suffix == ".parquet":
if parquet_value:
copyfile(r.path, self.build_path() / r.path.name)
if csv_value:
csv_file = self.build_path() / (r.path.stem + ".csv")
-duck_query(parquet_copy_query, source=r.path, dest=csv_file)
+duck_query(
+    parquet_copy_query,
+    exclude=exclude,
+    source=r.path,
+    dest=csv_file,
+).run()
+if geojson_value:
+    geojson_path = self.build_path() / (r.path.stem + ".geojson")
+    gdf = gpd.read_parquet(r.path)
+    gdf.to_file(geojson_path, driver="GeoJSON")
+if geopackage_value:
+    geopackage_path = self.build_path() / (r.path.stem + ".gpkg")
+    gdf = gpd.read_parquet(r.path)
+    gdf.to_file(geopackage_path, driver="GPKG")
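EXCLUDE is DuckDB's star modifier for leaving columns out of select *, which is how the CSV export sheds both the internal parquet index column and the geometry. A standalone sketch of the same query shape (connection and file names illustrative):

import duckdb

con = duckdb.connect()
con.execute(
    "copy (select * EXCLUDE (__index_level_0__, geometry) from 'data.parquet') "
    "to 'data.csv' (HEADER, DELIMITER ',')"
)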

def get_datapackage_order(self) -> int:
"""
@@ -1103,7 +1139,7 @@ def get_composite_options(

return composite_options

-def build_excel(self):
+def build_excel(self, is_geodata: bool = False):
"""
Build a single excel file for all resources
"""
@@ -1134,6 +1170,9 @@

for sheet_name, df in sheets.items():
short_sheet_name = sheet_name[-31:] # only allow 31 characters
+# if geometry is column - remove it
+if is_geodata and "geometry" in df.columns:
+    df = df.drop(columns=["geometry"])
df.to_excel(writer, sheet_name=short_sheet_name, index=False)

for column in df:
@@ -1142,7 +1181,6 @@

col_idx = df.columns.get_loc(column)
if column_length <= 50:
-
writer.sheets[short_sheet_name].set_column(
col_idx, col_idx, column_length
)
@@ -1153,7 +1191,7 @@

writer.save()

-def build_sqlite(self):
+def build_sqlite(self, is_geodata: bool = False):
"""
Create a composite sqlite file for all resources
with metadata as a separate table.
@@ -1176,7 +1214,10 @@
for slug, resource in self.resources().items():
if slug not in allowed_resource_slugs:
continue
-sheets[slug] = resource.get_df()
+df = resource.get_df()
+if is_geodata and "geometry" in df.columns:
+    df = df.drop(columns=["geometry"])
+sheets[slug] = df
meta_df = resource.get_metadata_df()
meta_df["resource"] = slug
metadata.append(meta_df)
@@ -1192,7 +1233,7 @@
df.to_sql(name, con, index=False)
con.close()
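Because metadata is written to its own table alongside one table per resource, the composite file can be inspected with the standard library alone. A hedged sketch of reading it back (file name illustrative; the exact table names come from the loop above):

import sqlite3

con = sqlite3.connect("composite.sqlite")
tables = con.execute("select name from sqlite_master where type = 'table'").fetchall()
print(tables)  # one table per resource, plus the metadata table
con.close()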

-def build_composite_json(self):
+def build_composite_json(self, is_geodata: bool = False):
"""
This builds a composite json file that inlines the data as json.
It can have fewer resources than the total, and some modifiers on the data.
@@ -1211,7 +1252,7 @@
]

datapackage["resources"] = [
-x.get_resource(inline_data=True)
+x.get_resource(inline_data=True, is_geodata=is_geodata)
for x in self.resources().values()
if x.slug in allowed_resource_slugs
]
@@ -1236,7 +1277,6 @@ def convert_to_array_from_comma(value: t) -> list[t]:
# for instance splitting comma separated fields to arrays
for resource_slug, modify_maps in composite_options["modify"].items():
for column, modify_type in modify_maps.items():
-
# split specified columns to arrays and update the schema
if modify_type == "comma-to-array":
for resource in datapackage["resources"]:
@@ -1271,9 +1311,10 @@ def build_composites(self):
"""
Create composite files for the datapackage
"""
-self.build_excel()
-self.build_sqlite()
-self.build_composite_json()
+is_geodata = self.is_geodata()
+self.build_excel(is_geodata)
+self.build_sqlite(is_geodata)
+self.build_composite_json(is_geodata)

def build_markdown(self):
"""
@@ -1282,7 +1323,6 @@ def build_markdown(self):
...

def print_status(self):
-
resources = list(self.resources().values())

df = pd.DataFrame(
5 changes: 4 additions & 1 deletion src/data_common/dataset/table_management.py
@@ -3,6 +3,7 @@

import pandas as pd
from pandas.io.json import build_table_schema
+
from data_common.db import duck_query


@@ -80,6 +81,9 @@ def enhance_field(
field["constraints"]["enum"] = enum_value
if isinstance(enum_value, EnumPlaceholder):
field["constraints"]["enum"] = enum_value.process(col)
+if isinstance(field["constraints"]["enum"], list):
+    # sort the enum values
+    field["constraints"]["enum"] = sorted(field["constraints"]["enum"])
return field
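Sorting the enum presumably keeps the generated schema stable between runs: unique-value extraction has no guaranteed order, which would otherwise churn the YAML on every rebuild. The effect on a frictionless-style field dict, as a sketch:

field = {"name": "party", "constraints": {"enum": ["Labour", "Green", "Conservative"]}}
if isinstance(field["constraints"]["enum"], list):
    field["constraints"]["enum"] = sorted(field["constraints"]["enum"])
# field["constraints"]["enum"] is now ['Conservative', 'Green', 'Labour'] on every run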

@classmethod
@@ -105,7 +109,6 @@ def get_table_schema(
def update_table_schema(
path: Path, existing_schema: SchemaValidator | None
) -> SchemaValidator:
-
if path.suffix == ".csv":
df = pd.read_csv(path)
elif path.suffix == ".parquet":
Expand Down
(diffs for the remaining changed files not loaded)