Skip to content

Commit

Permalink
🎨 notify frontend about current efs disk space (#6520)
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored Oct 29, 2024
1 parent 1067924 commit c0b276a
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..api.rpc.routes import setup_rpc_routes
from ..services.background_tasks_setup import setup as setup_background_tasks
from ..services.efs_manager_setup import setup as setup_efs_manager
from ..services.fire_and_forget_setup import setup as setup_fire_and_forget
from ..services.modules.db import setup as setup_db
from ..services.modules.rabbitmq import setup as setup_rabbitmq
from ..services.modules.redis import setup as setup_redis
Expand Down Expand Up @@ -56,6 +57,8 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
setup_background_tasks(app) # requires Redis, DB
setup_process_messages(app) # requires Rabbit

setup_fire_and_forget(app)

# EVENTS
async def _on_startup() -> None:
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,31 @@ async def get_project_node_data_size(

return await efs_manager_utils.get_size_bash_async(_dir_path)

async def list_project_node_state_names(
self, project_id: ProjectID, node_id: NodeID
) -> list[str]:
"""
These are currently state volumes that are mounted via docker volume to dynamic sidecar and user services
(ex. ".data_assets" and "home_user_workspace")
"""
_dir_path = (
self._efs_mounted_path
/ self._project_specific_data_base_directory
/ f"{project_id}"
/ f"{node_id}"
)

project_node_states = []
for child in _dir_path.iterdir():
if child.is_dir():
project_node_states.append(child.name)
else:
_logger.error(
"This is not a directory. This should not happen! %s",
_dir_path / child.name,
)
return project_node_states

async def remove_project_node_data_write_permissions(
self, project_id: ProjectID, node_id: NodeID
) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
from collections.abc import Awaitable, Callable

from fastapi import FastAPI
from servicelib.logging_utils import log_catch, log_context

_logger = logging.getLogger(__name__)


def _on_app_startup(_app: FastAPI) -> Callable[[], Awaitable[None]]:
async def _startup() -> None:
with log_context(
_logger, logging.INFO, msg="Efs Guardian setup fire and forget tasks.."
), log_catch(_logger, reraise=False):
_app.state.efs_guardian_fire_and_forget_tasks = set()

return _startup


def _on_app_shutdown(
_app: FastAPI,
) -> Callable[[], Awaitable[None]]:
async def _stop() -> None:
with log_context(
_logger, logging.INFO, msg="Efs Guardian fire and forget tasks shutdown.."
), log_catch(_logger, reraise=False):
assert _app # nosec
if _app.state.efs_guardian_fire_and_forget_tasks:
for task in _app.state.efs_guardian_fire_and_forget_tasks:
task.cancel()

return _stop


def setup(app: FastAPI) -> None:
app.add_event_handler("startup", _on_app_startup(app))
app.add_event_handler("shutdown", _on_app_shutdown(app))
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ async def on_startup() -> None:
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
client_name="efs_guardian_rpc_server", settings=settings
)
app.state.rabbitmq_rpc_client = await RabbitMQRPCClient.create(
client_name="efs_guardian_rpc_client", settings=settings
)

async def on_shutdown() -> None:
if app.state.rabbitmq_client:
await app.state.rabbitmq_client.close()
if app.state.rabbitmq_rpc_server:
await app.state.rabbitmq_rpc_server.close()
if app.state.rabbitmq_rpc_client:
await app.state.rabbitmq_rpc_client.close()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)
Expand All @@ -53,4 +58,9 @@ def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server)


def get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient:
assert app.state.rabbitmq_rpc_client # nosec
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_client)


__all__ = ("RabbitMQClient",)
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import logging

from fastapi import FastAPI
from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage
from models_library.rabbitmq_messages import DynamicServiceRunningMessage
from pydantic import parse_raw_as
from servicelib.logging_utils import log_context
from simcore_service_efs_guardian.services.modules.redis import get_redis_lock_client
from servicelib.rabbitmq import RabbitMQRPCClient
from servicelib.rabbitmq.rpc_interfaces.dynamic_sidecar.disk_usage import (
update_disk_usage,
)
from servicelib.utils import fire_and_forget_task

from ..core.settings import get_application_settings
from ..services.efs_manager import EfsManager
from ..services.modules.rabbitmq import get_rabbitmq_rpc_client
from ..services.modules.redis import get_redis_lock_client

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,6 +57,23 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) ->
rabbit_message.user_id,
)

project_node_state_names = await efs_manager.list_project_node_state_names(
rabbit_message.project_id, node_id=rabbit_message.node_id
)
rpc_client: RabbitMQRPCClient = get_rabbitmq_rpc_client(app)
_used = min(size, settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES)
usage: dict[str, DiskUsage] = {}
for name in project_node_state_names:
usage[name] = DiskUsage.from_efs_guardian(
used=_used, total=settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES
)

