
Update the parse function to accept an entity id #189

Closed · wants to merge 20 commits · changes shown from 8 commits
5 changes: 5 additions & 0 deletions docs/buildingcrawler.md
@@ -371,8 +371,13 @@ parse:
author: .//meta[@name="author"]/@content
publishedAt: .//*[@class="date"]/text()
description: .//meta[@property="og:description"]/@content
keys:
Contributor:
I would suggest we use syntax similar to ftm mappings for consistency. Something like https://github.com/alephdata/aleph/blob/main/mappings/md_companies.yml#L15-L17

So the keys section will look like:

keys:
  - title
  - author

Contributor:
The keys section needs to be updated now I think?

- key: .//meta[@property="og:title"]/@content
- key: .//meta[@name="author"]/@content
```

We can also supply a set of values to act as a unique key for this entity. If a key isn't supplied, memorious will default to using the URL of the page to generate a key for you.
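In practice, the key derivation might look roughly like the following sketch (illustrative only — `derive_entity_id` is a made-up name and the URL fallback is paraphrased from the paragraph above; `make_id` is the helper this PR uses in `memorious/operations/parse.py`):

```python
# Illustrative sketch: derive a stable entity id from the configured keys,
# falling back to the page URL when no keys are supplied.
from memorious.helpers.key import make_id


def derive_entity_id(keys, extracted_properties, url):
    if keys:
        # Join the extracted values for each configured key and hash them.
        values = ["".join(extracted_properties.get(key, [])) for key in keys]
        return make_id(*values)
    # Default: key the entity on the URL of the page.
    return make_id(url)
```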

The `data` that `aleph_emit_entity` emits to the next stages includes the following new items:

* `aleph_id`: id of the uploaded entity in Aleph
4 changes: 4 additions & 0 deletions example/config/simple_article_scraper.yml
@@ -45,6 +45,10 @@ pipeline:
author: .//meta[@name="author"]/@content
publishedAt: .//*[@class="date"]/text()
description: .//meta[@property="og:description"]/@content
keys:
Contributor:
Maybe the method needs to be changed to use the built-in parse method instead of a custom Python method, to match the documentation example?

Contributor (author):
Sure. The problem with doing that, though, is that the scraper won't be able to extract the body of the article, which is why the custom script exists. I guess, as it's an example, it doesn't matter too much, but that is why we have a difference.

Personally, I don't have an issue with the documentation not matching the example in the repo.

  - title
  - author
  - publishedAt
handle:
  store: store
  fetch: fetch
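For context on the review thread above: a rough, purely illustrative sketch of the kind of custom parse method the example relies on — like the built-in `parse` it reads metadata via XPath, but it also extracts the article body, which the generic parser does not. The XPath expressions, the `article_body` key, and the `store` rule are assumptions for this example, not code from the repository:

```python
# Illustrative only: a custom parse operation that also extracts the article body.
from memorious.logic.context import Context


def parse_article(context: Context, data: dict) -> None:
    with context.http.rehash(data) as result:
        if result.html is None:
            return
        html = result.html
        data["title"] = "".join(html.xpath('.//meta[@property="og:title"]/@content'))
        data["author"] = "".join(html.xpath('.//meta[@name="author"]/@content'))
        # The part the generic parser cannot do: pull out the article text.
        paragraphs = html.xpath(".//article//p//text()")
        data["article_body"] = "\n".join(p.strip() for p in paragraphs if p.strip())
        context.emit(rule="store", data=data)
```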
24 changes: 24 additions & 0 deletions memorious/logic/meta.py
@@ -0,0 +1,24 @@
# from typing_extensions import TypedDict
Contributor:
IMO this is not generic enough to be its own module. We should put it in the aleph operation module.

Contributor (author):
But it's not a module; it's the definition of a type. I would keep type definitions separate from modules that actually do stuff.

Contributor:
Putting the type definition near the relevant code is the convention we're already using in other places. See for example https://github.com/alephdata/followthemoney/blob/master/followthemoney/schema.py#L24

Contributor (author):
Fair enough. I'll move the meta file into the operations module.

Contributor:
I actually meant the Meta type definition should live in operations/aleph.py and not in a separate file since it's very specific to the aleph_emit operations and unlikely to be reused anywhere else.

from typing import Any, Optional, TypedDict


class MetaBase(TypedDict):
    crawler: Optional[str]
    foreign_id: Optional[str]
    source_url: Optional[str]
    title: Optional[str]
    author: Optional[str]
    publisher: Optional[str]
    file_name: Optional[str]
    retrieved_at: Optional[str]
    modified_at: Optional[str]
    published_at: Optional[str]
    headers: Any
    keywords: Any


class Meta(MetaBase, total=False):
    parent: Any
    languages: Any
    countries: Any
    mime_type: Any
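For readers unfamiliar with `total=False` on a `TypedDict` subclass, a minimal, purely illustrative sketch of how this type could be constructed — the crawler name, URL, and values below are made up:

```python
# Keys declared on MetaBase are required when a Meta is built as a dict
# literal; keys added on Meta itself are optional because the subclass is
# declared with total=False.
meta: Meta = {
    "crawler": "example_crawler",
    "foreign_id": "doc-1",
    "source_url": "https://example.org/article",
    "title": "Example article",
    "author": None,
    "publisher": None,
    "file_name": None,
    "retrieved_at": None,
    "modified_at": None,
    "published_at": None,
    "headers": {},
    "keywords": [],
}
meta["languages"] = ["en"]  # optional key from the total=False subclass
```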
111 changes: 64 additions & 47 deletions memorious/operations/aleph.py
@@ -1,5 +1,6 @@
from pathlib import Path
from pprint import pprint # noqa
from pprint import pprint
from typing import Optional # noqa
from banal import clean_dict # type: ignore

from servicelayer.cache import make_key # type: ignore
@@ -8,69 +9,75 @@
from alephclient.util import backoff
from alephclient.errors import AlephException
from memorious.core import get_rate_limit # type: ignore
from memorious.logic.context import Context
from memorious.logic.meta import Meta


def _create_document_metadata(context, data) -> dict:
    meta = {}
    languages = context.params.get("languages")
    meta["languages"] = data.get("languages", languages)
    countries = context.params.get("countries")
    meta["countries"] = data.get("countries", countries)
    mime_type = context.params.get("mime_type")
    meta["mime_type"] = data.get("mime_type", mime_type)
    return meta
def _create_document_metadata(meta_base: Meta, context: Context, data: dict) -> Meta:
    languages: list[str] = list(context.params.get("languages", []))
    countries: list[str] = list(context.params.get("countries", []))
    mime_type: str = context.params.get("mime_type", "")

    context.log.warn(languages)
Contributor:
Stray warning

meta_base["languages"] = data.get("languages", languages)
meta_base["countries"] = data.get("countries", countries)
meta_base["mime_type"] = data.get("mime_type", mime_type)

return meta_base


def _create_meta_object(context, data) -> dict:
def _create_meta_object(context: Context, data: dict) -> Meta:
    source_url = data.get("source_url", data.get("url"))
    foreign_id = data.get("foreign_id", data.get("request_id", source_url))

    meta = {
        "crawler": context.crawler.name,
        "foreign_id": foreign_id,
        "source_url": source_url,
        "title": data.get("title"),
        "author": data.get("author"),
        "publisher": data.get("publisher"),
        "file_name": data.get("file_name"),
        "retrieved_at": data.get("retrieved_at"),
        "modified_at": data.get("modified_at"),
        "published_at": data.get("published_at"),
        "headers": data.get("headers", {}),
        "keywords": data.get("keywords", []),
    }
    meta = Meta(
        crawler=context.crawler.name,
        foreign_id=foreign_id,
        source_url=source_url,
        title=data.get("title"),
        author=data.get("author"),
        publisher=data.get("publisher"),
        file_name=data.get("file_name"),
        retrieved_at=data.get("retrieved_at"),
        modified_at=data.get("modified_at"),
        published_at=data.get("published_at"),
        headers=data.get("headers", {}),
        keywords=data.get("keywords", []),
    )

if data.get("aleph_folder_id"):
meta["parent"] = {"id": data.get("aleph_folder_id")}
Contributor (@sunu, Sep 21, 2021):
We are using two different styles here. We are treating meta as an object in the line above and as a dict here. I think we should be consistent and use a single style throughout the function.

And imo we can merge _create_meta_object and _create_document_metadata together into just one function and only set whatever metadata is available.


    return meta
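A rough sketch (not part of this PR) of the merge suggested in the review comment above: one helper that builds the metadata dict and only sets keys for which a value is actually available. The name `_create_meta` is made up, and it returns a plain dict to sidestep the TypedDict totality question.

```python
def _create_meta(context: Context, data: dict) -> dict:
    source_url = data.get("source_url", data.get("url"))
    meta = {
        "crawler": context.crawler.name,
        "foreign_id": data.get("foreign_id", data.get("request_id", source_url)),
        "source_url": source_url,
    }
    # Values copied straight from the crawler data, skipped when missing.
    for key in ("title", "author", "publisher", "file_name", "retrieved_at",
                "modified_at", "published_at", "headers", "keywords"):
        if data.get(key) is not None:
            meta[key] = data[key]
    # Values that may fall back to stage parameters.
    for key in ("languages", "countries", "mime_type"):
        value = data.get(key, context.params.get(key))
        if value is not None:
            meta[key] = value
    if data.get("aleph_folder_id"):
        meta["parent"] = {"id": data["aleph_folder_id"]}
    return meta
```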


def aleph_emit(context, data):
def aleph_emit(context: Context, data: dict):
    aleph_emit_document(context, data)


def aleph_emit_document(context, data):
def aleph_emit_document(context: Context, data: dict):
    api = get_api(context)
    if api is None:
        return
    collection_id = get_collection_id(context, api)
    content_hash = data.get("content_hash")
    source_url = data.get("source_url", data.get("url"))
    foreign_id = data.get("foreign_id", data.get("request_id", source_url))
    # Fetch document id from cache
    document_id = context.get_tag(make_key(collection_id, foreign_id, content_hash))
    collection_id: str = get_collection_id(context, api)
    content_hash: Optional[str] = data.get("content_hash")
    source_url: str = data.get("source_url", data.get("url"))
    foreign_id: str = data.get("foreign_id", data.get("request_id", source_url))
    document_id: Optional[str] = context.get_tag(
        make_key(collection_id, foreign_id, content_hash)
    )

    if document_id:
        context.log.info("Skip aleph upload: %s", foreign_id)
        data["aleph_id"] = document_id
        context.emit(data=data, optional=True)
        return

    meta = clean_dict(_create_meta_object(context, data))
    meta.update(_create_document_metadata(context, data))

    meta.update(_create_document_metadata(meta, context, data))
    label = meta.get("file_name", meta.get("source_url"))
    context.log.info("Upload: %s", label)

    with context.load_file(content_hash) as fh:
        if fh is None:
            return
@@ -100,7 +107,7 @@ def aleph_emit_document(context, data):
backoff(exc, try_number)


def aleph_folder(context, data):
def aleph_folder(context: Context, data: dict):
    api = get_api(context)
    if api is None:
        return
@@ -134,19 +141,29 @@ def aleph_folder(context, data):
backoff(ae, try_number)


def aleph_emit_entity(context, data):
def aleph_emit_entity(context: Context, data: dict) -> None:
    context.log.info("Emit to entity: {}".format(data.get("entity_id")))

    api = get_api(context)
    if api is None:
        return
    collection_id = get_collection_id(context, api)
    entity_id = data.get("entity_id")
    source_url = data.get("source_url", data.get("url"))
    foreign_id = data.get("foreign_id", data.get("request_id", source_url))
    collection_id: str = get_collection_id(context, api)
    entity_id: Optional[str] = data.get("entity_id")
    source_url: Optional[str] = data.get("source_url", data.get("url"))
    foreign_id: Optional[str] = data.get(
        "foreign_id", data.get("request_id", source_url)
    )

    # Fetch id from cache
    if entity_id is None:
        context.log.warn("No entity_id found. Skipping store")
        context.emit(data=data, optional=True)
        return

    cached_key = context.get_tag(make_key(collection_id, foreign_id, entity_id))

    if cached_key:
        context.log.info("Skip entity creation: {}".format(foreign_id))
        context.log.info("Entity exists. Skip creation: {}".format(cached_key))
        data["aleph_id"] = cached_key
        context.emit(data=data, optional=True)
        return
@@ -156,7 +173,7 @@ def aleph_emit_entity(context, data):
    rate_limit = get_rate_limit("aleph", limit=rate)
    rate_limit.comply()
    try:
        res = api.write_entity(
        res: dict[str, str] = api.write_entity(
            collection_id,
            {
                "schema": data.get("schema"),
@@ -166,7 +183,7 @@
        )

        aleph_id = res.get("id")
        context.log.info("Aleph entity ID: %s", aleph_id)
        context.log.info("Entity created. entity_id is: %s", aleph_id)

        # Save the entity id in cache for future use
        context.set_tag(make_key(collection_id, foreign_id, entity_id), aleph_id)
@@ -182,19 +199,19 @@ def aleph_emit_entity(context, data):
backoff(exc, try_number)


def get_api(context):
def get_api(context: Context) -> Optional[AlephAPI]:
    if not settings.HOST:
        context.log.warning("No $ALEPHCLIENT_HOST, skipping upload...")
        return None
    if not settings.API_KEY:
        context.log.warning("No $ALEPHCLIENT_API_KEY, skipping upload...")
        return None

    session_id = "memorious:%s" % context.crawler.name
    session_id: str = "memorious:%s" % context.crawler.name
    return AlephAPI(settings.HOST, settings.API_KEY, session_id=session_id)


def get_collection_id(context, api):
def get_collection_id(context: Context, api: AlephAPI) -> str:
    if not hasattr(context.stage, "aleph_cid"):
        foreign_id = context.get("collection", context.crawler.name)
        config = {"label": context.crawler.description}
33 changes: 28 additions & 5 deletions memorious/operations/parse.py
@@ -1,11 +1,15 @@
import logging

from typing import Optional
from banal import ensure_list
from urllib.parse import urljoin
from normality import collapse_spaces
from servicelayer.cache import make_key

from memorious.helpers.key import make_id
from memorious.helpers.rule import Rule
from memorious.helpers.dates import iso_date
from memorious.logic.context import Context


log = logging.getLogger(__name__)
@@ -17,7 +21,7 @@
]


def parse_html(context, data, result):
def parse_html(context: Context, data, result: dict) -> None:
    context.log.info("Parse: %r", result.url)

    for title in result.html.xpath(".//title/text()"):
@@ -68,7 +72,7 @@ def parse_html(context, data, result):
context.emit(rule="fetch", data=data)


def parse_for_metadata(context, data, html):
def parse_for_metadata(context: Context, data, html) -> dict:
    meta = context.params.get("meta", {})
    meta_date = context.params.get("meta_date", {})

@@ -88,22 +92,41 @@ def parse_for_metadata(context, data, html):
if value is not None:
data[key] = value
break
meta_paths.update(data)
Contributor:
Why is this necessary?

    return data


def parse_ftm(context, data, html):
def get_entity_id_from_keys(keys: Optional[list], properties: dict, html) -> Optional[str]:
    temp_key = ""

    if isinstance(keys, list):
        for key in keys:
            temp_key += "".join(properties[key])

    if temp_key != "":
        return make_id(temp_key)
Contributor:
make_id can take multiple arguments. You don't need to join them beforehand.
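A possible simplification along the lines of that comment (illustrative only): pass the individual values to make_id instead of joining them first. It assumes each configured key maps to a list of extracted strings, as in parse_ftm below, and that make_id returns None when given nothing usable — worth verifying against the helper's actual behaviour.

```python
def get_entity_id_from_keys(keys: Optional[list], properties: dict) -> Optional[str]:
    if not isinstance(keys, list):
        return None
    # One joined string per configured key; make_id combines and hashes them.
    parts = ["".join(properties.get(key, [])) for key in keys]
    return make_id(*parts)
```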



def parse_ftm(context: Context, data, html):
    properties = context.params.get("properties")
    properties_dict = {}

    for key, value in properties.items():
        properties_dict[key] = html.xpath(value)

    data["schema"] = context.params.get("schema")
    data["properties"] = properties_dict

    generated_entity_id = get_entity_id_from_keys(
        context.params.get("keys"), properties_dict, html
    )

    if generated_entity_id:
        data["entity_id"] = generated_entity_id


def parse(context, data):
    with context.http.rehash(data) as result:
def parse(context: Context, data):
    with context.http.rehash(data) as result:
        if result.html is not None:
            # Get extra metadata from the DOM
            parse_for_metadata(context, data, result.html)