diff --git a/CHANGELOG.md b/CHANGELOG.md index bd6a1b6..07d046c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# dbt_fivetran_log v0.6.3 +## Fixes +- Changed partitioning logic in the connector_status model in order to avoid "allocated memory limit". ([#62](https://github.com/fivetran/dbt_fivetran_log/pull/62)) +## Contributors +- [@fivetran-juliengoulley](https://github.com/fivetran-juliengoulley) ([#62](https://github.com/fivetran/dbt_fivetran_log/pull/62)) + + # dbt_fivetran_log v0.6.2 ## Fixes - Extend model disablement with `config: is_enabled` setting in sources to avoid running source freshness when a model is disabled. ([#58](https://github.com/fivetran/dbt_fivetran_log/pull/58)) diff --git a/dbt_project.yml b/dbt_project.yml index 5575940..a51e75f 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,7 +1,7 @@ config-version: 2 name: 'fivetran_log' -version: '0.6.2' +version: '0.6.3' require-dbt-version: [">=1.0.0", "<2.0.0"] diff --git a/models/fivetran_log__connector_status.sql b/models/fivetran_log__connector_status.sql index e859139..1c09863 100644 --- a/models/fivetran_log__connector_status.sql +++ b/models/fivetran_log__connector_status.sql @@ -1,27 +1,43 @@ +{{ + config( + materialized='incremental', + unique_key = 'CONNECTOR_ID' + ) +}} + with transformation_removal as ( - select * - from {{ ref('stg_fivetran_log__log') }} - where transformation_id is null + select ROW_NUMBER() OVER(order by sfll.connector_id, sfll.created_at) as sync_batch_id, sfll.* + from {{ ref('stg_fivetran_log__log') }} sfll + {% if is_incremental() %} + , {{this}} flcs + where sfll.created_at >= Case When flcs.last_sync_started_at <= flcs.last_successful_sync_completed_at Then flcs.last_sync_started_at + When flcs.last_successful_sync_completed_at <= flcs.last_sync_started_at Then flcs.last_successful_sync_completed_at + Else flcs.set_up_at + End + and sfll.connector_id = flcs.connector_id + and + {% else %} + where + {% endif %} + sfll.transformation_id is null ), connector_log as ( - select - *, - sum( case when event_subtype in ('sync_start') then 1 else 0 end) over ( partition by connector_id - order by created_at rows unbounded preceding) as sync_batch_id + select + * from transformation_removal -- only looking at errors, warnings, and syncs here where event_type = 'SEVERE' or event_type = 'WARNING' or event_subtype like 'sync%' - or (event_subtype = 'status' + or (event_subtype = 'status' and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'RESCHEDULED' - + and {{ fivetran_utils.json_parse(string="message_data", string_path=["reason"]) }} like '%intended behavior%' ) -- for priority-first syncs. these should be captured by event_type = 'WARNING' but let's make sure - or (event_subtype = 'status' + or (event_subtype = 'status' and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'SUCCESSFUL' ) -- whole reason is "We have rescheduled the connector to force flush data from the forward sync into your destination. This is intended behavior and means that the connector is working as expected." @@ -35,7 +51,7 @@ schema_changes as ( from {{ ref('stg_fivetran_log__log') }} - where + where {{ dbt_utils.datediff('created_at', dbt_utils.current_timestamp(), 'day') }} <= 30 and event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') @@ -52,7 +68,7 @@ connector as ( destination as ( - select * + select * from {{ ref('stg_fivetran_log__destination') }} ), @@ -67,30 +83,32 @@ connector_metrics as ( connector.set_up_at, max(case when connector_log.event_subtype = 'sync_start' then connector_log.created_at else null end) as last_sync_started_at, - max(case when connector_log.event_subtype = 'sync_end' + max(case when connector_log.event_subtype = 'sync_end' then connector_log.created_at else null end) as last_sync_completed_at, max(case when connector_log.event_subtype in ('status', 'sync_end') and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='SUCCESSFUL' then connector_log.created_at else null end) as last_successful_sync_completed_at, + max(case when connector_log.event_subtype = 'sync_start' + then connector_log.sync_batch_id else null end) as last_sync_start_id, - max(case when connector_log.event_subtype = 'sync_end' - then connector_log.sync_batch_id else null end) as last_sync_batch_id, + max(case when connector_log.event_subtype = 'sync_end' + then connector_log.sync_batch_id else null end) as last_sync_end_id, max(case when connector_log.event_subtype in ('status', 'sync_end') and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED' and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%' then connector_log.created_at else null end) as last_priority_first_sync_completed_at, - + max(case when connector_log.event_type = 'SEVERE' then connector_log.created_at else null end) as last_error_at, max(case when connector_log.event_type = 'SEVERE' then connector_log.sync_batch_id else null end) as last_error_batch, max(case when event_type = 'WARNING' then connector_log.created_at else null end) as last_warning_at - from connector - left join connector_log + from connector + left join connector_log on connector_log.connector_id = connector.connector_id {{ dbt_utils.group_by(n=6) }} @@ -100,7 +118,7 @@ connector_health as ( select *, - case + case -- connector is paused when is_paused then 'paused' @@ -117,7 +135,7 @@ connector_health as ( when last_sync_completed_at is null and last_error_at is null then 'initial sync in progress' -- the last attempted sync had an error - when last_sync_batch_id = last_error_batch then 'broken' + when last_sync_start_id <= last_error_batch and last_error_batch <= last_sync_end_id then 'broken' -- there's never been a successful sync and there have been errors when last_sync_completed_at is null and last_error_at is not null then 'broken' @@ -130,7 +148,7 @@ connector_health as ( -- Joining with log to grab pertinent error/warning messagees connector_recent_logs as ( - select + select connector_health.connector_id, connector_health.connector_name, connector_health.connector_type, @@ -144,20 +162,20 @@ connector_recent_logs as ( connector_log.event_type, connector_log.message_data - from connector_health - left join connector_log + from connector_health + left join connector_log on connector_log.connector_id = connector_health.connector_id -- limiting relevance to since the last successful sync completion (if there has been one) - and connector_log.created_at > coalesce(connector_health.last_sync_completed_at, connector_health.last_priority_first_sync_completed_at, '2000-01-01') + and connector_log.created_at > coalesce(connector_health.last_sync_completed_at, connector_health.last_priority_first_sync_completed_at, '2000-01-01') -- only looking at errors and warnings (excluding syncs - both normal and priority first) - and connector_log.event_type != 'INFO' + and connector_log.event_type != 'INFO' -- need to explicitly avoid priority first statuses because they are of event_type WARNING - and not (connector_log.event_subtype = 'status' + and not (connector_log.event_subtype = 'status' and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED' and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%') {{ dbt_utils.group_by(n=12) }} -- de-duping error messages - + ), @@ -175,18 +193,18 @@ final as ( connector_recent_logs.last_sync_completed_at, connector_recent_logs.set_up_at, coalesce(schema_changes.number_of_schema_changes_last_month, 0) as number_of_schema_changes_last_month - + {% if var('fivetran_log_using_sync_alert_messages', true) %} , {{ fivetran_utils.string_agg("distinct case when connector_recent_logs.event_type = 'SEVERE' then connector_recent_logs.message_data else null end", "'\\n'") }} as errors_since_last_completed_sync , {{ fivetran_utils.string_agg("distinct case when connector_recent_logs.event_type = 'WARNING' then connector_recent_logs.message_data else null end", "'\\n'") }} as warnings_since_last_completed_sync {% endif %} from connector_recent_logs - left join schema_changes - on connector_recent_logs.connector_id = schema_changes.connector_id + left join schema_changes + on connector_recent_logs.connector_id = schema_changes.connector_id join destination on destination.destination_id = connector_recent_logs.destination_id {{ dbt_utils.group_by(n=11) }} ) -select * from final \ No newline at end of file +select * from final