Skip to content

Commit

Permalink
[a r] Index supplementary files for AnVIL (#5000, PR #5059)
Browse files Browse the repository at this point in the history
  • Loading branch information
achave11-ucsc committed May 5, 2023
2 parents 48fcfd5 + c2ee74e commit e06862b
Show file tree
Hide file tree
Showing 23 changed files with 760 additions and 415 deletions.
7 changes: 3 additions & 4 deletions scripts/can_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
)
from azul.indexer import (
Bundle,
SourcedBundleFQID,
)
from azul.logging import (
configure_script_logging,
Expand Down Expand Up @@ -88,9 +87,9 @@ def fetch_bundle(source: str, bundle_uuid: str, bundle_version: str) -> Bundle:
for plugin_source_spec in plugin.sources:
if source_ref.spec.contains(plugin_source_spec):
plugin_source_ref = plugin.resolve_source(str(plugin_source_spec))
fqid = SourcedBundleFQID(source=plugin_source_ref,
uuid=bundle_uuid,
version=bundle_version)
fqid = plugin.resolve_bundle(dict(source=plugin_source_ref.to_json(),
uuid=bundle_uuid,
version=bundle_version))
bundle = plugin.fetch_bundle(fqid)
log.info('Fetched bundle %r version %r from catalog %r.',
fqid.uuid, fqid.version, catalog)
Expand Down
23 changes: 13 additions & 10 deletions src/azul/azulclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from typing import (
Union,
cast,
)
import uuid

Expand All @@ -47,6 +48,7 @@
SignatureHelper,
)
from azul.indexer import (
SourceJSON,
SourceRef,
SourcedBundleFQID,
)
Expand Down Expand Up @@ -90,16 +92,12 @@ def synthesize_notification(self, bundle_fqid: SourcedBundleFQID) -> JSON:
"""
Generate a indexer notification for the given bundle.
"""
# Organic notifications sent by DSS wouldn't contain the `source` entry,
# Organic notifications sent by DSS have a different structure,
# but since DSS is end-of-life these synthetic notifications are now the
# only variant that would ever occur in the wild.
return {
'source': bundle_fqid.source.to_json(),
'transaction_id': str(uuid.uuid4()),
'match': {
'bundle_uuid': bundle_fqid.uuid,
'bundle_version': bundle_fqid.version
},
'bundle_fqid': bundle_fqid.to_json()
}

