#!/usr/bin/env python3
from cmdlineparse import CmdlineParseClone
from sfconfig import SfConfig
from sfconn import SfConn
from sflogger import SfLogger
from snowflake.connector.errors import ProgrammingError
sf_cfg = SfConfig('config.json')
cmdline = CmdlineParseClone()
logger = SfLogger(cmdline.args.log_level, __file__)
sf_conn = SfConn(sf_cfg.config, logger)
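# NOTE (assumption): cmdlineparse, sfconfig, sfconn and sflogger are taken to be
# local helper modules shipped with this repo, and config.json is taken to hold
# the Snowflake connection settings that SfConn uses to open the session.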
# sf_clone - clone all regular tables (non-external, non-dynamic, non-cloned tables)
#            from one schema to another existing schema
#   init    - create cloned tables in the target schema from the source schema
#   refresh - incrementally refresh cloned tables in the target schema from the source schema
#   remove  - remove all cloned tables in the target schema that were cloned from the source schema
type = cmdline.args.type
dryrun = cmdline.args.dryrun
owner_role = cmdline.owner_role
to_db_nm, to_sc_nm = cmdline.to_db_nm, cmdline.to_sc_nm
to_db_sc = f"{to_db_nm}.{to_sc_nm}"
from_db_nm, from_sc_nm = cmdline.from_db_nm, cmdline.from_sc_nm
from_db_sc = f"{from_db_nm}.{from_sc_nm}"
if type == 'init':
    # ./sf_clone init
    #   --owner_role <_FR>
    #   --clone_role <_FR>
    #   --from_sc FROM_DB.SC
    #   --to_sc TO_DB.SC
    #   [--dryrun]
    #   [--delete_existing]
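    #
    # A concrete example of the flags above (role and schema names here are
    # hypothetical placeholders, not values from this repo):
    #   ./sf_clone init --owner_role MY_OWNER_FR --clone_role MY_CLONE_FR \
    #       --from_sc PROD_DB.SALES --to_sc DEV_DB.SALES --dryrun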
logger.info(f"Initializing clones of tables in {to_db_sc} from {from_db_sc}")
delete_existing = cmdline.args.delete_existing
clone_role = cmdline.clone_role
if sf_conn.validate_role(clone_role, from_db_nm, from_sc_nm) == False or sf_conn.validate_role(clone_role, to_db_nm, to_sc_nm) == False or sf_conn.validate_role(owner_role, to_db_nm, to_sc_nm) == False:
logger.error(f"Role(s) {clone_role},{owner_role} does not have access in {from_db_sc},{to_db_sc}")
exit(1)
logger.debug(f"Using owner_role {owner_role} and clone_role {clone_role}")
# get a list of tables in source schema that are not themselves clones
logger.debug(f"Retrieving from tables in {from_db_sc}")
from_tables = sf_conn.get_tables(clone_role, from_db_nm, from_sc_nm)
tables_to_clone = []
tables_seen = {}
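    # NOTE (assumption): the index positions used below are inferred from how they
    # are checked, not from documentation of get_tables: [2]/[3]/[4] look like
    # database/schema/table name, [6] flags a table that is itself a clone, and
    # [0]/[1] are storage metrics from account_usage.table_storage_metrics
    # (None when no data is available yet).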
    for from_table in from_tables:
        if from_table[6] is True or (from_table[0] is None and from_table[1] is None):
            logger.warning(f"Skipping {from_table[2]}.{from_table[3]}.{from_table[4]} because it is a cloned table or has no data available in account_usage.table_storage_metrics")
            continue
        tables_to_clone.append(from_table)
        tables_seen[from_table[4]] = True
    logger.debug(f"Found {len(tables_to_clone)} tables to clone")
    # if --delete_existing is not specified, list the tables already in the target schema
    # and fail if any table we are about to clone already exists there
    if delete_existing is False:
        logger.debug(f"Retrieving target tables in {to_db_sc}")
        to_tables = sf_conn.get_tables(clone_role, to_db_nm, to_sc_nm)
        for to_table in to_tables:
            if to_table[4] in tables_seen:
                logger.error(f"Table {to_table[4]} exists in {to_db_sc}, but --delete_existing is not specified")
                exit(-1)
    else:
        logger.debug(f"Overwriting any existing tables from {from_db_sc} in {to_db_sc}")
    # if --delete_existing was specified, use CREATE OR REPLACE TABLE so existing tables are overwritten
    for table in tables_to_clone:
        logger.debug(f"Cloning {table[2]}.{table[3]}.{table[4]} to {to_db_sc}")
        if delete_existing is True:
            logger.debug(f"CREATE OR REPLACE TABLE {to_db_sc}.{table[4]} CLONE {from_db_sc}.{table[4]}")
            if dryrun is False:
                sf_conn.run_query(f"CREATE OR REPLACE TABLE {to_db_sc}.{table[4]} CLONE {from_db_sc}.{table[4]}")
        else:
            logger.debug(f"CREATE TABLE {to_db_sc}.{table[4]} CLONE {from_db_sc}.{table[4]}")
            if dryrun is False:
                sf_conn.run_query(f"CREATE TABLE {to_db_sc}.{table[4]} CLONE {from_db_sc}.{table[4]}")
        logger.debug(f"GRANT OWNERSHIP ON TABLE {to_db_sc}.{table[4]} TO ROLE {owner_role} COPY CURRENT GRANTS")
        if dryrun is False:
            sf_conn.run_query(f"GRANT OWNERSHIP ON TABLE {to_db_sc}.{table[4]} TO ROLE {owner_role} COPY CURRENT GRANTS")
    logger.info("Done")
elif type == 'refresh':
    # ./sf_clone refresh
    #   --owner_role <_FR>
    #   --clone_role <_FR>
    #   --from_sc FROM_DB.SC
    #   --to_sc TO_DB.SC
    #   [--dryrun]
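    #
    # Example with the same hypothetical placeholder names used above:
    #   ./sf_clone refresh --owner_role MY_OWNER_FR --clone_role MY_CLONE_FR \
    #       --from_sc PROD_DB.SALES --to_sc DEV_DB.SALES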
logger.info(f"Incrementally refreshing clones of tables in {to_db_sc} from {from_db_sc}")
clone_role = cmdline.clone_role
if sf_conn.validate_role(clone_role, from_db_nm, from_sc_nm) == False or sf_conn.validate_role(clone_role, to_db_nm, to_sc_nm) == False or sf_conn.validate_role(owner_role, to_db_nm, to_sc_nm) == False:
logger.error(f"Role(s) {clone_role},{owner_role} does not have access in {from_db_sc},{to_db_sc}")
exit(1)
logger.debug(f"Using owner_role {owner_role} and clone_role {clone_role}")
# get a list of tables in source schema that are not themselves clones
logger.debug(f"Retrieving table clones that need to be refreshed from {from_db_sc} pointing {to_db_sc}")
clone_tables = sf_conn.get_clone_tables(clone_role, from_db_nm, from_sc_nm, to_db_nm, to_sc_nm)
logger.debug(f"Found {len(clone_tables)} tables to refresh clone")
    for clone in clone_tables:
        clone_db_sc, src_db_sc, tbl_nm, clone_active_bytes, src_retained_for_clone_bytes, src_deleted, row_count_diff, bytes_diff, dml_since_clone = clone
        logger.debug(f"Validating if {tbl_nm} needs to be updated from {src_db_sc} to {clone_db_sc}")
        if (clone_active_bytes == False and src_retained_for_clone_bytes == False and src_deleted == False
                and row_count_diff == False and bytes_diff == False and dml_since_clone == False):
            logger.debug(f"Skipping {tbl_nm} because no change to the source table was detected")
            continue
        else:
            error_msg = ''
            if clone_active_bytes == True:
                error_msg += '"clone table has active_bytes" '
            if src_retained_for_clone_bytes == True:
                error_msg += '"src table has retained_for_clone_bytes" '
            if src_deleted == True:
                error_msg += '"src table deleted" '
            if row_count_diff == True:
                error_msg += '"clone and src row_count_diff" '
            if bytes_diff == True:
                error_msg += '"clone and src bytes_diff" '
            if dml_since_clone == True:
                error_msg += '"src has had dml_since_clone" '
            logger.info(f"Refreshing {tbl_nm} from {src_db_sc} because {error_msg}")
            logger.debug(f"CREATE OR REPLACE TABLE {clone_db_sc}.{tbl_nm} CLONE {src_db_sc}.{tbl_nm}")
            if dryrun is False:
                sf_conn.run_query(f"CREATE OR REPLACE TABLE {clone_db_sc}.{tbl_nm} CLONE {src_db_sc}.{tbl_nm}")
            logger.debug(f"GRANT OWNERSHIP ON TABLE {clone_db_sc}.{tbl_nm} TO ROLE {owner_role} COPY CURRENT GRANTS")
            if dryrun is False:
                sf_conn.run_query(f"GRANT OWNERSHIP ON TABLE {clone_db_sc}.{tbl_nm} TO ROLE {owner_role} COPY CURRENT GRANTS")
    logger.info("Done")
elif type == 'remove':
    # ./sf_clone remove
    #   --owner_role <_FR>
    #   --from_sc FROM_DB.SC
    #   --to_sc TO_DB.SC
    #   [--dryrun]
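    #
    # Example with the same hypothetical placeholder names used above:
    #   ./sf_clone remove --owner_role MY_OWNER_FR --from_sc PROD_DB.SALES --to_sc DEV_DB.SALES --dryrun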
logger.debug(f"Using owner_role {owner_role} to remove all cloned objects from {to_db_nm}.{to_sc_nm} with a source of {from_db_nm}.{from_sc_nm}")
logger.info(f"Removing clones of tables in {to_db_sc} from {from_db_sc}")
if sf_conn.validate_role(owner_role, to_db_nm, to_sc_nm) == False:
logger.error(f"Role(s) {owner_role} does not have access in {to_db_sc}")
exit(1)
# Get a list of clones to remove
logger.debug(f"Retrieving table clones that need to be removed from {from_db_sc} pointing to {to_db_sc}")
clone_tables = sf_conn.get_clone_tables(owner_role, from_db_nm, from_sc_nm, to_db_nm, to_sc_nm)
# fix get_clone_tables so it returns all the clones and then have a
# function to remove the non-changed clones
logger.debug(f"Found {len(clone_tables)} cloned tables to remove clone")
sf_conn.run_query(f"USE ROLE {owner_role}")
for clone in clone_tables:
clone_db_sc, src_db_sc, tbl_nm, _ = clone
logger.info(f"Removing cloned table {clone_db_sc}.{tbl_nm}")
logger.debug(f"DROP TABLE {clone_db_sc}.{tbl_nm}")
if dryrun is False:
sf_conn.run_query(f"DROP TABLE {clone_db_sc}.{tbl_nm}")
logger.info("Done")