Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dashboard As Code for Reconcile #768

Merged
merged 13 commits into from
Oct 2, 2024
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies = [
"databricks-sdk~=0.29.0",
"sqlglot==25.8.1",
"databricks-labs-blueprint[yaml]>=0.2.3",
"databricks-labs-lsql>=0.4.3",
"databricks-labs-lsql>=0.7.5",
]

[project.urls]
Expand Down
137 changes: 80 additions & 57 deletions src/databricks/labs/remorph/deployment/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,105 @@
import json
import logging
from datetime import timedelta
from importlib.abc import Traversable
from typing import Any
from pathlib import Path

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installer import InstallState
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from databricks.sdk.errors import InvalidParameterValue
from databricks.sdk.retries import retried
from databricks.sdk.service.dashboards import Dashboard
from databricks.sdk.errors import InvalidParameterValue, NotFound, DatabricksError, ResourceAlreadyExists

from databricks.sdk.service.dashboards import LifecycleState
from databricks.labs.lsql.dashboards import DashboardMetadata, Dashboards

from databricks.labs.remorph.config import ReconcileMetadataConfig

logger = logging.getLogger(__name__)


class DashboardDeployment:
    """Deploys Lakeview dashboards for Remorph reconciliation."""

    # Retry window for dashboard create/update calls against the workspace.
    _UPLOAD_TIMEOUT = timedelta(seconds=30)

    def __init__(
        self,
        ws: WorkspaceClient,
        installation: Installation,
        install_state: InstallState,
    ):
        """Keep handles to the workspace client, the installation, and the install state."""
        self._ws, self._installation, self._install_state = ws, installation, install_state

def deploy(self, name: str, dashboard_file: Traversable, parameters: dict[str, Any] | None = None):
logger.debug(f"Deploying dashboard {name} from {dashboard_file.name}")
dashboard_data = self._substitute_params(dashboard_file, parameters or {})
dashboard = self._update_or_create_dashboard(name, dashboard_data, dashboard_file)
logger.info(f"Dashboard deployed with dashboard_id {dashboard.dashboard_id}")
logger.info(f"Dashboard URL: {self._ws.config.host}/sql/dashboardsv3/{dashboard.dashboard_id}")
def _handle_existing_dashboard(self, dashboard_id: str, display_name: str, parent_path: str) -> str | None:
    """Decide whether a previously installed dashboard can be updated in place.

    Returns the dashboard id to update, or None when the dashboard must be
    recreated (it was trashed, or its workspace reference is no longer valid).
    """
    try:
        dashboard = self._ws.lakeview.get(dashboard_id)
        if dashboard.lifecycle_state is None:
            # Treat an unknown lifecycle state like a missing dashboard; the
            # except clause below turns this into a recreate.
            raise NotFound(f"Dashboard life cycle state: {display_name} ({dashboard_id})")
        if dashboard.lifecycle_state == LifecycleState.TRASHED:
            logger.info(f"Recreating trashed dashboard: {display_name} ({dashboard_id})")
            return None  # Recreate the dashboard if it is trashed (manually)
    except (NotFound, InvalidParameterValue):
        logger.info(f"Recovering invalid dashboard: {display_name} ({dashboard_id})")
        try:
            dashboard_path = f"{parent_path}/{display_name}.lvdash.json"
            self._ws.workspace.delete(dashboard_path)  # Cannot recreate dashboard if file still exists
            logger.debug(f"Deleted dangling dashboard {display_name} ({dashboard_id}): {dashboard_path}")
        except NotFound:
            pass
        return None  # Recreate the dashboard if it's reference is corrupted (manually)
    return dashboard_id  # Update the existing dashboard

def deploy(
    self,
    name: str,
    folder: Path,
    config: ReconcileMetadataConfig,
):
    """Create (or update) a Lakeview dashboard from the queries inside *folder*.

    The dashboard is stored under the installation's ``dashboards`` folder and
    its id is recorded in the install state under *name*.
    """
    logger.info(f"Deploying dashboard {name} from {folder}")
    parent_path = f"{self._installation.install_folder()}/dashboards"
    try:
        self._ws.workspace.mkdirs(parent_path)
    except ResourceAlreadyExists:
        pass  # Folder survives re-installs; nothing to do.

    # Point the bundled queries at the configured reconcile schema instead of
    # the placeholder "inventory" database baked into the query files.
    metadata = DashboardMetadata.from_path(folder).replace_database(
        database=f"hive_metastore.{config.schema}",
        database_to_replace="inventory",
    )

    metadata.display_name = self._name_with_prefix(metadata.display_name)

    # Install-state key derived from the folder location, e.g. "dashboards_queries".
    reference = f"{folder.parent.stem}_{folder.stem}".lower()
    dashboard_id = self._install_state.dashboards.get(reference)
    if dashboard_id is not None:
        # May reset to None, forcing a recreate of a trashed/corrupt dashboard.
        dashboard_id = self._handle_existing_dashboard(dashboard_id, metadata.display_name, parent_path)

    dashboard_id = self._update_or_create_dashboard(name, dashboard_id, metadata, parent_path)
    logger.info(f"Dashboard deployed with dashboard_id {dashboard_id}")
    logger.info(f"Dashboard URL: {self._ws.config.host}/sql/dashboardsv3/{dashboard_id}")
    self._install_state.save()

