From b810f0efaaf3d4ee2410e022f5c5f995391db945 Mon Sep 17 00:00:00 2001
From: Ryan Krattiger
Date: Fri, 30 Sep 2022 13:54:42 -0500
Subject: [PATCH] WIP: PR Graduation using spackbot

Changes migrated from PR #45 using the new workers and a slightly more
general name for the long-running process queue.

TODO: Add a pruning task that is occasionally inserted after a copy task
but before the reindex task.
---
 .env-dummy                   |   6 ++
 spackbot/handlers/mirrors.py |  50 ++++++++++
 spackbot/helpers.py          |  17 ++++
 spackbot/workers.py          | 171 ++++++++++++++++++++++++++++++++++-
 4 files changed, 241 insertions(+), 3 deletions(-)
 create mode 100644 spackbot/handlers/mirrors.py

diff --git a/.env-dummy b/.env-dummy
index ad1afc3..d544a39 100644
--- a/.env-dummy
+++ b/.env-dummy
@@ -7,6 +7,12 @@ REDIS_HOST=rq-server
 # Optionally customize redis port
 REDIS_PORT=6379
 
+# Base URL of the PR binaries mirror
+PR_BINARIES_MIRROR_BASE_URL=s3://spack-binaries-prs
+
+# Name of the expected base branch (we react to PRs merged to this branch)
+PR_BINARIES_BASE_BRANCH=develop
+
 # Optionally customize the name of the task queue
 TASK_QUEUE_NAME=devtasks
 
diff --git a/spackbot/handlers/mirrors.py b/spackbot/handlers/mirrors.py
new file mode 100644
index 0000000..c2492a7
--- /dev/null
+++ b/spackbot/handlers/mirrors.py
@@ -0,0 +1,50 @@
+# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
+# Spack Project Developers. See the top-level COPYRIGHT file for details.
+#
+# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+
+
+import spackbot.helpers as helpers
+from spackbot.helpers import pr_expected_base, pr_mirror_base_url
+from spackbot.workers import copy_pr_binaries, update_mirror_index, work_queue
+
+# If we don't provide a timeout, the default in RQ is 180 seconds
+WORKER_JOB_TIMEOUT = 6 * 60 * 60
+
+logger = helpers.getLogger(__name__)
+
+
+async def graduate_pr_binaries(event, gh):
+    payload = event.data
+
+    base_branch = payload["pull_request"]["base"]["ref"]
+    is_merged = payload["pull_request"]["merged"]
+
+    if is_merged and base_branch == pr_expected_base:
+        pr_number = payload["number"]
+        pr_branch = payload["pull_request"]["head"]["ref"]
+
+        shared_mirror_url = f"{pr_mirror_base_url}/shared_pr_mirror"
+
+        logger.info(
+            f"PR {pr_number}/{pr_branch} merged to {base_branch}, graduating binaries"
+        )
+
+        ltask_q = work_queue.get_lqueue()
+        copy_job = ltask_q.enqueue(
+            copy_pr_binaries,
+            pr_number,
+            pr_branch,
+            shared_mirror_url,
+            job_timeout=WORKER_JOB_TIMEOUT,
+        )
+        logger.info(f"Copy job queued: {copy_job.id}")
+
+        # The reindex worker checks the queue and skips reindexing when
+        # another reindex job is already waiting (see workers.rq_has_reindex)
+        update_job = ltask_q.enqueue(
+            update_mirror_index,
+            shared_mirror_url,
+            job_timeout=WORKER_JOB_TIMEOUT,
+        )
+        logger.info(f"Reindex job queued: {update_job.id}")
diff --git a/spackbot/helpers.py b/spackbot/helpers.py
index 37543fd..f3ea320 100644
--- a/spackbot/helpers.py
+++ b/spackbot/helpers.py
@@ -5,6 +5,7 @@
 
 import aiohttp
+import asyncio
 import contextlib
 import gidgethub
 import json
@@ -39,6 +40,11 @@
 # Bucket where pr binary mirrors live
 pr_mirror_bucket = "spack-binaries-prs"
 
+pr_mirror_base_url = os.environ.get(
+    "PR_BINARIES_MIRROR_BASE_URL", "s3://spack-binaries-prs"
+)
+pr_expected_base = os.environ.get("PR_BINARIES_BASE_BRANCH", "develop")
+
 # Aliases for spackbot so spackbot doesn't respond to himself
 aliases = ["spack-bot", "spackbot", "spack-bot-develop", botname]
 alias_regex = "(%s)" % "|".join(aliases)
@@ -155,6 +161,17 @@ def run_command(control, cmd, ok_codes=None):
     return res.getvalue(), err.getvalue()
 
 
+async def run_in_subprocess(cmd):
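+    """Run ``cmd`` (an argv-style list) in a subprocess and log its output.
+
+    Sketch of the intended usage (the command below is illustrative only and
+    is not a call site added by this patch)::
+
+        await run_in_subprocess(["spack", "--version"])
+    """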
+    proc = await asyncio.create_subprocess_exec(
+        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+    )
+
+    stdout, stderr = await proc.communicate()
+
+    cmd_string = " ".join(cmd)
+    print(f"[{cmd_string!r} exited with {proc.returncode}]")
+    if stdout:
+        print(f"[stdout]\n{stdout.decode()}")
+    if stderr:
+        print(f"[stderr]\n{stderr.decode()}")
+
+
 async def found(coroutine):
     """
diff --git a/spackbot/workers.py b/spackbot/workers.py
index 09a37b6..166278b 100644
--- a/spackbot/workers.py
+++ b/spackbot/workers.py
@@ -6,9 +6,12 @@
 import os
 import urllib.parse
 
 import aiohttp
 import boto3
+from datetime import datetime
 from gidgethub import aiohttp as gh_aiohttp
+import tempfile
+import zipfile
+
 from sh.contrib import git
 import sh
@@ -25,6 +28,7 @@
 REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
 REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
 TASK_QUEUE_NAME = os.environ.get("TASK_QUEUE_NAME", "tasks")
+QUERY_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
 
 # If we don't provide a timeout, the default in RQ is 180 seconds
 WORKER_JOB_TIMEOUT = int(os.environ.get("WORKER_JOB_TIMEOUT", "21600"))
@@ -39,10 +43,14 @@ def __init__(self):
         self.redis_conn = Redis(host=REDIS_HOST, port=REDIS_PORT)
         # Name of queue workers use is defined in "workers/entrypoint.sh"
         self.task_q = Queue(name=TASK_QUEUE_NAME, connection=self.redis_conn)
+        self.ltask_q = Queue(name=TASK_QUEUE_NAME + "_ltask", connection=self.redis_conn)
 
     def get_queue(self):
         return self.task_q
 
+    def get_lqueue(self):
+        return self.ltask_q
+
 
 work_queue = WorkQueue()
@@ -328,3 +336,160 @@ async def fix_style_task(event):
         await gh.post(
             event.data["issue"]["comments_url"], {}, data={"body": message}
         )
+
+
+async def find_latest_pipeline(url, headers, session):
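+    """Return the pipeline object with the most recent ``updated_at`` time.
+
+    ``url`` is assumed to return a JSON list of GitLab pipeline objects whose
+    ``updated_at`` fields are formatted as ``QUERY_TIME_FORMAT``; ``None`` is
+    returned when that list is empty.
+    """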
+    async with session.get(url, headers=headers) as response:
+        pipeline_objects = await response.json()
+
+    latest_p_obj = None
+
+    if pipeline_objects:
+        latest_p_obj = pipeline_objects[0]
+        latest_time = datetime.strptime(latest_p_obj["updated_at"], QUERY_TIME_FORMAT)
+
+        for i in range(1, len(pipeline_objects)):
+            p_obj = pipeline_objects[i]
+            updated = datetime.strptime(p_obj["updated_at"], QUERY_TIME_FORMAT)
+            if updated > latest_time:
+                latest_time = updated
+                latest_p_obj = p_obj
+
+    return latest_p_obj
+
+
+async def retrieve_artifacts(url, headers, dl_folder, session):
+    save_path = os.path.join(dl_folder, "artifacts.zip")
+
+    async with session.get(url, headers=headers) as response:
+        if not os.path.exists(dl_folder):
+            os.makedirs(dl_folder)
+
+        with open(save_path, "wb") as fd:
+            async for chunk in response.content.iter_chunked(65536):
+                fd.write(chunk)
+
+    with zipfile.ZipFile(save_path) as zip_file:
+        zip_file.extractall(dl_folder)
+
+    os.remove(save_path)
+
+
+async def download_spack_lock_files(url, headers, download_dir, session):
+    async with session.get(url, headers=headers) as response:
+        job_objects = await response.json()
+
+    folder_list = []
+
+    if job_objects:
+        for job in job_objects:
+            artifacts_url = f"{helpers.gitlab_spack_project_url}/jobs/{job['id']}/artifacts"
+            dl_folder = os.path.join(download_dir, job["name"])
+
+            await retrieve_artifacts(artifacts_url, headers, dl_folder, session)
+
+            for root, _, files in os.walk(dl_folder):
+                if "spack.lock" in files:
+                    folder_list.append(root)
+                    break
+            else:
+                print(
+                    f"Error: unable to find spack.lock in download folder {dl_folder}"
+                )
+
+    return folder_list
+
+
+async def copy_pr_binaries(pr_number, pr_branch, shared_pr_mirror_url):
+    """Find the latest gitlab pipeline for the PR, get the spack.lock
+    for each child pipeline, and for each one, activate the environment
+    and issue the spack buildcache sync command to copy between the
+    per-pr mirror and the shared pr mirror.
+    """
+    pipeline_ref = f"github/pr{pr_number}_{pr_branch}"
+    pr_mirror_url = f"{helpers.pr_mirror_base_url}/{pipeline_ref}"
+    pipelines_url = (
+        f"{helpers.gitlab_spack_project_url}/pipelines?ref={pipeline_ref}&per_page=100"
+    )
+    headers = {"PRIVATE-TOKEN": GITLAB_TOKEN}
+
+    # Create a single new session for gitlab requests
+    async with aiohttp.ClientSession() as session:
+        latest_pipeline = await find_latest_pipeline(pipelines_url, headers, session)
+
+        if not latest_pipeline:
+            print(f"Unable to find latest pipeline for {pipeline_ref}")
+            return
+
+        print(f"Found latest pipeline for {pipeline_ref}:")
+        print(latest_pipeline)
+
+        p_id = latest_pipeline["id"]
+
+        jobs_url = f"{helpers.gitlab_spack_project_url}/pipelines/{p_id}/jobs"
+
+        with tempfile.TemporaryDirectory() as tmp_dir_path:
+            print(f"Downloading spack.lock files under: {tmp_dir_path}")
+            folders = await download_spack_lock_files(
+                jobs_url, headers, tmp_dir_path, session
+            )
+
+            for env_dir in folders:
+                print(
+                    f"Copying binaries from {pr_mirror_url} to {shared_pr_mirror_url}"
+                )
+                print(f" using spack environment: {env_dir}")
+
+                await helpers.run_in_subprocess([
+                    "spack",
+                    "-e",
+                    env_dir,
+                    "-d",
+                    "buildcache",
+                    "sync",
+                    "--src-mirror-url",
+                    pr_mirror_url,
+                    "--dest-mirror-url",
+                    shared_pr_mirror_url,
+                ])
+
+    # Clean up the per-pr mirror
+    print(f"Deleting mirror: {pr_mirror_url}")
+
+    await helpers.run_in_subprocess([
+        "spack",
+        "mirror",
+        "destroy",
+        "--mirror-url",
+        pr_mirror_url,
+    ])
+
+
+def rq_has_reindex():
+    ltask_q = work_queue.get_lqueue()
+    for job in ltask_q.jobs:
+        if "update_mirror_index" in job.func_name:
+            return True
+    return False
+
+
+async def update_mirror_index(mirror_url):
+    """Use spack buildcache command to update index on remote mirror"""
+
+    # Check the queue for other queued reindex jobs; only run reindex on the
+    # graduated PR mirror when this is the last one waiting.
+    if not rq_has_reindex():
+        print(f"Updating binary index at {mirror_url}")
+        await helpers.run_in_subprocess([
+            "spack",
+            "-d",
+            "buildcache",
+            "update-index",
+            "--mirror-url",
+            mirror_url,
+        ])
+
+
+async def prune_mirror_duplicates(mirror_url, duplicates_mirror_url):
+    """Placeholder for the pruning task described in the commit TODO: remove
+    specs from ``mirror_url`` that already exist in ``duplicates_mirror_url``.
+
+    The mirror query and removal mechanics are not implemented yet.
+    """
+    raise NotImplementedError(
+        f"pruning {mirror_url} against {duplicates_mirror_url} is not implemented"
+    )
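+
+
+# Sketch only (not wired up in this patch): per the commit TODO, the pruning
+# task would occasionally be enqueued on the long-running queue after a copy
+# job and before the reindex job, e.g. from the handler:
+#
+#   ltask_q.enqueue(
+#       prune_mirror_duplicates,
+#       shared_mirror_url,      # mirror to prune
+#       develop_mirror_url,     # hypothetical mirror holding the duplicate specs
+#       job_timeout=WORKER_JOB_TIMEOUT,
+#   )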