Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RayJob][Feature] add light weight job submitter in kuberay image #2587

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .github/workflows/e2e-tests-ray-job-submitter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: e2e-ray-job-submitter

on:
pull_request:
branches:
- master
- 'release-*'
push:
branches:
- master
- 'release-*'

concurrency:
group: ${{ github.head_ref }}-${{ github.workflow }}
cancel-in-progress: true

jobs:
ray-job-submitter:
runs-on: ubuntu-20.04
strategy:
fail-fast: false
matrix:
ray-version: [ '2.39.0' ]
go-version: [ '1.22.0' ]
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
submodules: recursive

- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.x'

- name: Install Ray
run: pip install ray[default]==${{ matrix.ray-version }}

- name: Run e2e tests
run: |
cd ray-operator
go test -timeout 30m -v ./test/e2erayjobsubmitter
87 changes: 87 additions & 0 deletions .github/workflows/image-release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,90 @@ jobs:
ref: 'refs/tags/ray-operator/${{ github.event.inputs.tag }}',
sha: '${{ github.event.inputs.commit }}'
})

release_submitter_image:
env:
working-directory: ./ray-operator
name: Release Submitter Docker Images
runs-on: ubuntu-latest
steps:

- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: v1.22

- name: Check out code into the Go module directory
uses: actions/checkout@v2
with:
ref: ${{ github.event.inputs.commit }}

- name: Get revision SHA
id: vars
run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"

- name: Get dependencies
run: go mod download
working-directory: ${{env.working-directory}}

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.x'

- name: Install Ray
run: pip install ray[default]==2.39.0

- name: Test
run: make test-e2erayjobsubmitter
working-directory: ${{env.working-directory}}

- name: Set up Docker
uses: docker-practice/actions-setup-docker@master

- name: Build Docker Image - Submitter
run: |
IMG=kuberay/submitter:${{ steps.vars.outputs.sha_short }} make docker-submitter-image
working-directory: ${{env.working-directory}}

- name: Log in to Quay.io
uses: docker/login-action@v2
with:
registry: quay.io
username: ${{ secrets.QUAY_USERNAME }}
password: ${{ secrets.QUAY_ROBOT_TOKEN }}

# Build submitter inside the gh runner vm directly and then copy the go binaries to docker images using the Dockerfile.submitter.buildx
- name: Build linux/amd64 submitter go binary
env:
CGO_ENABLED: 0
GOOS: linux
GOARCH: amd64
run: |
CGO_ENABLED=$CGO_ENABLED GOOS=$GOOS GOARCH=$GOARCH go build -tags strictfipsruntime -a -o submitter-$GOARCH ./rayjobsubmitter/cmd/main.go
working-directory: ${{env.working-directory}}

- name: Build linux/arm64 submitter binary
env:
CGO_ENABLED: 0
GOOS: linux
GOARCH: arm64
run: |
CGO_ENABLED=$CGO_ENABLED GOOS=$GOOS GOARCH=$GOARCH go build -tags strictfipsruntime -a -o submitter-$GOARCH ./rayjobsubmitter/cmd/main.go
working-directory: ${{env.working-directory}}

- name: Build MultiArch Image
uses: docker/build-push-action@v5
env:
PUSH: true
REPO_ORG: kuberay
REPO_NAME: submitter
with:
platforms: linux/amd64,linux/arm64
context: ${{env.working-directory}}
file: ${{env.working-directory}}/Dockerfile.submitter.buildx
push: ${{env.PUSH}}
provenance: false
tags: |
quay.io/${{env.REPO_ORG}}/${{env.REPO_NAME}}:${{ steps.vars.outputs.sha_short }}
quay.io/${{env.REPO_ORG}}/${{env.REPO_NAME}}:${{ github.event.inputs.tag }}
11 changes: 10 additions & 1 deletion ray-operator/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.22.4-bullseye as builder
FROM golang:1.22.4-bullseye AS builder

WORKDIR /workspace
# Copy the Go Modules manifests
Expand All @@ -15,10 +15,19 @@ COPY apis/ apis/
COPY controllers/ controllers/
COPY pkg/features pkg/features
COPY pkg/utils pkg/utils
COPY rayjobsubmitter/ rayjobsubmitter/

# Build
USER root
RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o manager main.go
RUN CGO_ENABLED=0 GOOS=linux go build -tags strictfipsruntime -a -o submitter ./rayjobsubmitter/cmd/main.go

FROM scratch AS submitter
WORKDIR /
COPY --from=builder /workspace/submitter .
USER 65532:65532

ENTRYPOINT ["/submitter"]

FROM gcr.io/distroless/base-debian12:nonroot
WORKDIR /
Expand Down
7 changes: 7 additions & 0 deletions ray-operator/Dockerfile.submitter.buildx
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM scratch
ARG TARGETARCH
WORKDIR /
COPY ./submitter-${TARGETARCH} ./submitter
USER 65532:65532

ENTRYPOINT ["/submitter"]
7 changes: 7 additions & 0 deletions ray-operator/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ test-sampleyaml: WHAT ?= ./test/sampleyaml
test-sampleyaml: manifests fmt vet
go test -timeout 30m -v $(WHAT)

test-e2erayjobsubmitter: WHAT ?= ./test/e2erayjobsubmitter
test-e2erayjobsubmitter: fmt vet
go test -timeout 30m -v $(WHAT)

sync: helm api-docs
./hack/update-codegen.sh

Expand All @@ -100,6 +104,9 @@ build: fmt vet ## Build manager binary.
docker-image: ## Build image only
${ENGINE} build -t ${IMG} .

