Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove partition over by to fix memory limit issue #62

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# dbt_fivetran_log v0.6.3
## Fixes
- Changed the partitioning logic in the `connector_status` model to avoid the "allocated memory limit" error. ([#62](https://github.com/fivetran/dbt_fivetran_log/pull/62))
## Contributors
- [@fivetran-juliengoulley](https://github.com/fivetran-juliengoulley) ([#62](https://github.com/fivetran/dbt_fivetran_log/pull/62))


# dbt_fivetran_log v0.6.2
## Fixes
- Extend model disablement with `config: is_enabled` setting in sources to avoid running source freshness when a model is disabled. ([#58](https://github.com/fivetran/dbt_fivetran_log/pull/58))
Expand Down
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
config-version: 2

name: 'fivetran_log'
version: '0.6.2'
version: '0.6.3'

require-dbt-version: [">=1.0.0", "<2.0.0"]

Expand Down
80 changes: 49 additions & 31 deletions models/fivetran_log__connector_status.sql
Original file line number Diff line number Diff line change
@@ -1,27 +1,43 @@
{{
config(
materialized='incremental',
unique_key = 'CONNECTOR_ID'
)
}}

with transformation_removal as (

select *
from {{ ref('stg_fivetran_log__log') }}
where transformation_id is null
select ROW_NUMBER() OVER(order by sfll.connector_id, sfll.created_at) as sync_batch_id, sfll.*
from {{ ref('stg_fivetran_log__log') }} sfll
{% if is_incremental() %}
, {{this}} flcs
where sfll.created_at >= Case When flcs.last_sync_started_at <= flcs.last_successful_sync_completed_at Then flcs.last_sync_started_at
When flcs.last_successful_sync_completed_at <= flcs.last_sync_started_at Then flcs.last_successful_sync_completed_at
Else flcs.set_up_at
End
and sfll.connector_id = flcs.connector_id
and
{% else %}
where
{% endif %}
sfll.transformation_id is null

),

connector_log as (
select
*,
sum( case when event_subtype in ('sync_start') then 1 else 0 end) over ( partition by connector_id
order by created_at rows unbounded preceding) as sync_batch_id
select
*
from transformation_removal
-- only looking at errors, warnings, and syncs here
where event_type = 'SEVERE'
or event_type = 'WARNING'
or event_subtype like 'sync%'
or (event_subtype = 'status'
or (event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'RESCHEDULED'

and {{ fivetran_utils.json_parse(string="message_data", string_path=["reason"]) }} like '%intended behavior%'
) -- for priority-first syncs. these should be captured by event_type = 'WARNING' but let's make sure
or (event_subtype = 'status'
or (event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'SUCCESSFUL'
)
-- whole reason is "We have rescheduled the connector to force flush data from the forward sync into your destination. This is intended behavior and means that the connector is working as expected."
Expand All @@ -35,7 +51,7 @@ schema_changes as (

from {{ ref('stg_fivetran_log__log') }}

where
where
{{ dbt_utils.datediff('created_at', dbt_utils.current_timestamp(), 'day') }} <= 30
and event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config')

Expand All @@ -52,7 +68,7 @@ connector as (

destination as (

select *
select *
from {{ ref('stg_fivetran_log__destination') }}
),

Expand All @@ -67,30 +83,32 @@ connector_metrics as (
connector.set_up_at,
max(case when connector_log.event_subtype = 'sync_start' then connector_log.created_at else null end) as last_sync_started_at,

max(case when connector_log.event_subtype = 'sync_end'
max(case when connector_log.event_subtype = 'sync_end'
then connector_log.created_at else null end) as last_sync_completed_at,

max(case when connector_log.event_subtype in ('status', 'sync_end')
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='SUCCESSFUL'
then connector_log.created_at else null end) as last_successful_sync_completed_at,

max(case when connector_log.event_subtype = 'sync_start'
then connector_log.sync_batch_id else null end) as last_sync_start_id,

max(case when connector_log.event_subtype = 'sync_end'
then connector_log.sync_batch_id else null end) as last_sync_batch_id,
max(case when connector_log.event_subtype = 'sync_end'
then connector_log.sync_batch_id else null end) as last_sync_end_id,

max(case when connector_log.event_subtype in ('status', 'sync_end')
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%'
then connector_log.created_at else null end) as last_priority_first_sync_completed_at,


max(case when connector_log.event_type = 'SEVERE' then connector_log.created_at else null end) as last_error_at,

max(case when connector_log.event_type = 'SEVERE' then connector_log.sync_batch_id else null end) as last_error_batch,
max(case when event_type = 'WARNING' then connector_log.created_at else null end) as last_warning_at

from connector
left join connector_log
from connector
left join connector_log
on connector_log.connector_id = connector.connector_id
{{ dbt_utils.group_by(n=6) }}

Expand All @@ -100,7 +118,7 @@ connector_health as (

select
*,
case
case
-- connector is paused
when is_paused then 'paused'

Expand All @@ -117,7 +135,7 @@ connector_health as (
when last_sync_completed_at is null and last_error_at is null then 'initial sync in progress'

-- the last attempted sync had an error
when last_sync_batch_id = last_error_batch then 'broken'
when last_sync_start_id <= last_error_batch and last_error_batch <= last_sync_end_id then 'broken'

-- there's never been a successful sync and there have been errors
when last_sync_completed_at is null and last_error_at is not null then 'broken'
Expand All @@ -130,7 +148,7 @@ connector_health as (
-- Joining with log to grab pertinent error/warning messages
connector_recent_logs as (

select
select
connector_health.connector_id,
connector_health.connector_name,
connector_health.connector_type,
Expand All @@ -144,20 +162,20 @@ connector_recent_logs as (
connector_log.event_type,
connector_log.message_data

from connector_health
left join connector_log
from connector_health
left join connector_log
on connector_log.connector_id = connector_health.connector_id
-- limiting relevance to logs created since the last successful sync completion (if there has been one)
and connector_log.created_at > coalesce(connector_health.last_sync_completed_at, connector_health.last_priority_first_sync_completed_at, '2000-01-01')
and connector_log.created_at > coalesce(connector_health.last_sync_completed_at, connector_health.last_priority_first_sync_completed_at, '2000-01-01')
-- only looking at errors and warnings (excluding syncs - both normal and priority first)
and connector_log.event_type != 'INFO'
and connector_log.event_type != 'INFO'
-- need to explicitly avoid priority first statuses because they are of event_type WARNING
and not (connector_log.event_subtype = 'status'
and not (connector_log.event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%')

{{ dbt_utils.group_by(n=12) }} -- de-duping error messages


),

Expand All @@ -175,18 +193,18 @@ final as (
connector_recent_logs.last_sync_completed_at,
connector_recent_logs.set_up_at,
coalesce(schema_changes.number_of_schema_changes_last_month, 0) as number_of_schema_changes_last_month

{% if var('fivetran_log_using_sync_alert_messages', true) %}
, {{ fivetran_utils.string_agg("distinct case when connector_recent_logs.event_type = 'SEVERE' then connector_recent_logs.message_data else null end", "'\\n'") }} as errors_since_last_completed_sync
, {{ fivetran_utils.string_agg("distinct case when connector_recent_logs.event_type = 'WARNING' then connector_recent_logs.message_data else null end", "'\\n'") }} as warnings_since_last_completed_sync
{% endif %}

from connector_recent_logs
left join schema_changes
on connector_recent_logs.connector_id = schema_changes.connector_id
left join schema_changes
on connector_recent_logs.connector_id = schema_changes.connector_id

join destination on destination.destination_id = connector_recent_logs.destination_id
{{ dbt_utils.group_by(n=11) }}
)

select * from final
select * from final