def bundle_message(self,
Expand Down Expand Up @@ -246,8 +244,11 @@ def message(partition_prefix: str) -> JSON:
def remote_reindex_partition(self, message: JSON) -> None:
catalog = message['catalog']
prefix = message['prefix']
# FIXME: Adopt `trycast` for casting JSON to TypeDict
# https://github.com/DataBiosphere/azul/issues/5171
source = cast(SourceJSON, message['source'])
validate_uuid_prefix(prefix)
source = self.repository_plugin(catalog).source_from_json(message['source'])
source = self.repository_plugin(catalog).source_from_json(source)
bundle_fqids = self.list_bundles(catalog, source, prefix)
bundle_fqids = self.filter_obsolete_bundle_versions(bundle_fqids)
logger.info('After filtering obsolete versions, '
Expand Down Expand Up @@ -382,9 +383,11 @@ def delete_bundle(self, catalog: CatalogName, bundle_uuid, bundle_version):
bundle_uuid, bundle_version, catalog)
notifications = [
{
'match': {
'bundle_uuid': bundle_uuid,
'bundle_version': bundle_version
# FIXME: delete_bundle script fails with KeyError: 'source'
# https://github.com/DataBiosphere/azul/issues/5105
'bundle_fqid': {
'uuid': bundle_uuid,
'version': bundle_version
}
}
]
Expand Down
51 changes: 40 additions & 11 deletions src/azul/indexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
Optional,
Type,
TypeVar,
get_args,
TypedDict,
)

import attr
from more_itertools import (
one,
)

from azul import (
RequirementError,
Expand All @@ -38,6 +35,7 @@
MutableJSON,
MutableJSONs,
SupportsLessThan,
get_generic_type_params,
)
from azul.uuids import (
UUIDPartition,
Expand All @@ -55,6 +53,9 @@ class BundleFQID(SupportsLessThan):
uuid: BundleUUID
version: BundleVersion

def to_json(self) -> MutableJSON:
return attr.asdict(self, recurse=False)


@attr.s(frozen=True, auto_attribs=True, kw_only=True)
class Prefix:
Expand Down Expand Up @@ -273,6 +274,11 @@ def contains(self, other: 'SourceSpec') -> bool:
)


class SourceJSON(TypedDict):
id: str
spec: str


SOURCE_REF = TypeVar('SOURCE_REF', bound='SourceRef')


Expand All @@ -296,7 +302,7 @@ class SourceRef(Generic[SOURCE_SPEC, SOURCE_REF]):
id: str
spec: SOURCE_SPEC

_lookup: ClassVar[dict[tuple[Type['SourceRef'], str], 'SourceRef']] = {}
_lookup: ClassVar[dict[tuple[Type['SourceRef'], str, 'SourceSpec'], 'SourceRef']] = {}
_lookup_lock = RLock()

def __new__(cls: Type[SOURCE_REF], *, id: str, spec: SOURCE_SPEC) -> SOURCE_REF:
Expand Down Expand Up @@ -344,32 +350,55 @@ def __new__(cls: Type[SOURCE_REF], *, id: str, spec: SOURCE_SPEC) -> SOURCE_REF:
assert self.spec == spec, (self.spec, spec)
return self

def to_json(self):
def to_json(self) -> SourceJSON:
return dict(id=self.id, spec=str(self.spec))

@classmethod
def from_json(cls, ref: JSON) -> 'SourceRef':
def from_json(cls, ref: SourceJSON) -> 'SourceRef':
return cls(id=ref['id'], spec=cls.spec_cls().parse(ref['spec']))

@classmethod
def spec_cls(cls) -> Type[SourceSpec]:
base_cls = one(getattr(cls, '__orig_bases__'))
spec_cls, ref_cls = get_args(base_cls)
spec_cls, ref_cls = get_generic_type_params(cls, SourceSpec, SourceRef)
return spec_cls


class SourcedBundleFQIDJSON(TypedDict):
uuid: BundleUUID
version: BundleVersion
source: SourceJSON


BUNDLE_FQID = TypeVar('BUNDLE_FQID', bound='SourcedBundleFQID')


@attr.s(auto_attribs=True, frozen=True, kw_only=True, order=True)
class SourcedBundleFQID(BundleFQID, Generic[SOURCE_REF]):
source: SOURCE_REF

@classmethod
def source_ref_cls(cls) -> Type[SOURCE_REF]:
ref_cls, = get_generic_type_params(cls, SourceRef)
return ref_cls

@classmethod
def from_json(cls, json: SourcedBundleFQIDJSON) -> 'SourcedBundleFQID':
json = dict(json)
source = cls.source_ref_cls().from_json(json.pop('source'))
return cls(source=source, **json)

def upcast(self):
return BundleFQID(uuid=self.uuid,
version=self.version)

def to_json(self) -> SourcedBundleFQIDJSON:
return dict(super().to_json(),
source=self.source.to_json())


@attr.s(auto_attribs=True, kw_only=True)
class Bundle(ABC, Generic[SOURCE_REF]):
fqid: SourcedBundleFQID[SOURCE_REF]
class Bundle(ABC, Generic[BUNDLE_FQID]):
fqid: BUNDLE_FQID
manifest: MutableJSONs
"""
Each item of the `manifest` attribute's value has this shape:
Expand Down
27 changes: 16 additions & 11 deletions src/azul/indexer/index_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import json
import logging
import time
from typing import (
cast,
)
import uuid

import chalice
Expand Down Expand Up @@ -49,6 +52,7 @@
)
from azul.indexer import (
BundlePartition,
SourcedBundleFQIDJSON,
)
from azul.indexer.document import (
Contribution,
Expand Down Expand Up @@ -130,25 +134,25 @@ def _queue_notification(self,

def _validate_notification(self, notification):
try:
match = notification['match']
bundle_fqid = notification['bundle_fqid']
except KeyError:
raise chalice.BadRequestError('Missing notification entry: match')
raise chalice.BadRequestError('Missing notification entry: bundle_fqid')

try:
bundle_uuid = match['bundle_uuid']
bundle_uuid = bundle_fqid['uuid']
except KeyError:
raise chalice.BadRequestError('Missing notification entry: bundle_uuid')
raise chalice.BadRequestError('Missing notification entry: bundle_fqid.uuid')

try:
bundle_version = match['bundle_version']
bundle_version = bundle_fqid['version']
except KeyError:
raise chalice.BadRequestError('Missing notification entry: bundle_version')
raise chalice.BadRequestError('Missing notification entry: bundle_fqid.version')

if not isinstance(bundle_uuid, str):
raise chalice.BadRequestError(f'Invalid type: bundle_uuid: {type(bundle_uuid)} (should be str)')
raise chalice.BadRequestError(f'Invalid type: uuid: {type(bundle_uuid)} (should be str)')

if not isinstance(bundle_version, str):
raise chalice.BadRequestError(f'Invalid type: bundle_version: {type(bundle_version)} (should be str)')
raise chalice.BadRequestError(f'Invalid type: version: {type(bundle_version)} (should be str)')

if bundle_uuid.lower() != str(uuid.UUID(bundle_uuid)).lower():
raise chalice.BadRequestError(f'Invalid syntax: {bundle_uuid} (should be a UUID)')
Expand Down Expand Up @@ -196,16 +200,17 @@ def transform(self, catalog: CatalogName, notification: JSON, delete: bool) -> l
notification into a list of contributions to documents, each document
representing one metadata entity in the index.
"""
match, source = notification['match'], notification['source']
bundle_uuid, bundle_version = match['bundle_uuid'], match['bundle_version']
# FIXME: Adopt `trycast` for casting JSON to TypeDict
# https://github.com/DataBiosphere/azul/issues/5171
bundle_fqid = cast(SourcedBundleFQIDJSON, notification['bundle_fqid'])
try:
partition = notification['partition']
except KeyError:
partition = BundlePartition.root
else:
partition = BundlePartition.from_json(partition)
service = self.index_service
bundle = service.fetch_bundle(catalog, source, bundle_uuid, bundle_version)
bundle = service.fetch_bundle(catalog, bundle_fqid)
results = service.transform(catalog, bundle, partition, delete=delete)
result = first(results)
if isinstance(result, BundlePartition):
Expand Down
11 changes: 3 additions & 8 deletions src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
BundlePartition,
BundleUUID,
BundleVersion,
SourcedBundleFQID,
SourcedBundleFQIDJSON,
)
from azul.indexer.aggregate import (
Entities,
Expand Down Expand Up @@ -172,15 +172,10 @@ def index_names(self, catalog: CatalogName) -> list[str]:

def fetch_bundle(self,
catalog: CatalogName,
source: JSON,
bundle_uuid: str,
bundle_version: str
bundle_fqid: SourcedBundleFQIDJSON
) -> Bundle:
plugin = self.repository_plugin(catalog)
source = plugin.source_from_json(source)
bundle_fqid = SourcedBundleFQID(source=source,
uuid=bundle_uuid,
version=bundle_version)
bundle_fqid = plugin.resolve_bundle(bundle_fqid)
return plugin.fetch_bundle(bundle_fqid)

def index(self, catalog: CatalogName, bundle: Bundle) -> None:
Expand Down
Loading

0 comments on commit e06862b

Please sign in to comment.