fire_and_forget_task(
update_disk_usage(rpc_client, node_id=rabbit_message.node_id, usage=usage),
task_suffix_name=f"update_disk_usage_efs_user_id{rabbit_message.user_id}_node_id{rabbit_message.node_id}",
fire_and_forget_tasks_collection=app.state.efs_guardian_fire_and_forget_tasks,
)

if size > settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES:
msg = f"Removing write permissions inside of EFS starts for project ID: {rabbit_message.project_id}, node ID: {rabbit_message.node_id}, current user: {rabbit_message.user_id}, size: {size}, upper limit: {settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES}"
with log_context(_logger, logging.WARNING, msg=msg):
Expand Down
1 change: 1 addition & 0 deletions services/efs-guardian/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"pytest_simcore.environment_configs",
"pytest_simcore.faker_projects_data",
"pytest_simcore.faker_users_data",
"pytest_simcore.faker_products_data",
"pytest_simcore.faker_projects_data",
"pytest_simcore.pydantic_models",
"pytest_simcore.pytest_global_environs",
Expand Down
10 changes: 10 additions & 0 deletions services/efs-guardian/tests/unit/test_efs_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ async def test_remove_write_access_rights(
is False
)

with pytest.raises(FileNotFoundError):
await efs_manager.list_project_node_state_names(
project_id=project_id, node_id=node_id
)

with patch(
"simcore_service_efs_guardian.services.efs_manager.os.chown"
) as mocked_chown:
Expand All @@ -108,6 +113,11 @@ async def test_remove_write_access_rights(
is True
)

project_node_state_names = await efs_manager.list_project_node_state_names(
project_id=project_id, node_id=node_id
)
assert project_node_state_names == [_storage_directory_name]

size_before = await efs_manager.get_project_node_data_size(
project_id=project_id, node_id=node_id
)
Expand Down
111 changes: 111 additions & 0 deletions services/efs-guardian/tests/unit/test_process_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-arguments
# pylint: disable=unused-argument
# pylint: disable=unused-variable


from unittest.mock import AsyncMock, patch

import pytest
from faker import Faker
from fastapi import FastAPI
from models_library.products import ProductName
from models_library.rabbitmq_messages import DynamicServiceRunningMessage
from models_library.users import UserID
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from simcore_service_efs_guardian.services.efs_manager import NodeID, ProjectID
from simcore_service_efs_guardian.services.process_messages import (
process_dynamic_service_running_message,
)

pytest_simcore_core_services_selection = ["rabbit"]
pytest_simcore_ops_services_selection = []


@pytest.fixture
def app_environment(
monkeypatch: pytest.MonkeyPatch,
app_environment: EnvVarsDict,
rabbit_env_vars_dict: EnvVarsDict,
with_disabled_redis_and_background_tasks: None,
with_disabled_postgres: None,
) -> EnvVarsDict:
return setenvs_from_dict(
monkeypatch,
{
**app_environment,
**rabbit_env_vars_dict,
"EFS_DEFAULT_USER_SERVICE_SIZE_BYTES": "10000",
},
)


@patch("simcore_service_efs_guardian.services.process_messages.update_disk_usage")
async def test_process_msg(
mock_update_disk_usage,
faker: Faker,
app: FastAPI,
efs_cleanup: None,
project_id: ProjectID,
node_id: NodeID,
user_id: UserID,
product_name: ProductName,
):
# Create mock data for the message
model_instance = DynamicServiceRunningMessage(
project_id=project_id,
node_id=node_id,
user_id=user_id,
product_name=product_name,
)
json_str = model_instance.json()
model_bytes = json_str.encode("utf-8")

_expected_project_node_states = [".data_assets", "home_user_workspace"]
# Mock efs_manager and its methods
mock_efs_manager = AsyncMock()
app.state.efs_manager = mock_efs_manager
mock_efs_manager.check_project_node_data_directory_exits.return_value = True
mock_efs_manager.get_project_node_data_size.return_value = 4000
mock_efs_manager.list_project_node_state_names.return_value = (
_expected_project_node_states
)

result = await process_dynamic_service_running_message(app, data=model_bytes)

# Check the actual arguments passed to notify_service_efs_disk_usage
_, kwargs = mock_update_disk_usage.call_args
assert kwargs["usage"]
assert len(kwargs["usage"]) == 2
for key, value in kwargs["usage"].items():
assert key in _expected_project_node_states
assert value.used == 4000
assert value.free == 6000
assert value.total == 10000
assert value.used_percent == 40.0

assert result is True


async def test_process_msg__dir_not_exists(
app: FastAPI,
efs_cleanup: None,
project_id: ProjectID,
node_id: NodeID,
user_id: UserID,
product_name: ProductName,
):
# Create mock data for the message
model_instance = DynamicServiceRunningMessage(
project_id=project_id,
node_id=node_id,
user_id=user_id,
product_name=product_name,
)
json_str = model_instance.json()
model_bytes = json_str.encode("utf-8")

result = await process_dynamic_service_running_message(app, data=model_bytes)
assert result is True

0 comments on commit c0b276a

Please sign in to comment.