Pull request #201 (Open): chore(sinan DAGS): create DAG to fetch dengue data from SINAN

luabida wants to merge 12 commits into thegraphnetwork:main from luabida:create-sinan-dags.
Diff: +566 −13. The diff below shows changes from 6 of the 12 commits.
Commits (all by luabida):

- c0cee37 chore(sinan DAGS): create DAG to fetch dengue data from SINAN
- e940b7a Include EGH_CONN var to image
- 97fb367 fix sql statements
- 73f2fec finish SINAN_DENG DAG
- 08e498b Use parquet chunks, preventing the RAM from filling up
- caebfb6 Handle UndefinedColumn error & add column
- f743f5c Use recursion to handle to_sql
- 8ffb904 Add columns to table before inserting the dataframe
- 7d276b5 Parse all columns to TEXT before inserting to db
- d7435c2 minor fixes
- afc99a1 Include SINAN_ZIKA DAG
- c76c431 Include SINAN_CHIK DAG
New DAG file (161 lines added):
```python
import pendulum

from datetime import timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable


default_args = {
    "owner": "epigraphhub",
    "depends_on_past": False,
    "start_date": pendulum.datetime(2023, 1, 1),
    "email": ["[email protected]"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    dag_id='SINAN_DENG',
    tags=['SINAN', 'Brasil', 'Dengue'],
    schedule='@monthly',
    default_args=default_args,
    catchup=False,
) as dag:

    CONN = Variable.get('egh_conn', deserialize_json=True)

    @task.external_python(
        task_id='update_dengue',
        python='/opt/py311/bin/python3.11'
    )
    def update_dengue(egh_conn: dict):
        """
        This task runs in an isolated Python environment containing the
        PySUS package. It fetches all SINAN dengue files and inserts
        them into the database.
        """
        import os
        import logging
        import pandas as pd

        from sqlalchemy import create_engine, text
        from sqlalchemy.exc import ProgrammingError
        from pysus.online_data import parquets_to_dataframe
        from pysus.ftp.databases.sinan import SINAN

        sinan = SINAN().load()
        dis_code = "DENG"
        tablename = "sinan_dengue_m"
        files = sinan.get_files(dis_code=dis_code)

        def insert_parquets(parquet_dir: str, year: int):
            """
            Insert a parquet directory into the database chunk by chunk.
            Each chunk file is deleted after insertion, and the directory
            is removed at the end.
            """
            for parquet in os.listdir(parquet_dir):
                file = os.path.join(parquet_dir, parquet)
                df = pd.read_parquet(str(file), engine='fastparquet')
                df.columns = df.columns.str.lower()
                df['year'] = year
                df['prelim'] = False
                df.to_sql(
                    name=tablename,
                    con=create_engine(egh_conn['URI']),
                    schema="brasil",
                    if_exists='append',
                    index=False
                )
                del df
                os.remove(file)
                logging.debug(f"{file} inserted into db")
            os.rmdir(parquet_dir)  # fixed: was `parquets.path`, undefined here

        f_stage = {}
        for file in files:
            code, year = sinan.format(file)
            stage = 'prelim' if 'PRELIM' in file.path else 'final'

            if stage not in f_stage:
                f_stage[stage] = [year]
            else:
                f_stage[stage].append(year)

        for year in f_stage['final']:
            # Check if the final data for this year is already in the DB
            with create_engine(egh_conn['URI']).connect() as conn:
                cur = conn.execute(text(
                    f'SELECT COUNT(*) FROM brasil.{tablename}'
                    f" WHERE year = '{year}' AND prelim = False"
                ))
                count = cur.fetchone()[0]

            logging.info(f"Final year {year}: {count}")

            if not count:
                # Check for preliminary data
                with create_engine(egh_conn['URI']).connect() as conn:
                    cur = conn.execute(text(
                        f'SELECT COUNT(*) FROM brasil.{tablename}'
                        f" WHERE year = '{year}' AND prelim = True"
                    ))
                    count = cur.fetchone()[0]

                    if count:
                        # Replace preliminary rows with final data
                        conn.execute(text(
                            f'DELETE FROM brasil.{tablename}'
                            f" WHERE year = '{year}' AND prelim = True"
                        ))
                        conn.commit()  # required under SQLAlchemy 2.0

                parquets = sinan.download(sinan.get_files(dis_code, year))

                try:
                    insert_parquets(parquets.path, year)
                except ProgrammingError as error:
                    if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
                        # Add the missing column to the table, then retry
                        column_name = str(error).split('"')[1]
                        with create_engine(egh_conn['URI']).connect() as conn:
                            conn.execute(text(
                                f'ALTER TABLE brasil.{tablename}'
                                f' ADD COLUMN {column_name} TEXT'
                            ))
                            conn.commit()
                        logging.warning(f"Column {column_name} added into {tablename}")
                        insert_parquets(parquets.path, year)
                # parquets.path is removed by insert_parquets

        for year in f_stage['prelim']:
            with create_engine(egh_conn['URI']).connect() as conn:
                # Refresh preliminary data: delete old rows before re-inserting
                conn.execute(text(
                    f'DELETE FROM brasil.{tablename}'
                    f" WHERE year = '{year}' AND prelim = True"
                ))
                conn.commit()

            parquets = sinan.download(sinan.get_files(dis_code, year))

            try:
                insert_parquets(parquets.path, year)
            except ProgrammingError as error:
                if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
                    # Add the missing column to the table, then retry
                    column_name = str(error).split('"')[1]
                    with create_engine(egh_conn['URI']).connect() as conn:
                        conn.execute(text(
                            f'ALTER TABLE brasil.{tablename}'
                            f' ADD COLUMN {column_name} TEXT'
                        ))
                        conn.commit()
                    logging.warning(f"Column {column_name} added into {tablename}")
                    insert_parquets(parquets.path, year)
            # parquets.path is removed by insert_parquets

    update_dengue(CONN)
```
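The DAG's chunked-insertion pattern (read one parquet chunk, normalize its columns, append it with `to_sql`, free it before the next one) can be illustrated in isolation. The following is a minimal sketch, not the PR's code: it uses SQLite and synthetic in-memory chunks in place of Postgres and the PySUS parquet directory, and all table and column names are illustrative.

```python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # stand-in for egh_conn['URI']

# Synthetic "chunks" standing in for the parquet files in the directory.
chunks = [
    pd.DataFrame({"NU_ANO": ["2023"] * 2, "ID_AGRAVO": ["A90"] * 2}),
    pd.DataFrame({"NU_ANO": ["2023"] * 3, "ID_AGRAVO": ["A90"] * 3}),
]

for df in chunks:
    df.columns = df.columns.str.lower()  # normalize case, as the DAG does
    df["year"] = 2023
    df["prelim"] = False
    df.to_sql("sinan_dengue_m", engine, if_exists="append", index=False)
    del df  # free the chunk before reading the next one

with engine.connect() as conn:
    total = conn.execute(
        text("SELECT COUNT(*) FROM sinan_dengue_m")
    ).fetchone()[0]
print(total)  # 5
```

Because each chunk is appended and then released, peak memory stays bounded by the largest chunk rather than the whole year's data, which is the point of commit 08e498b.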
New environment template file (20 lines added):
```
AIRFLOW_PROJ_DIR=${AIRFLOW_PROJ_DIR}
AIRFLOW_UID=${AIRFLOW_UID}
AIRFLOW_PORT=${AIRFLOW_PORT}
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD}

AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}

AIRFLOW__SMTP__SMTP_HOST=${AIRFLOW__SMTP__SMTP_HOST}
AIRFLOW__SMTP__SMTP_USER=${AIRFLOW__SMTP__SMTP_USER}
AIRFLOW__SMTP__SMTP_PASSWORD=${AIRFLOW__SMTP__SMTP_PASSWORD}
AIRFLOW__SMTP__SMTP_PORT=${AIRFLOW__SMTP__SMTP_PORT:-587}
AIRFLOW__SMTP__SMTP_MAIL_FROM=${AIRFLOW__SMTP__SMTP_MAIL_FROM}

POSTGRES_EPIGRAPH_DB=${POSTGRES_EPIGRAPH_DB}
POSTGRES_EPIGRAPH_HOST=${POSTGRES_EPIGRAPH_HOST}
POSTGRES_EPIGRAPH_PORT=${POSTGRES_EPIGRAPH_PORT}
POSTGRES_EPIGRAPH_USER=${POSTGRES_EPIGRAPH_USER}
POSTGRES_EPIGRAPH_PASSWORD=${POSTGRES_EPIGRAPH_PASSWORD}
AIRFLOW_VAR_EGH_CONN='{"URI":"postgresql://${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"}'
```
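The last line defines an Airflow Variable through the environment: Airflow exposes any `AIRFLOW_VAR_<NAME>` environment variable as the Variable `<name>`, which is how the DAG's `Variable.get('egh_conn', deserialize_json=True)` call resolves to a dict. A minimal sketch of the parsing involved, without Airflow and with placeholder credentials in place of the interpolated `POSTGRES_*` values:

```python
import json
import os

# Simulate the environment after docker-compose interpolates the
# POSTGRES_* values (placeholder credentials, not real ones).
os.environ["AIRFLOW_VAR_EGH_CONN"] = (
    '{"URI":"postgresql://user:pass@localhost:5432/epigraphhub"}'
)

# Airflow strips the AIRFLOW_VAR_ prefix and lowercases the remainder to
# form the Variable key; deserialize_json=True amounts to json.loads.
egh_conn = json.loads(os.environ["AIRFLOW_VAR_EGH_CONN"])
print(egh_conn["URI"])  # postgresql://user:pass@localhost:5432/epigraphhub
```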
Updated requirements file (one line changed to two):

```
pysus >= 0.10.2
SQLAlchemy >= 2.0.21
```
Review comment (on the `UndefinedColumn` handling): I think that obtaining the missing column name from the error message is not a good approach, because if psycopg2 changes the wording of its error messages it will break our code. Instead, we should read the list of column names from the parquet files and compare it with the columns in the current schema. From the difference between these lists, which can be obtained efficiently as `list(set(cols1) - set(cols2))`, we can then build the ALTER TABLE query that adds the new columns to the database table. With this approach we don't even need to rely on an exception being raised: the missing columns can be determined before the first insert.