WIP: PR Graduation using spackbot
Changes migrated from PR spack#45 using the new workers
and a slightly more general name for the long-running
process queue.

TODO: Add a pruning task that is occasionally inserted after a copy task but before reindex
kwryankrattiger committed Oct 14, 2022
1 parent 7accb04 commit b810f0e
Showing 4 changed files with 241 additions and 3 deletions.
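
A possible shape for the pruning TODO in the commit message, offered as a hedged sketch only (not part of this commit): chain the copy, prune, and reindex jobs on the long-running queue with RQ's depends_on, so the prune job runs after the copy finishes and the reindex runs after the prune. The enqueue_graduation_jobs helper and its base_mirror_url argument are hypothetical; copy_pr_binaries, prune_mirror_duplicates, update_mirror_index, and work_queue are the names added in spackbot/workers.py below.

# Hypothetical sketch: copy -> prune -> reindex on the long-running task queue.
from spackbot.workers import (
    copy_pr_binaries,
    prune_mirror_duplicates,
    update_mirror_index,
    work_queue,
)

WORKER_JOB_TIMEOUT = 6 * 60 * 60  # same 6-hour timeout used in mirrors.py


def enqueue_graduation_jobs(pr_number, pr_branch, shared_mirror_url, base_mirror_url):
    ltask_q = work_queue.get_lqueue()

    copy_job = ltask_q.enqueue(
        copy_pr_binaries,
        pr_number,
        pr_branch,
        shared_mirror_url,
        job_timeout=WORKER_JOB_TIMEOUT,
    )

    # RQ's depends_on keeps the prune job idle until the copy job succeeds.
    prune_job = ltask_q.enqueue(
        prune_mirror_duplicates,
        shared_mirror_url,
        base_mirror_url,
        depends_on=copy_job,
        job_timeout=WORKER_JOB_TIMEOUT,
    )

    # Reindex last so the buildcache index reflects the pruned mirror.
    ltask_q.enqueue(
        update_mirror_index,
        shared_mirror_url,
        depends_on=prune_job,
        job_timeout=WORKER_JOB_TIMEOUT,
    )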
6 changes: 6 additions & 0 deletions .env-dummy
@@ -7,6 +7,12 @@ REDIS_HOST=rq-server
# Optionally customize redis port
REDIS_PORT=6379

# Base url of pr binaries mirror
PR_BINARIES_MIRROR_BASE_URL=s3://spack-binaries-prs

# Name of expected base branch (we react to PRs merged to this branch)
PR_BINARIES_BASE_BRANCH=develop

# Optionally customize the name of the task queue
TASK_QUEUE_NAME=devtasks

50 changes: 50 additions & 0 deletions spackbot/handlers/mirrors.py
@@ -0,0 +1,50 @@
# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)


import spackbot.helpers as helpers
from spackbot.helpers import pr_expected_base, pr_mirror_base_url
from spackbot.workers import copy_pr_binaries, update_mirror_index, work_queue

# If we don't provide a timeout, the default in RQ is 180 seconds
WORKER_JOB_TIMEOUT = 6 * 60 * 60

logger = helpers.getLogger(__name__)


async def graduate_pr_binaries(event, gh):
    payload = event.data

    base_branch = payload["pull_request"]["base"]["ref"]
    is_merged = payload["pull_request"]["merged"]

    if is_merged and base_branch == pr_expected_base:
        pr_number = payload["number"]
        pr_branch = payload["pull_request"]["head"]["ref"]

        shared_mirror_url = f"{pr_mirror_base_url}/shared_pr_mirror"

        logger.info(
            f"PR {pr_number}/{pr_branch} merged to {pr_expected_base}, graduating binaries"
        )

        ltask_q = work_queue.get_lqueue()
        copy_job = ltask_q.enqueue(
            copy_pr_binaries,
            pr_number,
            pr_branch,
            shared_mirror_url,
            job_timeout=WORKER_JOB_TIMEOUT,
        )
        logger.info(f"Copy job queued: {copy_job.id}")

        # Always enqueue a reindex job; update_mirror_index skips the work
        # if another reindex job is already waiting in the queue behind it.
        update_job = ltask_q.enqueue(
            update_mirror_index,
            shared_mirror_url,
            job_timeout=WORKER_JOB_TIMEOUT,
        )
        logger.info(f"Reindex job queued: {update_job.id}")
17 changes: 17 additions & 0 deletions spackbot/helpers.py
@@ -5,6 +5,7 @@


import aiohttp
import asyncio
import contextlib
import gidgethub
import json
@@ -39,6 +40,11 @@
# Bucket where pr binary mirrors live
pr_mirror_bucket = "spack-binaries-prs"

pr_mirror_base_url = os.environ.get(
    "PR_BINARIES_MIRROR_BASE_URL", "s3://spack-binaries-prs"
)
pr_expected_base = os.environ.get("PR_BINARIES_BASE_BRANCH", "develop")

# Aliases for spackbot so spackbot doesn't respond to himself
aliases = ["spack-bot", "spackbot", "spack-bot-develop", botname]
alias_regex = "(%s)" % "|".join(aliases)
@@ -155,6 +161,17 @@ def run_command(control, cmd, ok_codes=None):

return res.getvalue(), err.getvalue()

async def run_in_subprocess(cmd):
    # cmd is an argument list, so run it without a shell and unpack the args
    cmd_string = " ".join(cmd)
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    stdout, stderr = await proc.communicate()

    print(f"[{cmd_string!r} exited with {proc.returncode}]")
    if stdout:
        print(f"[stdout]\n{stdout.decode()}")
    if stderr:
        print(f"[stderr]\n{stderr.decode()}")

async def found(coroutine):
"""
171 changes: 168 additions & 3 deletions spackbot/workers.py
@@ -6,9 +6,12 @@
import os
import urllib.parse

import aiohttp
import boto3
from datetime import datetime
from gidgethub import aiohttp as gh_aiohttp
import tempfile
import zipfile

from sh.contrib import git
import sh

@@ -25,6 +28,7 @@
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
TASK_QUEUE_NAME = os.environ.get("TASK_QUEUE_NAME", "tasks")
QUERY_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

# If we don't provide a timeout, the default in RQ is 180 seconds
WORKER_JOB_TIMEOUT = int(os.environ.get("WORKER_JOB_TIMEOUT", "21600"))
@@ -39,10 +43,14 @@ def __init__(self):
        self.redis_conn = Redis(host=REDIS_HOST, port=REDIS_PORT)
        # Name of queue workers use is defined in "workers/entrypoint.sh"
        self.task_q = Queue(name=TASK_QUEUE_NAME, connection=self.redis_conn)
        self.ltask_q = Queue(
            name=TASK_QUEUE_NAME + "_ltask", connection=self.redis_conn
        )

    def get_queue(self):
        return self.task_q

    def get_lqueue(self):
        return self.ltask_q


work_queue = WorkQueue()

@@ -103,7 +111,7 @@ async def run_pipeline_task(event):
token = job.meta["token"]
rebuild_everything = job.meta.get("rebuild_everything")

async with aiohttp.ClientSession() as session:
async with gh_aiohttp.ClientSession() as session:
gh = gh_aiohttp.GitHubAPI(session, REQUESTER, oauth_token=token)

# Early exit if not authenticated
@@ -202,7 +210,7 @@ async def fix_style_task(event):
job = get_current_job()
token = job.meta["token"]

async with aiohttp.ClientSession() as session:
async with gh_aiohttp.ClientSession() as session:
gh = gh_aiohttp.GitHubAPI(session, REQUESTER, oauth_token=token)

pr_url = event.data["issue"]["pull_request"]["url"]
@@ -328,3 +336,160 @@ async def fix_style_task(event):
await gh.post(
event.data["issue"]["comments_url"], {}, data={"body": message}
)


async def find_latest_pipeline(url, headers, session):
    async with session.get(url, headers=headers) as response:
        pipeline_objects = await response.json()

    latest_p_obj = None

    if pipeline_objects:
        latest_p_obj = pipeline_objects[0]
        latest_time = datetime.strptime(latest_p_obj["updated_at"], QUERY_TIME_FORMAT)

        for i in range(1, len(pipeline_objects)):
            p_obj = pipeline_objects[i]
            updated = datetime.strptime(p_obj["updated_at"], QUERY_TIME_FORMAT)
            if updated > latest_time:
                latest_time = updated
                latest_p_obj = p_obj

    return latest_p_obj


async def retrieve_artifacts(url, headers, dl_folder, session):
    save_path = os.path.join(dl_folder, "artifacts.zip")

    async with session.get(url, headers=headers) as response:
        if not os.path.exists(dl_folder):
            os.makedirs(dl_folder)

        with open(save_path, "wb") as fd:
            async for chunk in response.content.iter_chunked(65536):
                fd.write(chunk)

    zip_file = zipfile.ZipFile(save_path)
    zip_file.extractall(dl_folder)
    zip_file.close()

    os.remove(save_path)


async def download_spack_lock_files(url, headers, download_dir, session):
    async with session.get(url, headers=headers) as response:
        job_objects = await response.json()

    folder_list = []

    if job_objects:
        for job in job_objects:
            artifacts_url = f"{helpers.gitlab_spack_project_url}/jobs/{job['id']}/artifacts"
            dl_folder = os.path.join(download_dir, job["name"])

            await retrieve_artifacts(artifacts_url, headers, dl_folder, session)

            for root, _, files in os.walk(dl_folder):
                if "spack.lock" in files:
                    folder_list.append(root)
                    break
            else:
                print(
                    f"Error: unable to find spack.lock in download folder {dl_folder}"
                )

    return folder_list

async def copy_pr_binaries(pr_number, pr_branch, shared_pr_mirror_url):
    """Find the latest gitlab pipeline for the PR, get the spack.lock
    for each child pipeline, and for each one, activate the environment
    and issue the spack buildcache sync command to copy between the
    per-pr mirror and the shared pr mirror.
    """
    pipeline_ref = f"github/pr{pr_number}_{pr_branch}"
    pr_mirror_url = f"{helpers.pr_mirror_base_url}/{pipeline_ref}"
    pipelines_url = (
        f"{helpers.gitlab_spack_project_url}/pipelines?ref={pipeline_ref}&per_page=100"
    )
    headers = {"PRIVATE-TOKEN": GITLAB_TOKEN}

    # Create a single session for the GitLab API requests
    async with aiohttp.ClientSession() as session:
        latest_pipeline = await find_latest_pipeline(pipelines_url, headers, session)

        if not latest_pipeline:
            print(f"Unable to find latest pipeline for {pipeline_ref}")
            return

        print(f"found latest pipeline for {pipeline_ref}:")
        print(latest_pipeline)

        p_id = latest_pipeline["id"]

        jobs_url = f"{helpers.gitlab_spack_project_url}/pipelines/{p_id}/jobs"

        with tempfile.TemporaryDirectory() as tmp_dir_path:
            print(f"Downloading spack.lock files under: {tmp_dir_path}")
            folders = await download_spack_lock_files(
                jobs_url, headers, tmp_dir_path, session
            )

            for env_dir in folders:
                print(
                    f"Copying binaries from {pr_mirror_url} to {shared_pr_mirror_url}"
                )
                print(f" using spack environment: {env_dir}")

                await helpers.run_in_subprocess([
                    "spack",
                    "-e",
                    env_dir,
                    "-d",
                    "buildcache",
                    "sync",
                    "--src-mirror-url",
                    pr_mirror_url,
                    "--dest-mirror-url",
                    shared_pr_mirror_url,
                ])

    # Clean up the per-pr mirror
    print(f"Deleting mirror: {pr_mirror_url}")

    await helpers.run_in_subprocess([
        "spack",
        "mirror",
        "destroy",
        "--mirror-url",
        pr_mirror_url,
    ])

def rq_has_reindex():
    ltask_q = work_queue.get_lqueue()
    for job in ltask_q.jobs:
        if "update_mirror_index" in job.func_name:
            return True
    return False

async def update_mirror_index(mirror_url):
    """Use spack buildcache command to update index on remote mirror"""

    # Check the queue for more reindex jobs; if there are none,
    # run reindex on the graduated PR mirror.
    if not rq_has_reindex():
        print(f"Updating binary index at {mirror_url}")
        await helpers.run_in_subprocess([
            "spack",
            "-d",
            "buildcache",
            "update-index",
            "--mirror-url",
            mirror_url,
        ])

async def prune_mirror_duplicates(mirror_url, duplicates_mirror_url):
    # Placeholder pseudocode for the pruning task mentioned in the commit
    # message TODO: drop specs from the shared PR mirror once they are also
    # present in the duplicates mirror. The mirror objects and their
    # get_specs()/remove_spec() helpers are not implemented yet.
    with mirror_url.get_specs() as specs:
        for spec in specs:
            if spec in duplicates_mirror_url:
                mirror_url.remove_spec(spec)
