Skip to content

Commit

Permalink
Parameterize repository plugin with bundle FQID class
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Mar 18, 2023
1 parent 099c33b commit 3eb1587
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 57 deletions.
5 changes: 1 addition & 4 deletions src/azul/azulclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,7 @@ def synthesize_notification(self, bundle_fqid: SourcedBundleFQID) -> JSON:
return {
'source': bundle_fqid.source.to_json(),
'transaction_id': str(uuid.uuid4()),
'match': {
'bundle_uuid': bundle_fqid.uuid,
'bundle_version': bundle_fqid.version
},
'match': bundle_fqid.fqid_json()
}

def bundle_message(self,
Expand Down
24 changes: 22 additions & 2 deletions src/azul/indexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class BundleFQID(SupportsLessThan):
uuid: BundleUUID
version: BundleVersion

def fqid_json(self) -> MutableJSON:
return {
'bundle_uuid': self.uuid,
'bundle_version': self.version
}


@attr.s(frozen=True, auto_attribs=True, kw_only=True)
class Prefix:
Expand Down Expand Up @@ -351,18 +357,32 @@ def spec_cls(cls) -> Type[SourceSpec]:
return spec_cls


BUNDLE_FQID = TypeVar('BUNDLE_FQID', bound='SourcedBundleFQID')


@attr.s(auto_attribs=True, frozen=True, kw_only=True, order=True)
class SourcedBundleFQID(BundleFQID, Generic[SOURCE_REF]):
source: SOURCE_REF

@classmethod
def source_ref_cls(cls) -> Type[SOURCE_REF]:
ref_cls, = get_generic_type_params(cls, SourceRef)
return ref_cls

@classmethod
def from_json(cls, source: SOURCE_REF, json: JSON) -> 'SourcedBundleFQID':
return cls(source=source,
uuid=json['bundle_uuid'],
version=json['bundle_version'])

def upcast(self):
return BundleFQID(uuid=self.uuid,
version=self.version)


@attr.s(auto_attribs=True, kw_only=True)
class Bundle(ABC, Generic[SOURCE_REF]):
fqid: SourcedBundleFQID[SOURCE_REF]
class Bundle(ABC, Generic[BUNDLE_FQID]):
fqid: BUNDLE_FQID
manifest: MutableJSONs
"""
Each item of the `manifest` attribute's value has this shape:
Expand Down
3 changes: 1 addition & 2 deletions src/azul/indexer/index_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,14 @@ def transform(self, catalog: CatalogName, notification: JSON, delete: bool) -> l
representing one metadata entity in the index.
"""
match, source = notification['match'], notification['source']
bundle_uuid, bundle_version = match['bundle_uuid'], match['bundle_version']
try:
partition = notification['partition']
except KeyError:
partition = BundlePartition.root
else:
partition = BundlePartition.from_json(partition)
service = self.index_service
bundle = service.fetch_bundle(catalog, source, bundle_uuid, bundle_version)
bundle = service.fetch_bundle(catalog, source, match)
results = service.transform(catalog, bundle, partition, delete=delete)
result = first(results)
if isinstance(result, BundlePartition):
Expand Down
9 changes: 2 additions & 7 deletions src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
BundlePartition,
BundleUUID,
BundleVersion,
SourcedBundleFQID,
)
from azul.indexer.aggregate import (
Entities,
Expand Down Expand Up @@ -173,14 +172,10 @@ def index_names(self, catalog: CatalogName) -> list[str]:
def fetch_bundle(self,
catalog: CatalogName,
source: JSON,
bundle_uuid: str,
bundle_version: str
match: JSON
) -> Bundle:
plugin = self.repository_plugin(catalog)
source = plugin.source_from_json(source)
bundle_fqid = SourcedBundleFQID(source=source,
uuid=bundle_uuid,
version=bundle_version)
bundle_fqid = plugin.resolve_bundle(source, match)
return plugin.fetch_bundle(bundle_fqid)

def index(self, catalog: CatalogName, bundle: Bundle) -> None:
Expand Down
31 changes: 24 additions & 7 deletions src/azul/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from azul import (
CatalogName,
cache,
cached_property,
config,
)
Expand All @@ -43,6 +44,7 @@
http_client,
)
from azul.indexer import (
BUNDLE_FQID,
Bundle,
SOURCE_REF,
SOURCE_SPEC,
Expand Down Expand Up @@ -402,7 +404,7 @@ def filter_stage(self) -> 'Type[FilterStage]':
raise NotImplementedError


class RepositoryPlugin(Generic[SOURCE_SPEC, SOURCE_REF], Plugin):
class RepositoryPlugin(Generic[SOURCE_SPEC, SOURCE_REF, BUNDLE_FQID], Plugin):

@classmethod
def type_name(cls) -> str:
Expand Down Expand Up @@ -439,13 +441,24 @@ def list_sources(self,
"""
raise NotImplementedError

@cached_property
def _source_ref_cls(self) -> Type[SOURCE_REF]:
cls = type(self)
spec_cls, ref_cls = get_generic_type_params(cls, SourceSpec, SourceRef)
@classmethod
@cache
def _get_params(cls) -> tuple:
spec_cls, ref_cls, fqid_cls = get_generic_type_params(cls, SourceSpec, SourceRef, SourcedBundleFQID)
assert fqid_cls.source_ref_cls() is ref_cls
assert ref_cls.spec_cls() is spec_cls
return spec_cls, ref_cls, fqid_cls

@property
def _source_ref_cls(self) -> Type[SOURCE_REF]:
spec_cls, ref_cls, fqid_cls = self._get_params()
return ref_cls

@property
def _bundle_fqid_cls(self) -> Type[BUNDLE_FQID]:
spec_cls, ref_cls, fqid_cls = self._get_params()
return fqid_cls

def source_from_json(self, ref: JSON) -> SOURCE_REF:
"""
Instantiate a :class:`SourceRef` from its JSON representation. The
Expand All @@ -472,11 +485,15 @@ def _lookup_source_id(self, spec: SOURCE_SPEC) -> str:
"""
raise NotImplementedError

def resolve_bundle(self, source: JSON, fqid: JSON) -> BUNDLE_FQID:
bundle_fqid_cls = self._bundle_fqid_cls
return bundle_fqid_cls.from_json(self.source_from_json(source), fqid)

@abstractmethod
def list_bundles(self,
source: SOURCE_REF,
prefix: str
) -> list[SourcedBundleFQID[SOURCE_REF]]:
) -> list[BUNDLE_FQID]:
"""
List the bundles in the given source whose UUID starts with the given
prefix.
Expand Down Expand Up @@ -504,7 +521,7 @@ def list_partitions(self, source: SOURCE_REF) -> Optional[Mapping[str, int]]:
return None

@abstractmethod
def fetch_bundle(self, bundle_fqid: SourcedBundleFQID[SOURCE_REF]) -> Bundle:
def fetch_bundle(self, bundle_fqid: BUNDLE_FQID) -> Bundle[BUNDLE_FQID]:
"""
Fetch the given bundle.
Expand Down
19 changes: 10 additions & 9 deletions src/azul/plugins/repository/canned/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ class CannedSourceRef(SourceRef[SimpleSourceSpec, 'CannedSourceRef']):
pass


CannedBundleFQID = SourcedBundleFQID[CannedSourceRef]
class CannedBundleFQID(SourcedBundleFQID[CannedSourceRef]):
pass


@dataclass(frozen=True)
class Plugin(RepositoryPlugin[SimpleSourceSpec, CannedSourceRef]):
class Plugin(RepositoryPlugin[SimpleSourceSpec, CannedSourceRef, CannedBundleFQID]):
_sources: Set[SimpleSourceSpec]

@classmethod
Expand Down Expand Up @@ -118,9 +119,9 @@ def list_bundles(self, source: CannedSourceRef, prefix: str) -> list[CannedBundl
bundle_fqids = []
for link in self.staging_area(source.spec).links.values():
if link.uuid.startswith(prefix):
bundle_fqids.append(SourcedBundleFQID(source=source,
uuid=link.uuid,
version=link.version))
bundle_fqids.append(CannedBundleFQID(source=source,
uuid=link.uuid,
version=link.version))
log.info('There are %i bundle(s) with prefix %r in source %r.',
len(bundle_fqids), prefix, source)
return bundle_fqids
Expand All @@ -131,9 +132,9 @@ def fetch_bundle(self, bundle_fqid: CannedBundleFQID) -> Bundle:
staging_area = self.staging_area(bundle_fqid.source.spec)
version, manifest, metadata = staging_area.get_bundle(bundle_fqid.uuid)
if bundle_fqid.version is None:
bundle_fqid = SourcedBundleFQID(source=bundle_fqid.source,
uuid=bundle_fqid.uuid,
version=version)
bundle_fqid = CannedBundleFQID(source=bundle_fqid.source,
uuid=bundle_fqid.uuid,
version=version)
bundle = CannedBundle(fqid=bundle_fqid,
manifest=cast(MutableJSONs, manifest),
metadata_files=cast(MutableJSON, metadata))
Expand Down Expand Up @@ -232,7 +233,7 @@ def retry_after(self) -> Optional[int]:
return self._retry_after


class CannedBundle(Bundle[CannedSourceRef]):
class CannedBundle(Bundle[CannedBundleFQID]):

def drs_path(self, manifest_entry: JSON) -> Optional[str]:
return None
7 changes: 4 additions & 3 deletions src/azul/plugins/repository/dss/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ def id_from_spec(cls, spec: SimpleSourceSpec) -> str:
return str(uuid5(cls.namespace, spec.name))


DSSBundleFQID = SourcedBundleFQID[DSSSourceRef]
class DSSBundleFQID(SourcedBundleFQID[DSSSourceRef]):
pass


class Plugin(RepositoryPlugin[SimpleSourceSpec, DSSSourceRef]):
class Plugin(RepositoryPlugin[SimpleSourceSpec, DSSSourceRef, DSSBundleFQID]):

@classmethod
def create(cls, catalog: CatalogName) -> RepositoryPlugin:
Expand Down Expand Up @@ -460,7 +461,7 @@ def retry_after(self) -> Optional[int]:
return self._retry_after


class DSSBundle(Bundle[DSSSourceRef]):
class DSSBundle(Bundle[DSSBundleFQID]):

def drs_path(self, manifest_entry: JSON) -> str:
file_uuid = manifest_entry['uuid']
Expand Down
11 changes: 8 additions & 3 deletions src/azul/plugins/repository/tdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
DRSClient,
)
from azul.indexer import (
BUNDLE_FQID,
Bundle,
SOURCE_REF,
SOURCE_SPEC,
SourcedBundleFQID,
)
from azul.plugins import (
Expand All @@ -62,10 +65,12 @@

log = logging.getLogger(__name__)

TDRBundleFQID = SourcedBundleFQID[TDRSourceRef]

class TDRBundleFQID(SourcedBundleFQID[TDRSourceRef]):
pass

class TDRBundle(Bundle[TDRSourceRef]):

class TDRBundle(Bundle[TDRBundleFQID]):

def drs_path(self, manifest_entry: JSON) -> Optional[str]:
return manifest_entry.get('drs_path')
Expand All @@ -82,7 +87,7 @@ def _parse_drs_path(self, drs_uri: str) -> str:


@attr.s(kw_only=True, auto_attribs=True, frozen=True)
class TDRPlugin(RepositoryPlugin[TDRSourceSpec, TDRSourceRef]):
class TDRPlugin(RepositoryPlugin[SOURCE_SPEC, SOURCE_REF, BUNDLE_FQID]):
_sources: Set[TDRSourceSpec]

@classmethod
Expand Down
7 changes: 3 additions & 4 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
)
from azul.indexer import (
BundleFQID,
SourcedBundleFQID,
)
from azul.indexer.document import (
EntityReference,
Expand Down Expand Up @@ -181,7 +180,7 @@ def _parse_drs_uri(self, file_ref: Optional[str]) -> Optional[str]:
return self._parse_drs_path(file_ref)


class Plugin(TDRPlugin):
class Plugin(TDRPlugin[TDRSourceSpec, TDRSourceRef, TDRBundleFQID]):

@cached_property
def _version(self):
Expand Down Expand Up @@ -237,7 +236,7 @@ def list_partitions(self,
''')
return {row['prefix']: row['subgraph_count'] for row in rows}

def _emulate_bundle(self, bundle_fqid: SourcedBundleFQID) -> TDRAnvilBundle:
def _emulate_bundle(self, bundle_fqid: TDRBundleFQID) -> TDRAnvilBundle:
source = bundle_fqid.source
bundle_entity = self._bundle_entity(bundle_fqid)

Expand Down Expand Up @@ -278,7 +277,7 @@ def _emulate_bundle(self, bundle_fqid: SourcedBundleFQID) -> TDRAnvilBundle:

return result

def _bundle_entity(self, bundle_fqid: SourcedBundleFQID) -> KeyReference:
def _bundle_entity(self, bundle_fqid: TDRBundleFQID) -> KeyReference:
source = bundle_fqid.source
bundle_uuid = bundle_fqid.uuid
entity_id = uuids.change_version(bundle_uuid,
Expand Down
25 changes: 12 additions & 13 deletions src/azul/plugins/repository/tdr_hca/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
)
from azul.indexer import (
BundleFQID,
SourcedBundleFQID,
)
from azul.indexer.document import (
EntityID,
Expand Down Expand Up @@ -282,7 +281,7 @@ def _parse_drs_uri(self,
return None


class Plugin(TDRPlugin):
class Plugin(TDRPlugin[TDRSourceSpec, TDRSourceRef, TDRBundleFQID]):

def list_partitions(self,
source: TDRSourceRef
Expand Down Expand Up @@ -311,9 +310,9 @@ def _list_bundles(self, source: TDRSourceRef, prefix: str) -> list[TDRBundleFQID
WHERE STARTS_WITH(links_id, '{source_prefix + prefix}')
''', group_by='links_id')
return [
SourcedBundleFQID(source=source,
uuid=row['links_id'],
version=self.format_version(row['version']))
TDRBundleFQID(source=source,
uuid=row['links_id'],
version=self.format_version(row['version']))
for row in current_bundles
]

Expand Down Expand Up @@ -377,8 +376,8 @@ def _stitch_bundles(self,
source = root_bundle.fqid.source
entities: EntitiesByType = defaultdict(set)
root_entities = None
unprocessed: set[SourcedBundleFQID] = {root_bundle.fqid}
processed: set[SourcedBundleFQID] = set()
unprocessed: set[TDRBundleFQID] = {root_bundle.fqid}
processed: set[TDRBundleFQID] = set()
stitched_links: list[JSON] = []
# Retrieving links in batches eliminates the risk of exceeding
# BigQuery's maximum query size. Using a batches size 1000 appears to be
Expand Down Expand Up @@ -420,8 +419,8 @@ def _stitch_bundles(self,
return entities, root_entities, stitched_links

def _retrieve_links(self,
links_ids: set[SourcedBundleFQID]
) -> dict[SourcedBundleFQID, JSON]:
links_ids: set[TDRBundleFQID]
) -> dict[TDRBundleFQID, JSON]:
"""
Retrieve links entities from BigQuery and parse the `content` column.
:param links_ids: Which links entities to retrieve.
Expand Down Expand Up @@ -512,7 +511,7 @@ def join(i):

def _find_upstream_bundles(self,
source: TDRSourceRef,
outputs: Entities) -> set[SourcedBundleFQID]:
outputs: Entities) -> set[TDRBundleFQID]:
"""
Search for bundles containing processes that produce the specified output
entities.
Expand All @@ -530,9 +529,9 @@ def _find_upstream_bundles(self,
bundles = set()
outputs_found = set()
for row in rows:
bundles.add(SourcedBundleFQID(source=source,
uuid=row['links_id'],
version=self.format_version(row['version'])))
bundles.add(TDRBundleFQID(source=source,
uuid=row['links_id'],
version=self.format_version(row['version'])))
outputs_found.add(row['output_id'])
missing = set(output_ids) - outputs_found
require(not missing,
Expand Down
Loading

0 comments on commit 3eb1587

Please sign in to comment.