@retried(on=[DatabricksError], timeout=_UPLOAD_TIMEOUT)
def _update_or_create_dashboard(
    self,
    name: str,
    dashboard_id: str | None,
    metadata: DashboardMetadata,
    parent_path: str,
) -> str:
    """Create the dashboard, or update it in place when *dashboard_id* is given.

    Records the resulting dashboard id in the install state under *name* and
    returns it. Retried on transient DatabricksError for up to _UPLOAD_TIMEOUT.
    """
    dashboard = Dashboards(self._ws).create_dashboard(
        metadata,
        dashboard_id=dashboard_id,  # None -> create a brand-new dashboard
        parent_path=parent_path,
        warehouse_id=self._ws.config.warehouse_id,
        publish=True,
    )
    assert dashboard.dashboard_id is not None
    self._install_state.dashboards[name] = dashboard.dashboard_id
    return dashboard.dashboard_id

def _name_with_prefix(self, name: str) -> str:
prefix = self._installation.product()
Expand Down
1 change: 1 addition & 0 deletions src/databricks/labs/remorph/deployment/installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(

def install(self, config: RemorphConfigs):
    """Install reconcile metadata components when a reconcile config is present; otherwise a no-op."""
    if config.reconcile:
        logger.info("Installing Remorph reconcile Metadata components.")
        self._recon_deployment.install(config.reconcile)

def uninstall(self, config: RemorphConfigs):
Expand Down
15 changes: 5 additions & 10 deletions src/databricks/labs/remorph/deployment/recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from databricks.labs.blueprint.wheels import ProductInfo
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue, NotFound

from databricks.labs.blueprint.wheels import find_project_root
import databricks.labs.remorph.resources
from databricks.labs.remorph.config import ReconcileConfig
from databricks.labs.remorph.deployment.dashboard import DashboardDeployment
Expand Down Expand Up @@ -40,9 +40,10 @@ def __init__(
self._dashboard_deployer = dashboard_deployer

def install(self, recon_config: ReconcileConfig | None):
    """Deploy reconcile tables, dashboards, and jobs; no-op when *recon_config* is None/empty."""
    if not recon_config:
        logger.warning("Recon Config is empty")
        return
    logger.info("Installing reconcile components.")
    self._deploy_tables(recon_config)
    self._deploy_dashboards(recon_config)
    self._deploy_jobs(recon_config)
Expand Down Expand Up @@ -91,15 +92,9 @@ def _deploy_dashboards(self, recon_config: ReconcileConfig):
continue

def _deploy_recon_metrics_dashboard(self, name: str, recon_config: ReconcileConfig):
    """Build the reconciliation metrics dashboard from the bundled dashboard query files."""
    queries_folder = find_project_root(__file__) / "src/databricks/labs/remorph/resources/reconcile/dashboards"
    logger.info(f"Creating Reconciliation Dashboard `{name}`")
    self._dashboard_deployer.deploy(name, queries_folder, recon_config.metadata_config)

def _get_dashboards(self) -> list[tuple[str, str]]:
return [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Reconciliation summary: one row per (run, table pair), joining run metadata
-- from `main` with row/column/schema comparison results from `metrics`.
-- :catalog and :schema are dashboard parameters resolved via IDENTIFIER().
select
main.recon_id,
main.source_type,
main.report_type,
main.source_table.`catalog` as source_catalog,
main.source_table.`schema` as source_schema,
main.source_table.table_name as source_table_name,
CASE
WHEN COALESCE(MAIN.SOURCE_TABLE.CATALOG, '') <> '' THEN CONCAT(MAIN.SOURCE_TABLE.CATALOG, '.', MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
ELSE CONCAT(MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
END AS source_table,
main.target_table.`catalog` as target_catalog,
main.target_table.`schema` as target_schema,
main.target_table.table_name as target_table_name,
CONCAT(MAIN.TARGET_TABLE.CATALOG, '.', MAIN.TARGET_TABLE.SCHEMA, '.', MAIN.TARGET_TABLE.TABLE_NAME) AS target_table,
metrics.run_metrics.status as status,
metrics.run_metrics.exception_message as exception,
metrics.recon_metrics.row_comparison.missing_in_source as missing_in_source,
metrics.recon_metrics.row_comparison.missing_in_target as missing_in_target,
metrics.recon_metrics.column_comparison.absolute_mismatch as absolute_mismatch,
metrics.recon_metrics.column_comparison.threshold_mismatch as threshold_mismatch,
metrics.recon_metrics.column_comparison.mismatch_columns as mismatch_columns,
metrics.recon_metrics.schema_comparison as schema_comparison,
metrics.run_metrics.run_by_user as executed_by,
main.start_ts as start_ts,
main.end_ts as end_ts
from IDENTIFIER(:catalog || '.' || :schema || '.main' ) main
inner join
IDENTIFIER(:catalog || '.' || :schema || '.metrics' ) metrics
on main.recon_table_id = metrics.recon_table_id
order by metrics.inserted_ts desc, main.recon_id, main.target_table.table_name
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Schema-comparison details: explodes the per-column schema data stored in
-- `details` (recon_type = 'schema') and joins it back to run metadata in `main`.
with tmp as (select
recon_table_id,
inserted_ts,
explode(data) as schema_data
from IDENTIFIER(:catalog || '.' || :schema || '.details' )
where recon_type='schema'
)
select
main.recon_id,
main.source_table.`catalog` as source_catalog,
main.source_table.`schema` as source_schema,
main.source_table.table_name as source_table_name,
CASE
WHEN COALESCE(MAIN.SOURCE_TABLE.CATALOG, '') <> '' THEN CONCAT(MAIN.SOURCE_TABLE.CATALOG, '.', MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
ELSE CONCAT(MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
END AS source_table,
main.target_table.`catalog` as target_catalog,
main.target_table.`schema` as target_schema,
main.target_table.table_name as target_table_name,
CONCAT(MAIN.TARGET_TABLE.CATALOG, '.', MAIN.TARGET_TABLE.SCHEMA, '.', MAIN.TARGET_TABLE.TABLE_NAME) AS target_table,
schema_data['source_column'] as source_column,
schema_data['source_datatype'] as source_datatype,
schema_data['databricks_column'] as databricks_column,
schema_data['databricks_datatype'] as databricks_datatype,
schema_data['is_valid'] as is_valid
from
IDENTIFIER(:catalog || '.' || :schema || '.main' ) main
inner join
tmp
on main.recon_table_id = tmp.recon_table_id
-- NOTE(review): orders by the `target_table` struct, while sibling queries use
-- `target_table.table_name` — confirm this is intentional.
order by tmp.inserted_ts desc, main.recon_id, main.target_table
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Row/column mismatch drill-down: flattens non-schema entries of `details`
-- into key/value pairs, keyed to run metadata in `main`. `rn` numbers the
-- exploded rows within each (recon_table_id, recon_type) group.
with tmp as (select recon_table_id, inserted_ts ,recon_type, explode(data) as data,
row_number() over(partition by recon_table_id,recon_type order by recon_table_id) as rn
from IDENTIFIER(:catalog || '.' || :schema || '.details' )
where recon_type != 'schema')
select main.recon_id,
main.source_table.`catalog` as source_catalog,
main.source_table.`schema` as source_schema,
main.source_table.table_name as source_table_name,
CASE
WHEN COALESCE(MAIN.SOURCE_TABLE.CATALOG, '') <> '' THEN CONCAT(MAIN.SOURCE_TABLE.CATALOG, '.', MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
ELSE CONCAT(MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
END AS source_table,
main.target_table.`catalog` as target_catalog,
main.target_table.`schema` as target_schema,
main.target_table.table_name as target_table_name,
CONCAT(MAIN.TARGET_TABLE.CATALOG, '.', MAIN.TARGET_TABLE.SCHEMA, '.', MAIN.TARGET_TABLE.TABLE_NAME) AS target_table,
recon_type, key, value, rn
from tmp
inner join
IDENTIFIER(:catalog || '.' || :schema || '.main' ) main
on main.recon_table_id = tmp.recon_table_id
-- NOTE(review): `data` is already the result of explode() in the CTE and is
-- exploded again here into key/value — presumably an array of maps; verify.
lateral view explode(data) exploded_data AS key, value
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Failed reconciliation runs: run metadata joined with metrics, filtered to
-- runs whose status is false, most recent first.
select
main.recon_id,
main.source_type,
main.report_type,
main.source_table.`catalog` as source_catalog,
main.source_table.`schema` as source_schema,
main.source_table.table_name as source_table_name,
CASE
WHEN COALESCE(MAIN.SOURCE_TABLE.CATALOG, '') <> '' THEN CONCAT(MAIN.SOURCE_TABLE.CATALOG, '.', MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
ELSE CONCAT(MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
END AS source_table,
main.target_table.`catalog` as target_catalog,
main.target_table.`schema` as target_schema,
main.target_table.table_name as target_table_name,
CONCAT(MAIN.TARGET_TABLE.CATALOG, '.', MAIN.TARGET_TABLE.SCHEMA, '.', MAIN.TARGET_TABLE.TABLE_NAME) AS target_table,
metrics.run_metrics.status as status,
metrics.run_metrics.run_by_user as executed_by,
main.start_ts as start_ts,
main.end_ts as end_ts,
date(main.start_ts) as start_date
from IDENTIFIER(:catalog || '.' || :schema || '.main' ) main
inner join
IDENTIFIER(:catalog || '.' || :schema || '.metrics' ) metrics
on main.recon_table_id = metrics.recon_table_id
where metrics.run_metrics.status = false
order by metrics.inserted_ts desc, main.recon_id, main.target_table.table_name
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Successful reconciliation runs: run metadata joined with metrics, filtered
-- to runs whose status is true, most recent first.
select
main.recon_id,
main.source_type,
main.report_type,
CASE
WHEN COALESCE(MAIN.SOURCE_TABLE.CATALOG, '') <> '' THEN CONCAT(MAIN.SOURCE_TABLE.CATALOG, '.', MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
ELSE CONCAT(MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
END AS source_table,
CONCAT(MAIN.TARGET_TABLE.CATALOG, '.', MAIN.TARGET_TABLE.SCHEMA, '.', MAIN.TARGET_TABLE.TABLE_NAME) AS target_table,
metrics.run_metrics.status as status,
metrics.run_metrics.run_by_user as executed_by,
main.start_ts as start_ts,
main.end_ts as end_ts,
date(main.start_ts) as start_date
from IDENTIFIER(:catalog || '.' || :schema || '.main' ) main
inner join
IDENTIFIER(:catalog || '.' || :schema || '.metrics' ) metrics
on main.recon_table_id = metrics.recon_table_id
where metrics.run_metrics.status = true
order by metrics.inserted_ts desc, main.recon_id, main.target_table.table_name

Large diffs are not rendered by default.

sundarshankar89 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
display_name: "Reconciliation Metrics"

This file was deleted.

1 change: 0 additions & 1 deletion tests/resources/Remorph-Reconciliation.lvdash.json

This file was deleted.

1 change: 0 additions & 1 deletion tests/resources/Test_Dashboard_No_Param.lvdash.json

This file was deleted.

1 change: 1 addition & 0 deletions tests/resources/dashboards/queries/00_description.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Reconcile Dashboard Queries Test
31 changes: 31 additions & 0 deletions tests/resources/dashboards/queries/01_queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Test fixture (tests/resources/dashboards/queries/01_queries.sql): summary
-- query joining `main` run metadata with `metrics` comparison results.
select
main.recon_id,
main.source_type,
main.report_type,
main.source_table.`catalog` as source_catalog,
main.source_table.`schema` as source_schema,
main.source_table.table_name as source_table_name,
CASE
WHEN COALESCE(MAIN.SOURCE_TABLE.CATALOG, '') <> '' THEN CONCAT(MAIN.SOURCE_TABLE.CATALOG, '.', MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
ELSE CONCAT(MAIN.SOURCE_TABLE.SCHEMA, '.', MAIN.SOURCE_TABLE.TABLE_NAME)
END AS source_table,
main.target_table.`catalog` as target_catalog,
main.target_table.`schema` as target_schema,
main.target_table.table_name as target_table_name,
CONCAT(MAIN.TARGET_TABLE.CATALOG, '.', MAIN.TARGET_TABLE.SCHEMA, '.', MAIN.TARGET_TABLE.TABLE_NAME) AS target_table,
metrics.run_metrics.status as status,
metrics.run_metrics.exception_message as exception,
metrics.recon_metrics.row_comparison.missing_in_source as missing_in_source,
metrics.recon_metrics.row_comparison.missing_in_target as missing_in_target,
metrics.recon_metrics.column_comparison.absolute_mismatch as absolute_mismatch,
metrics.recon_metrics.column_comparison.threshold_mismatch as threshold_mismatch,
metrics.recon_metrics.column_comparison.mismatch_columns as mismatch_columns,
metrics.recon_metrics.schema_comparison as schema_comparison,
metrics.run_metrics.run_by_user as executed_by,
main.start_ts as start_ts,
main.end_ts as end_ts
from IDENTIFIER(:catalog || '.' || :schema || '.main' ) main
inner join
IDENTIFIER(:catalog || '.' || :schema || '.metrics' ) metrics
on main.recon_table_id = metrics.recon_table_id
order by metrics.inserted_ts desc, main.recon_id, main.target_table.table_name
Loading
Loading