Skip to content

Commit

Permalink
[r] Fix: Partition sizing ignores supplementary bundles (#5207)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed May 18, 2023
1 parent 4f3d562 commit b1c28fe
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 22 deletions.
20 changes: 10 additions & 10 deletions deployments/anvilbox/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ def mkdict(previous_catalog: dict[str, str],


anvil_sources = mkdict({}, 11, mkdelta([
mksrc('datarepo-1ba591a6', 'ANVIL_1000G_high_coverage_2019_20221019_ANV5_202303081523', 6404),
mksrc('datarepo-aa67671a', 'ANVIL_CMG_UWASH_DS_BAV_IRB_PUB_RD_20221020_ANV5_202303081451', 177),
mksrc('datarepo-1ba591a6', 'ANVIL_1000G_high_coverage_2019_20221019_ANV5_202303081523', 6804),
mksrc('datarepo-aa67671a', 'ANVIL_CMG_UWASH_DS_BAV_IRB_PUB_RD_20221020_ANV5_202303081451', 181),
mksrc('datarepo-b4e0bfd5', 'ANVIL_CMG_UWASH_DS_BDIS_20221020_ANV5_202303081501', 10),
mksrc('datarepo-333dd883', 'ANVIL_CMG_UWASH_DS_HFA_20221020_ANV5_202303081456', 83),
mksrc('datarepo-b968cbdb', 'ANVIL_CMG_UWASH_DS_NBIA_20221020_ANV5_202303081459', 107),
mksrc('datarepo-6a5b13ea', 'ANVIL_CMG_UWASH_HMB_20221020_ANV5_202303081455', 419),
mksrc('datarepo-3d4c42f7', 'ANVIL_CMG_UWASH_HMB_IRB_20221020_ANV5_202303081454', 41),
mksrc('datarepo-080b2c9e', 'ANVIL_CMG_UWash_DS_EP_20221020_ANV5_202303081452', 49),
mksrc('datarepo-4f75c9e3', 'ANVIL_CMG_UWash_GRU_20230308_ANV5_202303081731', 2113),
mksrc('datarepo-ec9365be', 'ANVIL_CMG_UWash_GRU_IRB_20221020_ANV5_202303081458', 559),
mksrc('datarepo-8392ac2c', 'ANVIL_GTEx_V8_hg38_20221013_ANV5_202303081502', 18361),
mksrc('datarepo-333dd883', 'ANVIL_CMG_UWASH_DS_HFA_20221020_ANV5_202303081456', 198),
mksrc('datarepo-b968cbdb', 'ANVIL_CMG_UWASH_DS_NBIA_20221020_ANV5_202303081459', 110),
mksrc('datarepo-6a5b13ea', 'ANVIL_CMG_UWASH_HMB_20221020_ANV5_202303081455', 423),
mksrc('datarepo-3d4c42f7', 'ANVIL_CMG_UWASH_HMB_IRB_20221020_ANV5_202303081454', 45),
mksrc('datarepo-080b2c9e', 'ANVIL_CMG_UWash_DS_EP_20221020_ANV5_202303081452', 53),
mksrc('datarepo-4f75c9e3', 'ANVIL_CMG_UWash_GRU_20230308_ANV5_202303081731', 5861),
mksrc('datarepo-ec9365be', 'ANVIL_CMG_UWash_GRU_IRB_20221020_ANV5_202303081458', 563),
mksrc('datarepo-8392ac2c', 'ANVIL_GTEx_V8_hg38_20221013_ANV5_202303081502', 101205)
]))


Expand Down
1 change: 1 addition & 0 deletions deployments/anvilbox/sources.json
20 changes: 10 additions & 10 deletions deployments/anvildev/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ def mkdict(previous_catalog: dict[str, str],


anvil_sources = mkdict({}, 11, mkdelta([
mksrc('datarepo-1ba591a6', 'ANVIL_1000G_high_coverage_2019_20221019_ANV5_202303081523', 6404),
mksrc('datarepo-aa67671a', 'ANVIL_CMG_UWASH_DS_BAV_IRB_PUB_RD_20221020_ANV5_202303081451', 177),
mksrc('datarepo-1ba591a6', 'ANVIL_1000G_high_coverage_2019_20221019_ANV5_202303081523', 6804),
mksrc('datarepo-aa67671a', 'ANVIL_CMG_UWASH_DS_BAV_IRB_PUB_RD_20221020_ANV5_202303081451', 181),
mksrc('datarepo-b4e0bfd5', 'ANVIL_CMG_UWASH_DS_BDIS_20221020_ANV5_202303081501', 10),
mksrc('datarepo-333dd883', 'ANVIL_CMG_UWASH_DS_HFA_20221020_ANV5_202303081456', 83),
mksrc('datarepo-b968cbdb', 'ANVIL_CMG_UWASH_DS_NBIA_20221020_ANV5_202303081459', 107),
mksrc('datarepo-6a5b13ea', 'ANVIL_CMG_UWASH_HMB_20221020_ANV5_202303081455', 419),
mksrc('datarepo-3d4c42f7', 'ANVIL_CMG_UWASH_HMB_IRB_20221020_ANV5_202303081454', 41),
mksrc('datarepo-080b2c9e', 'ANVIL_CMG_UWash_DS_EP_20221020_ANV5_202303081452', 49),
mksrc('datarepo-4f75c9e3', 'ANVIL_CMG_UWash_GRU_20230308_ANV5_202303081731', 2113),
mksrc('datarepo-ec9365be', 'ANVIL_CMG_UWash_GRU_IRB_20221020_ANV5_202303081458', 559),
mksrc('datarepo-8392ac2c', 'ANVIL_GTEx_V8_hg38_20221013_ANV5_202303081502', 18361),
mksrc('datarepo-333dd883', 'ANVIL_CMG_UWASH_DS_HFA_20221020_ANV5_202303081456', 198),
mksrc('datarepo-b968cbdb', 'ANVIL_CMG_UWASH_DS_NBIA_20221020_ANV5_202303081459', 110),
mksrc('datarepo-6a5b13ea', 'ANVIL_CMG_UWASH_HMB_20221020_ANV5_202303081455', 423),
mksrc('datarepo-3d4c42f7', 'ANVIL_CMG_UWASH_HMB_IRB_20221020_ANV5_202303081454', 45),
mksrc('datarepo-080b2c9e', 'ANVIL_CMG_UWash_DS_EP_20221020_ANV5_202303081452', 53),
mksrc('datarepo-4f75c9e3', 'ANVIL_CMG_UWash_GRU_20230308_ANV5_202303081731', 5861),
mksrc('datarepo-ec9365be', 'ANVIL_CMG_UWash_GRU_IRB_20221020_ANV5_202303081458', 563),
mksrc('datarepo-8392ac2c', 'ANVIL_GTEx_V8_hg38_20221013_ANV5_202303081502', 101205)
]))


Expand Down
16 changes: 16 additions & 0 deletions deployments/anvildev/sources.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"anvil": [
"datarepo-1ba591a6.ANVIL_1000G_high_coverage_2019_20221019_ANV5_202303081523",
"datarepo-aa67671a.ANVIL_CMG_UWASH_DS_BAV_IRB_PUB_RD_20221020_ANV5_202303081451",
"datarepo-b4e0bfd5.ANVIL_CMG_UWASH_DS_BDIS_20221020_ANV5_202303081501",
"datarepo-333dd883.ANVIL_CMG_UWASH_DS_HFA_20221020_ANV5_202303081456",
"datarepo-b968cbdb.ANVIL_CMG_UWASH_DS_NBIA_20221020_ANV5_202303081459",
"datarepo-6a5b13ea.ANVIL_CMG_UWASH_HMB_20221020_ANV5_202303081455",
"datarepo-3d4c42f7.ANVIL_CMG_UWASH_HMB_IRB_20221020_ANV5_202303081454",
"datarepo-080b2c9e.ANVIL_CMG_UWash_DS_EP_20221020_ANV5_202303081452",
"datarepo-4f75c9e3.ANVIL_CMG_UWash_GRU_20230308_ANV5_202303081731",
"datarepo-ec9365be.ANVIL_CMG_UWash_GRU_IRB_20221020_ANV5_202303081458",
"datarepo-8392ac2c.ANVIL_GTEx_V8_hg38_20221013_ANV5_202303081502"
]
}

95 changes: 95 additions & 0 deletions scripts/generate_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Determine sizes
"""

import argparse
import json
from pathlib import (
Path,
)
import sys

import attr

from azul import (
config,
)
from azul.args import (
AzulArgumentHelpFormatter,
)
from azul.azulclient import (
AzulClient,
)
from azul.indexer import (
Prefix,
)
from azul.terra import (
TDRSourceSpec,
)

# Command line interface. Unless --catalogs is given, every configured catalog
# that is not an integration test catalog is selected.
parser = argparse.ArgumentParser(
    description=__doc__,
    formatter_class=AzulArgumentHelpFormatter,
)

parser.add_argument(
    '--catalogs',
    nargs='+',
    metavar='NAME',
    default=[
        name
        for name in config.catalogs
        if name not in config.integration_test_catalogs
    ],
    help='The names of the catalogs to determine source specs for.',
)


@attr.s(auto_attribs=True, frozen=True, kw_only=True)
class SourceSpecArgs:
    """
    The three arguments of a single `mksrc(...)` call. Converting an instance
    to a string yields the source text of that call, with each argument
    rendered via `repr`.
    """
    project: str
    snapshot: str
    subgraph_count: int

    def __str__(self) -> str:
        call_args = (self.project, self.snapshot, self.subgraph_count)
        return 'mksrc(' + ', '.join(map(repr, call_args)) + ')'


# Client used by generate_sources() to obtain each catalog's repository plugin
azul = AzulClient()

# sources.json of the active deployment; generate_sources() indexes the loaded
# object by catalog name and expects a list of '<project>.<snapshot>' strings
active_deployment_dir = Path(config.project_root, 'deployments', '.active')
with (active_deployment_dir / 'sources.json').open() as f:
    raw_sources = json.load(f)


def generate_sources(catalog: str) -> list[SourceSpecArgs]:
    """
    Compute the `mksrc` arguments for every source that sources.json lists
    under the given catalog.

    Each entry is split at the dot into a project and a snapshot name, resolved
    through the catalog's repository plugin as a snapshot source covering all
    prefixes, and its partitions are listed. The reported subgraph count is the
    sum of the values of the mapping returned by `list_partitions`.

    :param catalog: name of the catalog, used both as the key into
                    `raw_sources` and to select the repository plugin

    :return: one SourceSpecArgs per listed source, in the order of sources.json
    """
    catalog_sources = raw_sources[catalog]
    plugin = azul.repository_plugin(catalog)

    def spec_args_for(source: str) -> SourceSpecArgs:
        # Unpacking into exactly two names fails for entries that do not
        # contain exactly one dot
        project, snapshot = source.split('.')
        spec = TDRSourceSpec(project=project,
                             name=snapshot,
                             is_snapshot=True,
                             prefix=Prefix.of_everything)
        ref = plugin.resolve_source(str(spec))
        counts_by_partition = plugin.list_partitions(ref)
        return SourceSpecArgs(project=project,
                              snapshot=snapshot,
                              subgraph_count=sum(counts_by_partition.values()))

    return [spec_args_for(source) for source in catalog_sources]


def main(args: list[str]):
    """
    Print, for each selected catalog, an underlined heading followed by the
    catalog's `mksrc(...)` calls sorted by snapshot name, one per line and
    separated by commas. A reminder about flags that need to be added by hand
    is printed last.

    :param args: the command line arguments, without the program name
    """
    options = parser.parse_args(args)

    for catalog in options.catalogs:
        underline = '-' * len(catalog)
        print(catalog)
        print(underline)
        ordered = sorted(generate_sources(catalog),
                         key=lambda source: source.snapshot)
        print(',\n'.join(str(source) for source in ordered))

    msg = "Don't forget to manually specify flags for `ma` and `pop`."
    print('-' * len(msg))
    print(msg)


# Only run when executed as a script, not when imported
if __name__ == '__main__':
    main(sys.argv[1:])
10 changes: 8 additions & 2 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,16 @@ def list_partitions(self,
for partition_prefix in prefix.partition_prefixes()
]
assert prefixes, prefix
entity_type = BundleEntityType.primary.value
primary = BundleEntityType.primary.value
supplementary = BundleEntityType.supplementary.value
rows = self._run_sql(f'''
SELECT prefix, COUNT(datarepo_row_id) AS subgraph_count
FROM {backtick(self._full_table_name(source.spec, entity_type))}
FROM (
SELECT datarepo_row_id FROM {backtick(self._full_table_name(source.spec, primary))}
UNION ALL
SELECT datarepo_row_id FROM {backtick(self._full_table_name(source.spec, supplementary))}
WHERE is_supplementary
)
JOIN UNNEST({prefixes}) AS prefix ON STARTS_WITH(datarepo_row_id, prefix)
GROUP BY prefix
''')
Expand Down

0 comments on commit b1c28fe

Please sign in to comment.