docker-submitter-image: ## Build image only
${ENGINE} build -t ${IMG} --target submitter .

docker-build: build docker-image ## Build image with the manager.

docker-push: ## Push image with the manager.
Expand Down
1 change: 1 addition & 0 deletions ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coder/websocket v1.8.12 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
Expand Down
2 changes: 2 additions & 0 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions ray-operator/rayjobsubmitter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Ray Job Submitter

This is a Go Ray Job Submitter for KubeRay to submit a Ray Job
and tail its logs without installing Ray which is very large.

Note that this tool is designed specifically for KubeRay and
will not support some `ray job submit` features that people
don't use with KubeRay, for example, uploading local files to
a Ray cluster will not be supported by this tool.

## Testing

Tests are located at [../test/e2erayjobsubmitter](../test/e2erayjobsubmitter).

As the e2e suggests, you need to have `ray` installed for these tests
because they need to start a real Ray Head. You can run the tests with:

```sh
make test-e2erayjobsubmitter
```
or GitHub Action: [../../.github/workflows/e2e-tests-ray-job-submitter.yaml](../../.github/workflows/e2e-tests-ray-job-submitter.yaml)
61 changes: 61 additions & 0 deletions ray-operator/rayjobsubmitter/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"encoding/json"
"os"
"strings"

flag "github.com/spf13/pflag"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/rayjobsubmitter"
)

func main() {
var (
runtimeEnvJson string
metadataJson string
entrypointResources string
entrypointNumCpus float32
entrypointNumGpus float32
)

flag.StringVar(&runtimeEnvJson, "runtime-env-json", "", "")
flag.StringVar(&metadataJson, "metadata-json", "", "")
flag.StringVar(&entrypointResources, "entrypoint-resources", "", "")
flag.Float32Var(&entrypointNumCpus, "entrypoint-num-cpus", 0.0, "")
flag.Float32Var(&entrypointNumGpus, "entrypoint-num-gpus", 0.0, "")
flag.Parse()

address := os.Getenv("RAY_DASHBOARD_ADDRESS")
if address == "" {
panic("Missing RAY_DASHBOARD_ADDRESS")
}
submissionId := os.Getenv("RAY_JOB_SUBMISSION_ID")
if submissionId == "" {
panic("Missing RAY_JOB_SUBMISSION_ID")
}

req := utils.RayJobRequest{
Entrypoint: strings.Join(flag.Args(), " "),
SubmissionId: submissionId,
NumCpus: entrypointNumCpus,
NumGpus: entrypointNumGpus,
}
if len(runtimeEnvJson) > 0 {
if err := json.Unmarshal([]byte(runtimeEnvJson), &req.RuntimeEnv); err != nil {
panic(err)
}
}
if len(metadataJson) > 0 {
if err := json.Unmarshal([]byte(metadataJson), &req.Metadata); err != nil {
panic(err)
}
}
if len(entrypointResources) > 0 {
if err := json.Unmarshal([]byte(entrypointResources), &req.Resources); err != nil {
panic(err)
}
}
rayjobsubmitter.Submit(address, req, os.Stdout)
}
98 changes: 98 additions & 0 deletions ray-operator/rayjobsubmitter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package rayjobsubmitter

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/coder/websocket"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

func submitJobReq(address string, request utils.RayJobRequest) (jobId string, err error) {
rayJobJson, err := json.Marshal(request)
if err != nil {
return "", err
}

req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, address, bytes.NewBuffer(rayJobJson))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer func() { _ = resp.Body.Close() }()

body, _ := io.ReadAll(resp.Body)

if strings.Contains(string(body), "Please use a different submission_id") {
return request.SubmissionId, nil
}

if resp.StatusCode < 200 || resp.StatusCode > 299 {
return "", fmt.Errorf("SubmitJob fail: %s %s", resp.Status, string(body))
}

return request.SubmissionId, nil
}

func jobSubmissionURL(address string) string {
if !strings.HasPrefix(address, "http://") {
address = "http://" + address
}
address, err := url.JoinPath(address, "/api/jobs/") // the tailing "/" is required.
if err != nil {
panic(err)
}
return address
}

func logTailingURL(address, submissionId string) string {
address = strings.Replace(address, "http", "ws", 1)
address, err := url.JoinPath(address, submissionId, "/logs/tail")
if err != nil {
panic(err)
}
return address
}

func Submit(address string, req utils.RayJobRequest, out io.Writer) {
_, _ = fmt.Fprintf(out, "INFO -- Job submission server address: %s\n", address)

address = jobSubmissionURL(address)
submissionId, err := submitJobReq(address, req)
if err != nil {
panic(err)
}

_, _ = fmt.Fprintf(out, "SUCC -- Job '%s' submitted successfully\n", submissionId)
_, _ = fmt.Fprintf(out, "INFO -- Tailing logs until the job exits (disable with --no-wait):\n")

wsAddr := logTailingURL(address, submissionId)
c, _, err := websocket.Dial(context.Background(), wsAddr, nil)
if err != nil {
panic(err)
}
defer func() { _ = c.CloseNow() }()
for {
_, msg, err := c.Read(context.Background())
if err != nil {
if websocket.CloseStatus(err) == websocket.StatusNormalClosure {
_, _ = fmt.Fprintf(out, "SUCC -- Job '%s' succeeded\n", submissionId)
return
}
panic(err)
}
_, _ = out.Write(msg)
}
}
Loading
Loading