Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unpp rebased #124

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
14 changes: 13 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ x-datamart-defaults: &datamart_defaults
DATABASE_URL: postgis://postgres:@db:5432/etools_datamart
DATABASE_URL_ETOOLS: postgis://postgres:@db-etools:5432/etools
DATABASE_URL_PRP: postgis://postgres:@db-prp:5432/prp
DATABASE_URL_UNPP: postgis://postgres:@db-unpp:5432/unpp
AUTOCREATE_USERS: "admin,123"
CACHE_URL: "redis://redis:6379/1"
CACHE_URL_LOCK: "redis://redis:6379/1"
Expand Down Expand Up @@ -74,7 +75,7 @@ services:
volumes:
- "$PWD/build/db:/var/lib/postgresql/data"

# Rely on etools and prp database instances running locally
# Rely on etools, prp, and unpp database instances running locally
db-etools:
image: mdillon/postgis:9.6
container_name: datamart_etools
Expand All @@ -97,6 +98,17 @@ services:
volumes:
- "$PWD/build/prp:/var/lib/postgresql/data"

db-unpp:
image: mdillon/postgis:9.6
shm_size: '1gb'
container_name: datamart_unpp
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD:
POSTGRES_DB: unpp
volumes:
- "$PWD/build/unpp:/var/lib/postgresql/data"

