Skip to content

Commit

Permalink
add registration on first use when an enrollment token is used
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Sep 24, 2024
1 parent f78517c commit e5c4f8c
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 15 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ require (
github.com/mailru/easyjson v0.7.7
github.com/miolini/datacounter v1.0.3
github.com/oapi-codegen/runtime v1.1.1
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.15.0
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/prometheus/client_golang v1.19.0
github.com/rs/xid v1.5.0
Expand Down Expand Up @@ -70,7 +72,6 @@ require (
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/open-telemetry/opamp-go v0.15.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6U
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/open-telemetry/opamp-go v0.15.0 h1:X2TWhEsGQ8GP7Uos3Ic9v/1aFUqoECZXKS7xAF5HqsA=
github.com/open-telemetry/opamp-go v0.15.0/go.mod h1:QyPeN56JXlcZt5yG5RMdZ50Ju+zMFs1Ihy/hwHyF8Oo=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand All @@ -135,6 +137,7 @@ github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrB
github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -291,8 +294,6 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
45 changes: 36 additions & 9 deletions internal/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/opamp"
Expand Down Expand Up @@ -63,14 +64,40 @@ func NewServer(addr string, cfg *config.Server, ct *CheckinT, et *EnrollerT, at
handlerFn, contextWithConn, _ := ompampServer.Attach(opampserver.Settings{
Callbacks: opampserver.CallbacksStruct{
OnConnectingFunc: func(request *http.Request) types.ConnectionResponse {
// NOTE: We don't have an agent ID at this stage so we can only check if the API key is valid.
// TODO enrollment
// 1. we should probably here detect if the oamp client use an enrollment token
// 2. create the .fleet-agent document
// 3. create an access token
// 4. return it to the agent with the connection settings flow https://opentelemetry.io/docs/specs/opamp/#opamp-connection-setting-offer-flow
var policyID string
var namespaces []string
agent, err := authAgent(request, nil, bulker, cache)
if err != nil {
if errors.Is(err, ErrAgentNotFound) { // No agent associated, get the enrollment token's associated policyID for a register on first use flow
enrollKey, err := authAPIKey(request, bulker, cache)
if err != nil {
zerolog.Ctx(request.Context()).Warn().Err(err).Msg("Opamp registration api key auth failed.")
return types.ConnectionResponse{
Accept: false,
HTTPStatusCode: http.StatusUnauthorized,
}
}
// TODO handle static enrollment tokens
key, ok := cache.GetEnrollmentAPIKey(enrollKey.ID)
if !ok {
rec, err := dl.FindEnrollmentAPIKey(request.Context(), bulker, dl.QueryEnrollmentAPIKeyByID, dl.FieldAPIKeyID, enrollKey.ID)
if err != nil {
return types.ConnectionResponse{
Accept: false,
HTTPStatusCode: http.StatusInternalServerError,
}
}
if !rec.Active {
return types.ConnectionResponse{
Accept: false,
HTTPStatusCode: http.StatusUnauthorized,
}
}
cache.SetEnrollmentAPIKey(enrollKey.ID, rec, int64(len(rec.APIKey)))
key = rec
}
policyID = key.PolicyID
namespaces = key.Namespaces
} else if err != nil {
zerolog.Ctx(request.Context()).Warn().Err(err).Msg("Opamp request api key auth failed.")
return types.ConnectionResponse{
Accept: false,
Expand All @@ -84,8 +111,8 @@ func NewServer(addr string, cfg *config.Server, ct *CheckinT, et *EnrollerT, at
zerolog.Ctx(ctx).Info().Msg("Opamp connection started.")
},
OnMessageFunc: func(ctx context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
zerolog.Ctx(ctx).Info().Msg("Opamp message recieved.")
response, err := op.Process(ctx, agent, message)
zerolog.Ctx(ctx).Info().Msg("Opamp message received.")
response, err := op.Process(ctx, agent, policyID, namespaces, message)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("Error processing opamp request.")
return &protobufs.ServerToAgent{
Expand Down
114 changes: 111 additions & 3 deletions internal/pkg/opamp/opamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,42 @@ package opamp
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/oklog/ulid/v2"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/rs/zerolog"

"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
)

// Health status values.
// NOTE(review): presumably these are the status strings recorded for an agent
// when its reported health is processed; the usage is outside the visible
// hunks — confirm.
const (
healthy = "healthy"
unhealthy = "unhealthy"
)

const serverCapabilities uint64 = 0x00000001 | 0x00000002 | 0x00000004 // status, offers remote config, accepts effective config
// serverCapabilities is the capability bitmask the server reports in the
// Capabilities field of ServerToAgent responses. Each OR-ed literal is one
// capability bit; the trailing comment names them in order.
const serverCapabilities uint64 = 0x00000001 | 0x00000002 | 0x00000004 | 0x00000020 // status, offers remote config, accepts effective config, offers connection settings

// kFleetAccessRolesJSON is the role descriptor JSON passed to APIKeyCreate when
// register creates the access API key for a newly registered agent. It defines
// a single role, "fleet-apikey-access", with no cluster privileges and only the
// "no-privileges" privilege on all resources of the "fleet" application.
const kFleetAccessRolesJSON = `
{
"fleet-apikey-access": {
"cluster": [],
"applications": [{
"application": "fleet",
"privileges": ["no-privileges"],
"resources": ["*"]
}]
}
}
`

type opamp struct {
bulk bulk.Bulk
Expand All @@ -44,10 +61,26 @@ func NewHandler(bulk bulk.Bulk, cache cache.Cache, pm policy.Monitor) *opamp {
}
}

func (o *opamp) Process(ctx context.Context, agent *model.Agent, message *protobufs.AgentToServer) (*protobufs.ServerToAgent, error) {
// Process handles AgentToServer messages
// TODO use optional funcs to pass agent, policy id, namespaces, etc.
func (o *opamp) Process(ctx context.Context, agent *model.Agent, policyID string, namespaces []string, message *protobufs.AgentToServer) (*protobufs.ServerToAgent, error) {
if message.GetCapabilities()&0x00000001 == 0 { // ReportsStatus must be set on all agents
return nil, fmt.Errorf("ReportsStatus capability is unset.")

Check failure on line 68 in internal/pkg/opamp/opamp.go

View workflow job for this annotation

GitHub Actions / lint (linux)

ST1005: error strings should not end with punctuation or newlines (stylecheck)
}

if agent == nil && policyID != "" {
return o.register(ctx, policyID, namespaces, message)
}
if agent.Id != string(message.InstanceUid) {
return nil, fmt.Errorf("API key's associated agent does not match InstanceUid")
}
return o.process(ctx, agent, message)
}

// process is similar to the API checkin path.
// It updates the agent's health status (but not metadata yet) and dispatches new config.
// A config is dispatched if the config sent by the agent has a lower revision number than the current policy or,
// when the agent sends no config, if the agent doc has a lower revision number.
func (o *opamp) process(ctx context.Context, agent *model.Agent, message *protobufs.AgentToServer) (*protobufs.ServerToAgent, error) {
ts := time.Now().UTC()
tsStr := ts.Format(time.RFC3339)

Expand Down Expand Up @@ -75,7 +108,7 @@ func (o *opamp) Process(ctx context.Context, agent *model.Agent, message *protob
if err != nil {
return nil, fmt.Errorf("failed to marshal agent update: %w", err)
}
if err := o.bulk.Update(ctx, dl.FleetAgents, agent.Agent.ID, updateBody); err != nil {
if err := o.bulk.Update(ctx, dl.FleetAgents, agent.Id, updateBody); err != nil {
return nil, fmt.Errorf("failed to update agent doc: %w", err)
}

Expand Down Expand Up @@ -134,6 +167,10 @@ func (o *opamp) Process(ctx context.Context, agent *model.Agent, message *protob
}
}
data.Inputs = pp.Inputs

// FIXME: We should be sure to handle outputs separately here

Check failure on line 171 in internal/pkg/opamp/opamp.go

View workflow job for this annotation

GitHub Actions / lint (linux)

`seperately` is a misspelling of `separately` (misspell)
// At a minimum we need to set ConnectionSettingsOffers.opamp

body, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("unable to marshal policy: %w", err)
Expand All @@ -159,3 +196,74 @@ func (o *opamp) Process(ctx context.Context, agent *model.Agent, message *protob
Capabilities: serverCapabilities,
}, nil
}

// register performs the register-on-first-use flow for an agent that
// connected with an enrollment token rather than an agent API key.
//
// It validates the agent's InstanceUid, checks for an ID collision with an
// existing agent (generating a fresh ULID on collision), creates an access API
// key, indexes a new agent document for policyID/namespaces, caches the key and
// then runs the regular process path for the message. When the ID had to be
// replaced, the response carries the new instance UID in AgentIdentification.
//
// An error is returned if the agent does not advertise the
// AcceptsOpAMPConnectionSettings capability, if InstanceUid is not a valid
// 16-byte ULID, or if any of the lookup/create/index/process steps fail.
func (o *opamp) register(ctx context.Context, policyID string, namespaces []string, message *protobufs.AgentToServer) (*protobufs.ServerToAgent, error) {
	if message.GetCapabilities()&0x00000100 == 0 {
		return nil, fmt.Errorf("AcceptsOpAMPConnectionSettings capability must be set in order for agent to register")
	}
	// NOTE: message.ConnectionSettingsRequest.Opamp is used for a CSR flow
	// TODO: Should we also read message.Flags to see if we need to generate a new uid?

	// InstanceUid is client-supplied. A direct slice-to-array conversion
	// panics when the slice is shorter than the array, so decode with a
	// length-checked call and reject malformed IDs instead.
	var uid ulid.ULID
	if err := uid.UnmarshalBinary(message.InstanceUid); err != nil {
		return nil, fmt.Errorf("invalid InstanceUid: %w", err)
	}

	replaceID := false
	// TODO: our current enroll flow does something different on collisions
	_, err := dl.FindAgent(ctx, o.bulk, dl.QueryAgentByID, dl.FieldID, uid.String())
	switch {
	case err == nil:
		zerolog.Ctx(ctx).Debug().Msg("Agent registration has detected uid collision")
		uid = ulid.Make() // TODO replace this with a better call?
		replaceID = true
	case errors.Is(err, dl.ErrNotFound):
		zerolog.Ctx(ctx).Trace().Msg("Agent registration no uid collision")
	default:
		// Keep the cause so callers and logs can see why the lookup failed.
		return nil, fmt.Errorf("unable to check for uid collision on registration: %w", err)
	}

	key, err := o.bulk.APIKeyCreate(ctx, uid.String(), "", []byte(kFleetAccessRolesJSON), apikey.NewMetadata(uid.String(), "", apikey.TypeAccess))
	if err != nil {
		return nil, fmt.Errorf("registration failed to make ApiKey: %w", err)
	}
	// TODO need a way to split agent description into local metadata, tags, and version info
	var localMeta json.RawMessage
	if ad := message.GetAgentDescription(); ad != nil {
		// Best effort: registration proceeds without local metadata if the
		// description cannot be marshalled.
		localMeta, err = json.Marshal(ad)
		if err != nil {
			zerolog.Ctx(ctx).Error().Err(err).Msg("Unable to marshal agent description")
		}
	}
	// TODO Invalidate key if func returns error after this
	agent := model.Agent{
		Active:         true,
		PolicyID:       policyID,
		Namespaces:     namespaces,
		Type:           "opamp", // regular agents use PERMANENT, might be nice to distinguish
		EnrolledAt:     time.Now().UTC().Format(time.RFC3339),
		LocalMetadata:  localMeta,
		AccessAPIKeyID: key.ID,
		ActionSeqNo:    []int64{sqn.UndefinedSeqNo},
		Agent: &model.AgentMetadata{
			ID: uid.String(),
			// TODO version
		},
		// TODO tags
		// TODO handle enrolmentId
	}
	// process addresses the agent document by agent.Id, so it must carry the
	// same ID the document is created under below.
	agent.Id = uid.String()

	body, err := json.Marshal(agent)
	if err != nil {
		return nil, fmt.Errorf("unable to marshal agent doc: %w", err)
	}
	if _, err := o.bulk.Create(ctx, dl.FleetAgents, uid.String(), body, bulk.WithRefresh()); err != nil {
		return nil, fmt.Errorf("unable to index agent doc: %w", err)
	}
	// TODO: Set agent to inactive if error is returned below
	o.cache.SetAPIKey(*key, true)

	resp, err := o.process(ctx, &agent, message)
	if err != nil {
		return nil, err
	}
	if replaceID {
		// Tell the agent which instance UID it has been registered under.
		resp.AgentIdentification = &protobufs.AgentIdentification{
			NewInstanceUid: uid.Bytes(),
		}
	}
	return resp, nil
}

0 comments on commit e5c4f8c

Please sign in to comment.