Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Change Relation Type error between Distributed and non-distributed Materializations #206

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
if(engine not in ('MaterializedView', 'View'), 'table', 'view') as type,
db.engine as db_engine,
{%- if adapter.get_clickhouse_cluster_name() -%}
count(distinct _shard_num) > 1 as is_on_cluster
count(distinct _shard_num) > 1
or (
select count() as cluster_cnt from system.clusters where cluster = '{{ adapter.get_clickhouse_cluster_name().strip("\"") }}'
) == 1 as is_on_cluster
from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
join system.databases as db on t.database = db.name
where schema = '{{ schema_relation.schema }}'
Expand Down Expand Up @@ -126,3 +129,19 @@
EXCHANGE {{ obj_types }} {{ old_relation }} AND {{ target_relation }} {{ on_cluster_clause(target_relation)}}
{% endcall %}
{% endmacro %}

{% macro validate_relation_existence(relation, type='table', on_cluster=False) -%}
  {#-
    Decide whether an already existing relation can be reused by the calling
    materialization.

    Arguments:
      relation   -- the cached relation to inspect, or none when nothing exists yet.
      type       -- relation type the caller is about to build (defaults to 'table').
      on_cluster -- value the caller expects `relation.can_on_cluster` to have.

    Outcome:
      * returns False when `relation` is none;
      * returns True when `relation.can_on_cluster` equals `on_cluster`;
      * otherwise, when `should_full_refresh()` is true or `relation.type`
        differs from `type`, logs, drops the relation and returns False;
      * otherwise raises a compiler error asking for a full refresh.

    Each `return(...)` ends the macro immediately, so the checks below are
    written as guard clauses.
  -#}

  {#- Nothing exists yet: there is nothing to validate. -#}
  {%- if relation is none -%}
    {% do return(False) %}
  {%- endif -%}

  {#- Cluster mode already matches what the caller expects: reuse the relation. -#}
  {%- if relation.can_on_cluster == on_cluster -%}
    {% do return(True) %}
  {%- endif -%}

  {#- Cluster mode mismatch that we are not allowed to resolve by dropping. -#}
  {%- if not (should_full_refresh() or relation.type != type) -%}
    {% do exceptions.raise_compiler_error('Incompatible relation status. Relation ' ~ relation ~ ' already exists but not in the correct mode, on cluster should be ' ~ on_cluster ~ '. Please do a full-refresh on the model ' ~ this ) %}
  {%- endif -%}

  {#- Full refresh requested, or the relation type changes anyway: drop and report "not existing". -#}
  {{ log('Dropping relation ' + relation.name )}}
  {{ drop_relation_if_exists(relation) }}
  {% do return(False) %}
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,23 @@
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}

{% set on_cluster = on_cluster_clause(target_relation) %}
{% if on_cluster.strip() == '' %}
{%- set on_cluster = on_cluster_clause(target_relation) -%}
{%- if on_cluster.strip() == '' -%}
{% do exceptions.raise_compiler_error('To use distributed materialization cluster setting in dbt profile must be set') %}
{% endif %}
{%- endif -%}

-- check if existing relation is valid
{%- set is_existing_valid = validate_relation_existence(existing_relation, on_cluster=True) -%}
{%- if is_existing_valid -%}
{%- set existing_relation_local = load_cached_relation(this.incorporate(path={"identifier": this.identifier + local_suffix})) -%}
{%- if not validate_relation_existence(existing_relation_local, on_cluster=True) -%}
{%- set existing_relation_local = none -%}
{%- endif -%}
{%- else -%}
{%- set existing_relation_local = none -%}
{%- endif -%}

{% set existing_relation_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if existing_relation is not none else none %}
{% set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if target_relation is not none else none %}
{%- set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if target_relation is not none else none -%}

{%- set backup_relation = none -%}
{%- set preexisting_backup_relation = none -%}
Expand Down Expand Up @@ -48,7 +58,7 @@

{% if backup_relation is none %}
{{ create_distributed_local_table(target_relation, target_relation_local, view_relation) }}
{% elif existing_relation.can_exchange %}
{% elif existing_relation.can_exchange and existing_relation_local.can_on_cluster == backup_relation.can_on_cluster %}
-- We can do an atomic exchange, so no need for an intermediate
{% call statement('main') -%}
{{ create_empty_table_from_relation(backup_relation, view_relation) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,17 @@
{% do exceptions.raise_compiler_error('To use distributed materializations cluster setting in dbt profile must be set') %}
{% endif %}

{% set existing_relation_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if existing_relation is not none else none %}
-- check if existing relation is valid
{%- set is_existing_valid = validate_relation_existence(existing_relation, on_cluster=True) -%}
{%- if is_existing_valid -%}
{%- set existing_relation_local = load_cached_relation(this.incorporate(path={"identifier": this.identifier + local_suffix})) -%}
{%- if not validate_relation_existence(existing_relation_local, on_cluster=True) -%}
{%- set existing_relation_local = none -%}
{%- endif -%}
{%- else -%}
{%- set existing_relation_local = none -%}
{%- endif -%}

{% set target_relation_local = target_relation.incorporate(path={"identifier": this.identifier + local_suffix}) if target_relation is not none else none %}

{%- set unique_key = config.get('unique_key') -%}
Expand Down Expand Up @@ -52,7 +62,7 @@
{{ create_view_as(view_relation, sql) }}
{% endcall %}

{% if existing_relation is none %}
{% if existing_relation_local is none %}
-- No existing table, simply create a new one
{{ create_distributed_local_table(target_relation, target_relation_local, view_relation, sql) }}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}

{%- set is_existing_valid = validate_relation_existence(existing_relation, on_cluster=False) -%}
{%- set unique_key = config.get('unique_key') -%}
{% if unique_key is not none and unique_key|length == 0 %}
{% set unique_key = none %}
Expand All @@ -28,7 +29,7 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{% set to_drop = [] %}

{% if existing_relation is none %}
{% if not is_existing_valid %}
-- No existing table, simply create a new one
{% call statement('main') %}
{{ get_create_table_as_sql(False, target_relation, sql) }}
Expand Down
9 changes: 5 additions & 4 deletions dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
{%- set preexisting_backup_relation = none -%}
{%- set preexisting_intermediate_relation = none -%}

{% if existing_relation is not none %}
{% set is_existing_valid = validate_relation_existence(existing_relation, on_cluster=False) %}
{% if is_existing_valid %}
{%- set backup_relation_type = existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
{% if not existing_relation.can_exchange %}
{% if not existing_relation.can_exchange or existing_relation.can_on_cluster != backup_relation.can_on_cluster %}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
{% endif %}
Expand All @@ -27,13 +28,13 @@
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if backup_relation is none %}
{% if not is_existing_valid %}
{{ log('Creating new relation ' + target_relation.name )}}
-- There is not existing relation, so we can just create
{% call statement('main') -%}
{{ get_create_table_as_sql(False, target_relation, sql) }}
{%- endcall %}
{% elif existing_relation.can_exchange %}
{% elif existing_relation.can_exchange and existing_relation.can_on_cluster == backup_relation.can_on_cluster %}
-- We can do an atomic exchange, so no need for an intermediate
{% call statement('main') -%}
{{ get_create_table_as_sql(False, backup_relation, sql) }}
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/clickhouse/macros/materializations/view.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
{%- set backup_relation_type = existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
{% if not existing_relation.can_exchange %}
{% if not existing_relation.can_exchange or existing_relation.can_on_cluster != target_relation.can_on_cluster %}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
{% endif %}
Expand All @@ -33,7 +33,7 @@
{% call statement('main') -%}
{{ get_create_view_as_sql(target_relation, sql) }}
{%- endcall %}
{% elif existing_relation.can_exchange %}
{% elif existing_relation.can_exchange and existing_relation.can_on_cluster == backup_relation.can_on_cluster %}
-- We can do an atomic exchange, so no need for an intermediate
{% call statement('main') -%}
{{ get_create_view_as_sql(backup_relation, sql) }}
Expand Down
36 changes: 36 additions & 0 deletions tests/integration/adapter/test_changing_relation_type.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,41 @@
import os
from typing import List, Optional

import pytest
from dbt.tests.adapter.relations.test_changing_relation_type import BaseChangeRelationTypeValidator
from dbt.tests.util import run_dbt


class TestChangeRelationTypes(BaseChangeRelationTypeValidator):
    """Run the inherited relation-type-change checks without any overrides."""


class TestChangeRelationTypesWithDistributedMaterializations(BaseChangeRelationTypeValidator):
    """Relation-type-change checks that include distributed materializations.

    Changing a relation from distributed to non-distributed is expected to
    raise a compilation error unless the run uses the ``--full-refresh`` flag.
    """

    def _run_and_check_materialization_error(
        self, materialization, extra_args: Optional[List] = None
    ):
        """Run dbt with the given materialization and expect an incompatibility error.

        Args:
            materialization: Value passed to dbt as the ``materialized`` var.
            extra_args: Optional additional command-line arguments for the run.
        """
        command = ["run", '--vars', f'materialized: {materialization}', *(extra_args or [])]
        first_result = run_dbt(command, expect_pass=False)[0]
        assert first_result.status == "error"
        assert "Incompatible relation status" in first_result.message

    @pytest.mark.skipif(
        not os.environ.get('DBT_CH_TEST_CLUSTER', '').strip(), reason='Not on a cluster'
    )
    def test_changing_materialization_changes_relation_type(self, project):
        """Walk through a fixed sequence of materialization changes.

        Each step is ``(checker, materialization, full_refresh)``; the steps
        are order-dependent because every run builds on the relation left
        behind by the previous one.
        """
        succeeds = self._run_and_check_materialization
        fails = self._run_and_check_materialization_error
        steps = (
            (succeeds, 'view', False),
            (succeeds, 'distributed_table', False),
            (succeeds, 'distributed_incremental', False),
            (fails, 'table', False),
            (succeeds, 'table', True),
            (succeeds, 'distributed_incremental', True),
            (fails, 'incremental', False),
            (succeeds, 'incremental', True),
            (succeeds, 'distributed_table', True),
        )
        for check, materialization, full_refresh in steps:
            if full_refresh:
                # A fresh list per call, exactly as the literal calls would pass.
                check(materialization, extra_args=['--full-refresh'])
            else:
                check(materialization)