redis:
image: redis:alpine
container_name: datamart_redis
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ max-complexity = 19
max-line-length = 160
filename = .py
exclude = .tox,migrations,.git,docs,diff_match_patch.py,deploy/**,settings
ignore = E731
ignore = E731, W504
1 change: 1 addition & 0 deletions src/etools_datamart/api/endpoints/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
from .prp import * # noqa
from .system import * # noqa
from .unicef import * # noqa
from .unpp import * # noqa
1 change: 1 addition & 0 deletions src/etools_datamart/api/endpoints/unpp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .application import ApplicationViewSet
25 changes: 25 additions & 0 deletions src/etools_datamart/api/endpoints/unpp/application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from unicef_rest_framework.ds import DynamicSerializerFilter
from unicef_rest_framework.ordering import OrderingFilter

from etools_datamart.api.endpoints.common import DataMartViewSet
from etools_datamart.api.endpoints.datamart.serializers import DataMartSerializer
from etools_datamart.api.filtering import DatamartQueryStringFilterBackend
from etools_datamart.apps.mart.unpp import models


class ApplicationSerializer(DataMartSerializer):
class Meta(DataMartSerializer.Meta):
model = models.Application
exclude = None
fields = '__all__'


class ApplicationViewSet(DataMartViewSet):
serializer_class = ApplicationSerializer
queryset = models.Application.objects.all()
serializers_fieldsets = {'std': ApplicationSerializer, }
filter_backends = [
DatamartQueryStringFilterBackend,
OrderingFilter,
DynamicSerializerFilter,
]
6 changes: 5 additions & 1 deletion src/etools_datamart/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ class ReadOnlyRouter(APIReadOnlyRouter):

router.register(r'system/monitor', endpoints.MonitorViewSet)

from etools_datamart.apps.sources.source_prp import api_urls # noqa isort:skip
router.register(r'unpp/application', endpoints.ApplicationViewSet)

import etools_datamart.apps.sources.source_prp.api_urls # noqa isort:skip
from etools_datamart.apps.sources.source_prp.backward_api_urls import backward_compatible_router # noqa isort:skip

import etools_datamart.apps.sources.unpp.api_urls # noqa isort:skip

from .endpoints.rapidpro import _urls_ # noqa isort:skip

urlpatterns = [
Expand Down
111 changes: 111 additions & 0 deletions src/etools_datamart/apps/etl/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.cache import caches
from django.db import transaction
from django.utils import timezone
from django.utils.functional import cached_property

Expand All @@ -22,6 +23,7 @@
RequiredIsRunning,
)
from etools_datamart.celery import app
from etools_datamart.sentry import process_exception

loadeables = set()
locks = caches['lock']
Expand Down Expand Up @@ -461,3 +463,112 @@ def load(self, *, verbosity=0, stdout=None, ignore_dependencies=False, max_recor

def consistency_check(self):
pass


class CommonLoader(BaseLoader):
IGNORED_FIELDS = ['source_id', 'id', 'last_modify_date']

def get_queryset(self):
if self.config.queryset:
ret = self.config.queryset()
elif self.config.source:
ret = self.config.source.objects.all()
else: # pragma: no cover
raise ValueError(
"Option must define 'queryset' or 'source' attribute"
)
return ret

def filter_queryset(self, qs):
use_delta = self.context['only_delta'] and not self.context['is_empty']
if self.config.filters:
qs = qs.filter(**self.config.filters)
if use_delta and (self.config.last_modify_field and self.last_run):
logger.debug(f"Loader {self}: use deltas")
qs = qs.filter(
**{f"{self.config.last_modify_field}__gte": self.last_run}
)
return qs

def load(
self,
*,
verbosity=0,
stdout=None,
ignore_dependencies=False,
max_records=None,
only_delta=True,
run_type=RUN_UNKNOWN,
**kwargs,
):
self.on_start(run_type)
self.results = EtlResult()
logger.debug(f"Running loader {self}")
lock = self.lock()
truncate = self.config.truncate
try:
if lock: # pragma: no branch
if not ignore_dependencies:
for requirement in self.config.depends:
if requirement.loader.is_running():
raise RequiredIsRunning(requirement)
if requirement.loader.need_refresh(self):
raise RequiredIsMissing(requirement)
else:
logger.info(f"Loader {requirement} is uptodate")
self.mapping = {}
mart_fields = self.model._meta.concrete_fields
for field in mart_fields:
if field.name not in self.IGNORED_FIELDS:
self.mapping[field.name] = field.name
if self.config.mapping: # pragma: no branch
self.mapping.update(self.config.mapping)
self.update_context(today=timezone.now(),
max_records=max_records,
verbosity=verbosity,
records=0,
only_delta=only_delta,
is_empty=not self.model.objects.exists(),
stdout=stdout)
sid = transaction.savepoint()
try:
self.results.context = self.context
self.fields_to_compare = [
f for f in self.mapping.keys() if f not in ["seen"]
]
if truncate:
self.model.objects.truncate()
qs = self.filter_queryset(self.get_queryset())
for record in qs.all():
filters = self.config.key(self, record)
values = self.get_values(record)
op = self.process_record(filters, values)
self.increment_counter(op)

if stdout and verbosity > 0:
stdout.write("\n")
except MaxRecordsException:
pass
except Exception:
transaction.savepoint_rollback(sid)
raise
else:
logger.info(f"Unable to get lock for {self}")

except (RequiredIsMissing, RequiredIsRunning) as e:
self.on_end(error=e, retry=True)
raise
except BaseException as e:
self.on_end(e)
process_exception(e)
raise
else:
self.on_end(None)
finally:
if lock: # pragma: no branch
try:
lock.release()
except LockError as e: # pragma: no cover
logger.warning(e)

return self.results
111 changes: 5 additions & 106 deletions src/etools_datamart/apps/mart/prp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,12 @@
"""
from ast import literal_eval

from django.db import models, transaction
from django.db import models
from django.db.models import JSONField, Q
from django.utils import timezone

from redis.exceptions import LockError
from strategy_field.utils import get_attr

from etools_datamart.apps.etl.exceptions import MaxRecordsException, RequiredIsMissing, RequiredIsRunning
from etools_datamart.apps.etl.loader import BaseLoader, EtlResult, logger, RUN_UNKNOWN
from etools_datamart.apps.etl.loader import CommonLoader
from etools_datamart.apps.sources.source_prp.models import (
CoreCountry,
CoreGatewaytype,
Expand All @@ -70,109 +67,11 @@
UnicefProgrammedocumentSections,
UnicefProgressreport,
)
from etools_datamart.sentry import process_exception

from .base import PrpDataMartModel


class PrpBaseLoader(BaseLoader):

def get_queryset(self):
if self.config.queryset:
ret = self.config.queryset()
elif self.config.source:
ret = self.config.source.objects.all()
else: # pragma: no cover
raise ValueError("Option must define 'queryset' or 'source' attribute")

return ret

def filter_queryset(self, qs):
use_delta = self.context['only_delta'] and not self.context['is_empty']
if self.config.filters:
qs = qs.filter(**self.config.filters)
if use_delta and (self.config.last_modify_field and self.last_run):
logger.debug(f"Loader {self}: use deltas")
qs = qs.filter(**{f"{self.config.last_modify_field}__gte": self.last_run})
return qs

def load(self, *, verbosity=0, stdout=None,
ignore_dependencies=False, max_records=None,
only_delta=True, run_type=RUN_UNKNOWN, **kwargs):
self.on_start(run_type)
self.results = EtlResult()
logger.debug(f"Running loader {self}")
lock = self.lock()
truncate = self.config.truncate
try:
if lock: # pragma: no branch
if not ignore_dependencies:
for requirement in self.config.depends:
if requirement.loader.is_running():
raise RequiredIsRunning(requirement)
if requirement.loader.need_refresh(self):
raise RequiredIsMissing(requirement)
else:
logger.info(f"Loader {requirement} is uptodate")
self.mapping = {}
mart_fields = self.model._meta.concrete_fields
for field in mart_fields:
if field.name not in ['source_id', 'id', 'last_modify_date']:
self.mapping[field.name] = field.name
if self.config.mapping: # pragma: no branch
self.mapping.update(self.config.mapping)
self.update_context(today=timezone.now(),
max_records=max_records,
verbosity=verbosity,
records=0,
only_delta=only_delta,
is_empty=not self.model.objects.exists(),
stdout=stdout)
sid = transaction.savepoint()
try:
self.results.context = self.context
self.fields_to_compare = [f for f in self.mapping.keys() if f not in ["seen"]]
if truncate:
self.model.objects.truncate()
qs = self.filter_queryset(self.get_queryset())
for record in qs.all():
filters = self.config.key(self, record)
values = self.get_values(record)
op = self.process_record(filters, values)
self.increment_counter(op)

if stdout and verbosity > 0:
stdout.write("\n")
# deleted = self.model.objects.exclude(seen=today).delete()[0]
# self.results.deleted = deleted
except MaxRecordsException:
pass
except Exception:
transaction.savepoint_rollback(sid)
raise
else:
logger.info(f"Unable to get lock for {self}")

except (RequiredIsMissing, RequiredIsRunning) as e:
self.on_end(error=e, retry=True)
raise
except BaseException as e:
self.on_end(e)
process_exception(e)
raise
else:
self.on_end(None)
finally:
if lock: # pragma: no branch
try:
lock.release()
except LockError as e: # pragma: no cover
logger.warning(e)

return self.results


class IndicatorByLocationLoader(PrpBaseLoader):
class IndicatorByLocationLoader(CommonLoader):
def get_location_levelname(self, record, values, field_name):
pass

Expand Down Expand Up @@ -229,7 +128,7 @@ class Options:
}


class DataReportLoader(PrpBaseLoader):
class DataReportLoader(CommonLoader):

def get_queryset(self):
# all_progress_reports = UnicefProgressreport.objects.all()
Expand Down Expand Up @@ -275,7 +174,7 @@ def get_locations(self, record: IndicatorIndicatorlocationdata, values, **kwargs
levelname=location.gateway.name
))
values['locations_data'] = locs
return ", ".join([l['name'] for l in locs])
return ", ".join([loc['name'] for loc in locs])

def get_submitted_by(self, record: IndicatorIndicatorlocationdata, values, **kwargs):
user = get_attr(record, 'indicator_report.progress_report.submitted_by')
Expand Down
Empty file.
24 changes: 24 additions & 0 deletions src/etools_datamart/apps/mart/unpp/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging

from django.contrib.admin import ModelAdmin, register

from etools_datamart.apps.mart.unpp import models
from etools_datamart.libs.truncate import TruncateTableMixin

logger = logging.getLogger(__name__)


@register(models.Location)
class LocationAdmin(TruncateTableMixin, ModelAdmin):
list_display = (
'source_id',
'name',
'country_code',
'latitude',
'longitude',
)


@register(models.Application)
class ApplicationAdmin(TruncateTableMixin, ModelAdmin):
list_display = ('__str__',)
Loading