From f3e217a686f76774415efa732ea9383df3b50c9c Mon Sep 17 00:00:00 2001 From: fivetran-juliengoulley <104826606+fivetran-juliengoulley@users.noreply.github.com> Date: Mon, 5 Sep 2022 16:38:01 +0200 Subject: [PATCH 1/3] Remove partition over by to fix memory limit issue Refer to this bug - https://github.com/fivetran/dbt_fivetran_log/issues/56 --- models/fivetran_log__connector_status.sql | 80 ++++++++++++++--------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/models/fivetran_log__connector_status.sql b/models/fivetran_log__connector_status.sql index e859139..1c09863 100644 --- a/models/fivetran_log__connector_status.sql +++ b/models/fivetran_log__connector_status.sql @@ -1,27 +1,43 @@ +{{ + config( + materialized='incremental', + unique_key = 'CONNECTOR_ID' + ) +}} + with transformation_removal as ( - select * - from {{ ref('stg_fivetran_log__log') }} - where transformation_id is null + select ROW_NUMBER() OVER(order by sfll.connector_id, sfll.created_at) as sync_batch_id, sfll.* + from {{ ref('stg_fivetran_log__log') }} sfll + {% if is_incremental() %} + , {{this}} flcs + where sfll.created_at >= Case When flcs.last_sync_started_at <= flcs.last_successful_sync_completed_at Then flcs.last_sync_started_at + When flcs.last_successful_sync_completed_at <= flcs.last_sync_started_at Then flcs.last_successful_sync_completed_at + Else flcs.set_up_at + End + and sfll.connector_id = flcs.connector_id + and + {% else %} + where + {% endif %} + sfll.transformation_id is null ), connector_log as ( - select - *, - sum( case when event_subtype in ('sync_start') then 1 else 0 end) over ( partition by connector_id - order by created_at rows unbounded preceding) as sync_batch_id + select + * from transformation_removal -- only looking at errors, warnings, and syncs here where event_type = 'SEVERE' or event_type = 'WARNING' or event_subtype like 'sync%' - or (event_subtype = 'status' + or (event_subtype = 'status' and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'RESCHEDULED' - + and {{ fivetran_utils.json_parse(string="message_data", string_path=["reason"]) }} like '%intended behavior%' ) -- for priority-first syncs. these should be captured by event_type = 'WARNING' but let's make sure - or (event_subtype = 'status' + or (event_subtype = 'status' and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'SUCCESSFUL' ) -- whole reason is "We have rescheduled the connector to force flush data from the forward sync into your destination. This is intended behavior and means that the connector is working as expected." @@ -35,7 +51,7 @@ schema_changes as ( from {{ ref('stg_fivetran_log__log') }} - where + where {{ dbt_utils.datediff('created_at', dbt_utils.current_timestamp(), 'day') }} <= 30 and event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') @@ -52,7 +68,7 @@ connector as ( destination as ( - select * + select * from {{ ref('stg_fivetran_log__destination') }} ), @@ -67,30 +83,32 @@ connector_metrics as ( connector.set_up_at, max(case when connector_log.event_subtype = 'sync_start' then connector_log.created_at else null end) as last_sync_started_at, - max(case when connector_log.event_subtype = 'sync_end' + max(case when connector_log.event_subtype = 'sync_end' then connector_log.created_at else null end) as last_sync_completed_at, max(case when connector_log.event_subtype in ('status', 'sync_end') and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='SUCCESSFUL' then connector_log.created_at else null end) as last_successful_sync_completed_at, + max(case when connector_log.event_subtype = 'sync_start' + then connector_log.sync_batch_id else null end) as last_sync_start_id, - max(case when connector_log.event_subtype = 'sync_end' - then connector_log.sync_batch_id else null end) as last_sync_batch_id, + max(case when connector_log.event_subtype = 'sync_end' + then connector_log.sync_batch_id else null end) as last_sync_end_id, max(case when connector_log.event_subtype in ('status', 'sync_end') and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED' and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%' then connector_log.created_at else null end) as last_priority_first_sync_completed_at, - + max(case when connector_log.event_type = 'SEVERE' then connector_log.created_at else null end) as last_error_at, max(case when connector_log.event_type = 'SEVERE' then connector_log.sync_batch_id else null end) as last_error_batch, max(case when event_type = 'WARNING' then connector_log.created_at else null end) as last_warning_at - from connector - left join connector_log + from connector + left join connector_log on connector_log.connector_id = connector.connector_id {{ dbt_utils.group_by(n=6) }} @@ -100,7 +118,7 @@ connector_health as ( select *, - case + case -- connector is paused when is_paused then 'paused' @@ -117,7 +135,7 @@ connector_health as ( when last_sync_completed_at is null and last_error_at is null then 'initial sync in progress' -- the last attempted sync had an error - when last_sync_batch_id = last_error_batch then 'broken' + when last_sync_start_id <= last_error_batch and last_error_batch <= last_sync_end_id then 'broken' -- there's never been a successful sync and there have been errors when last_sync_completed_at is null and last_error_at is not null then 'broken' @@ -130,7 +148,7 @@ connector_health as ( -- Joining with log to grab pertinent error/warning messagees connector_recent_logs as ( - select + select connector_health.connector_id, connector_health.connector_name, connector_health.connector_type, @@ -144,20 +162,20 @@ connector_recent_logs as ( connector_log.event_type, connector_log.message_data - from connector_health - left join connector_log + from connector_health + left join connector_log on connector_log.connector_id = connector_health.connector_id -- limiting relevance to since the last successful sync completion (if there has been one) - and connector_log.created_at > coalesce(connector_health.last_sync_completed_at, connector_health.last_priority_first_sync_completed_at, '2000-01-01') + and connector_log.created_at > coalesce(connector_health.last_sync_completed_at, connector_health.last_priority_first_sync_completed_at, '2000-01-01') -- only looking at errors and warnings (excluding syncs - both normal and priority first) - and connector_log.event_type != 'INFO' + and connector_log.event_type != 'INFO' -- need to explicitly avoid priority first statuses because they are of event_type WARNING - and not (connector_log.event_subtype = 'status' + and not (connector_log.event_subtype = 'status' and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED' and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%') {{ dbt_utils.group_by(n=12) }} -- de-duping error messages - + ), @@ -175,18 +193,18 @@ final as ( connector_recent_logs.last_sync_completed_at, connector_recent_logs.set_up_at, coalesce(schema_changes.number_of_schema_changes_last_month, 0) as number_of_schema_changes_last_month - + {% if var('fivetran_log_using_sync_alert_messages', true) %} , {{ fivetran_utils.string_agg("distinct case when connector_recent_logs.event_type = 'SEVERE' then connector_recent_logs.message_data else null end", "'\\n'") }} as errors_since_last_completed_sync , {{ fivetran_utils.string_agg("distinct case when connector_recent_logs.event_type = 'WARNING' then connector_recent_logs.message_data else null end", "'\\n'") }} as warnings_since_last_completed_sync {% endif %} from connector_recent_logs - left join schema_changes - on connector_recent_logs.connector_id = schema_changes.connector_id + left join schema_changes + on connector_recent_logs.connector_id = schema_changes.connector_id join destination on destination.destination_id = connector_recent_logs.destination_id {{ dbt_utils.group_by(n=11) }} ) -select * from final \ No newline at end of file +select * from final From 1f2bdacb1822bbe93197124961070768748a24f0 Mon Sep 17 00:00:00 2001 From: fivetran-juliengoulley <104826606+fivetran-juliengoulley@users.noreply.github.com> Date: Mon, 5 Sep 2022 17:32:14 +0200 Subject: [PATCH 2/3] Changed project version --- dbt_project.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_project.yml b/dbt_project.yml index 5575940..a51e75f 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,7 +1,7 @@ config-version: 2 name: 'fivetran_log' -version: '0.6.2' +version: '0.6.3' require-dbt-version: [">=1.0.0", "<2.0.0"] From 09db6dc61e00324f0289136c675b9edc5770cdd2 Mon Sep 17 00:00:00 2001 From: fivetran-juliengoulley <104826606+fivetran-juliengoulley@users.noreply.github.com> Date: Mon, 5 Sep 2022 17:39:25 +0200 Subject: [PATCH 3/3] Update CHANGELOG.md --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd6a1b6..07d046c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# dbt_fivetran_log v0.6.3 +## Fixes +- Changed partitioning logic in the connector_status model in order to avoid "allocated memory limit". ([#62](https://github.com/fivetran/dbt_fivetran_log/pull/62)) +## Contributors +- [@fivetran-juliengoulley](https://github.com/fivetran-juliengoulley) ([#62](https://github.com/fivetran/dbt_fivetran_log/pull/62)) + + # dbt_fivetran_log v0.6.2 ## Fixes - Extend model disablement with `config: is_enabled` setting in sources to avoid running source freshness when a model is disabled. ([#58](https://github.com/fivetran/dbt_fivetran_log/pull/58))