From 29c9a1335905fd2f33e29f1468f3a4a85a589273 Mon Sep 17 00:00:00 2001 From: Juan Antonio Osorio Date: Wed, 21 Aug 2024 16:17:39 +0300 Subject: [PATCH] Use central entities table for EEA logic (#4229) This replaces the logic from the EEA from the convoluted per-entity column to use a single reference from the central entitites table. Closes: https://github.com/stacklok/minder/issues/4168 Signed-off-by: Juan Antonio Osorio --- .../000099_eea_entity_instance_idx.down.sql | 22 ++++++ .../000099_eea_entity_instance_idx.up.sql | 22 ++++++ database/mock/store.go | 2 +- database/query/entity_execution_lock.sql | 24 ++----- internal/db/entity_execution_lock.sql.go | 70 ++++--------------- internal/db/querier.go | 2 +- internal/eea/eea.go | 12 ++-- internal/engine/eval_status.go | 5 ++ internal/engine/executor.go | 24 ++++--- internal/engine/executor_test.go | 17 ++--- internal/engine/interfaces/interface.go | 1 + 11 files changed, 94 insertions(+), 107 deletions(-) create mode 100644 database/migrations/000099_eea_entity_instance_idx.down.sql create mode 100644 database/migrations/000099_eea_entity_instance_idx.up.sql diff --git a/database/migrations/000099_eea_entity_instance_idx.down.sql b/database/migrations/000099_eea_entity_instance_idx.down.sql new file mode 100644 index 0000000000..384bbb31ce --- /dev/null +++ b/database/migrations/000099_eea_entity_instance_idx.down.sql @@ -0,0 +1,22 @@ +-- Copyright 2024 Stacklok, Inc +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +BEGIN; + +-- Drop the unique index on entity_instance_id from the entity_execution_lock and flush_cache tables + +DROP INDEX entity_execution_lock_entity_instance_idx ON entity_execution_lock; +DROP INDEX flush_cache_entity_instance_idx ON flush_cache; + +COMMIT; diff --git a/database/migrations/000099_eea_entity_instance_idx.up.sql b/database/migrations/000099_eea_entity_instance_idx.up.sql new file mode 100644 index 0000000000..94971ece54 --- /dev/null +++ b/database/migrations/000099_eea_entity_instance_idx.up.sql @@ -0,0 +1,22 @@ +-- Copyright 2024 Stacklok, Inc +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +BEGIN; + +-- Add entity_instance_id as a unique index to the entity_execution_lock and flush_cache tables + +CREATE UNIQUE INDEX entity_execution_lock_entity_instance_idx ON entity_execution_lock (entity_instance_id); +CREATE UNIQUE INDEX flush_cache_entity_instance_idx ON flush_cache (entity_instance_id); + +COMMIT; \ No newline at end of file diff --git a/database/mock/store.go b/database/mock/store.go index 5614bcef13..3ada5e0b85 100644 --- a/database/mock/store.go +++ b/database/mock/store.go @@ -702,7 +702,7 @@ func (mr *MockStoreMockRecorder) FindProviders(arg0, arg1 any) *gomock.Call { } // FlushCache mocks base method. -func (m *MockStore) FlushCache(arg0 context.Context, arg1 db.FlushCacheParams) (db.FlushCache, error) { +func (m *MockStore) FlushCache(arg0 context.Context, arg1 uuid.UUID) (db.FlushCache, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FlushCache", arg0, arg1) ret0, _ := ret[0].(db.FlushCache) diff --git a/database/query/entity_execution_lock.sql b/database/query/entity_execution_lock.sql index 33e3066f69..787e97c65d 100644 --- a/database/query/entity_execution_lock.sql +++ b/database/query/entity_execution_lock.sql @@ -23,11 +23,10 @@ INSERT INTO entity_execution_lock( sqlc.narg(pull_request_id)::UUID, sqlc.arg(project_id)::UUID, sqlc.arg(entity_instance_id)::UUID -) ON CONFLICT(entity, COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID)) +) ON CONFLICT(entity_instance_id) DO UPDATE SET locked_by = gen_random_uuid(), - last_lock_time = NOW(), - entity_instance_id = sqlc.arg(entity_instance_id)::UUID + last_lock_time = NOW() WHERE entity_execution_lock.last_lock_time < (NOW() - (@interval::TEXT || ' seconds')::interval) RETURNING *; @@ -37,19 +36,11 @@ RETURNING *; -- name: ReleaseLock :exec DELETE FROM entity_execution_lock -WHERE entity = sqlc.arg(entity)::entities AND - COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(repository_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - locked_by = sqlc.arg(locked_by)::UUID; +WHERE entity_instance_id = sqlc.arg(entity_instance_id) AND locked_by = sqlc.arg(locked_by)::UUID; -- name: UpdateLease :exec UPDATE entity_execution_lock SET last_lock_time = NOW() -WHERE entity = $1 AND -COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(repository_id), '00000000-0000-0000-0000-000000000000'::UUID) AND -COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND -COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND -locked_by = sqlc.arg(locked_by)::UUID; +WHERE entity_instance_id = $1 AND locked_by = sqlc.arg(locked_by)::UUID; -- name: EnqueueFlush :one INSERT INTO flush_cache( @@ -66,16 +57,13 @@ INSERT INTO flush_cache( sqlc.narg(pull_request_id)::UUID, sqlc.arg(project_id)::UUID, sqlc.arg(entity_instance_id)::UUID -) ON CONFLICT(entity, COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID)) +) ON CONFLICT(entity_instance_id) DO NOTHING RETURNING *; -- name: FlushCache :one DELETE FROM flush_cache -WHERE entity = $1 AND - COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(repository_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) +WHERE entity_instance_id= $1 RETURNING *; -- name: ListFlushCache :many diff --git a/internal/db/entity_execution_lock.sql.go b/internal/db/entity_execution_lock.sql.go index 144cec671c..7d1da1f9af 100644 --- a/internal/db/entity_execution_lock.sql.go +++ b/internal/db/entity_execution_lock.sql.go @@ -26,7 +26,7 @@ INSERT INTO flush_cache( $4::UUID, $5::UUID, $6::UUID -) ON CONFLICT(entity, COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID)) +) ON CONFLICT(entity_instance_id) DO NOTHING RETURNING id, entity, repository_id, artifact_id, pull_request_id, queued_at, project_id, entity_instance_id ` @@ -65,27 +65,12 @@ func (q *Queries) EnqueueFlush(ctx context.Context, arg EnqueueFlushParams) (Flu const flushCache = `-- name: FlushCache :one DELETE FROM flush_cache -WHERE entity = $1 AND - COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($2::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($3::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($4::UUID, '00000000-0000-0000-0000-000000000000'::UUID) +WHERE entity_instance_id= $1 RETURNING id, entity, repository_id, artifact_id, pull_request_id, queued_at, project_id, entity_instance_id ` -type FlushCacheParams struct { - Entity Entities `json:"entity"` - RepositoryID uuid.NullUUID `json:"repository_id"` - ArtifactID uuid.NullUUID `json:"artifact_id"` - PullRequestID uuid.NullUUID `json:"pull_request_id"` -} - -func (q *Queries) FlushCache(ctx context.Context, arg FlushCacheParams) (FlushCache, error) { - row := q.db.QueryRowContext(ctx, flushCache, - arg.Entity, - arg.RepositoryID, - arg.ArtifactID, - arg.PullRequestID, - ) +func (q *Queries) FlushCache(ctx context.Context, entityInstanceID uuid.UUID) (FlushCache, error) { + row := q.db.QueryRowContext(ctx, flushCache, entityInstanceID) var i FlushCache err := row.Scan( &i.ID, @@ -156,11 +141,10 @@ INSERT INTO entity_execution_lock( $4::UUID, $5::UUID, $6::UUID -) ON CONFLICT(entity, COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID)) +) ON CONFLICT(entity_instance_id) DO UPDATE SET locked_by = gen_random_uuid(), - last_lock_time = NOW(), - entity_instance_id = $6::UUID + last_lock_time = NOW() WHERE entity_execution_lock.last_lock_time < (NOW() - ($7::TEXT || ' seconds')::interval) RETURNING id, entity, locked_by, last_lock_time, repository_id, artifact_id, pull_request_id, project_id, entity_instance_id ` @@ -208,59 +192,33 @@ func (q *Queries) LockIfThresholdNotExceeded(ctx context.Context, arg LockIfThre const releaseLock = `-- name: ReleaseLock :exec DELETE FROM entity_execution_lock -WHERE entity = $1::entities AND - COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($2::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($3::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($4::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND - locked_by = $5::UUID +WHERE entity_instance_id = $1 AND locked_by = $2::UUID ` type ReleaseLockParams struct { - Entity Entities `json:"entity"` - RepositoryID uuid.NullUUID `json:"repository_id"` - ArtifactID uuid.NullUUID `json:"artifact_id"` - PullRequestID uuid.NullUUID `json:"pull_request_id"` - LockedBy uuid.UUID `json:"locked_by"` + EntityInstanceID uuid.UUID `json:"entity_instance_id"` + LockedBy uuid.UUID `json:"locked_by"` } // ReleaseLock is used to release a lock on an entity. It will delete the // entity_execution_lock record if the lock is held by the given locked_by // value. func (q *Queries) ReleaseLock(ctx context.Context, arg ReleaseLockParams) error { - _, err := q.db.ExecContext(ctx, releaseLock, - arg.Entity, - arg.RepositoryID, - arg.ArtifactID, - arg.PullRequestID, - arg.LockedBy, - ) + _, err := q.db.ExecContext(ctx, releaseLock, arg.EntityInstanceID, arg.LockedBy) return err } const updateLease = `-- name: UpdateLease :exec UPDATE entity_execution_lock SET last_lock_time = NOW() -WHERE entity = $1 AND -COALESCE(repository_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($2, '00000000-0000-0000-0000-000000000000'::UUID) AND -COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($3::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND -COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE($4::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND -locked_by = $5::UUID +WHERE entity_instance_id = $1 AND locked_by = $2::UUID ` type UpdateLeaseParams struct { - Entity Entities `json:"entity"` - RepositoryID uuid.NullUUID `json:"repository_id"` - ArtifactID uuid.NullUUID `json:"artifact_id"` - PullRequestID uuid.NullUUID `json:"pull_request_id"` - LockedBy uuid.UUID `json:"locked_by"` + EntityInstanceID uuid.UUID `json:"entity_instance_id"` + LockedBy uuid.UUID `json:"locked_by"` } func (q *Queries) UpdateLease(ctx context.Context, arg UpdateLeaseParams) error { - _, err := q.db.ExecContext(ctx, updateLease, - arg.Entity, - arg.RepositoryID, - arg.ArtifactID, - arg.PullRequestID, - arg.LockedBy, - ) + _, err := q.db.ExecContext(ctx, updateLease, arg.EntityInstanceID, arg.LockedBy) return err } diff --git a/internal/db/querier.go b/internal/db/querier.go index 6c97cf9b35..ff07cad251 100644 --- a/internal/db/querier.go +++ b/internal/db/querier.go @@ -72,7 +72,7 @@ type Querier interface { // providers by it. It also optionally takes a name, in case we want to // filter by name as well. FindProviders(ctx context.Context, arg FindProvidersParams) ([]Provider, error) - FlushCache(ctx context.Context, arg FlushCacheParams) (FlushCache, error) + FlushCache(ctx context.Context, entityInstanceID uuid.UUID) (FlushCache, error) GetAccessTokenByEnrollmentNonce(ctx context.Context, arg GetAccessTokenByEnrollmentNonceParams) (ProviderAccessToken, error) GetAccessTokenByProjectID(ctx context.Context, arg GetAccessTokenByProjectIDParams) (ProviderAccessToken, error) GetAccessTokenByProvider(ctx context.Context, provider string) ([]ProviderAccessToken, error) diff --git a/internal/eea/eea.go b/internal/eea/eea.go index ac25820a3f..ea166947bf 100644 --- a/internal/eea/eea.go +++ b/internal/eea/eea.go @@ -199,7 +199,10 @@ func (e *EEA) FlushMessageHandler(msg *message.Message) error { return fmt.Errorf("error unmarshalling payload: %w", err) } - repoID, artifactID, pullRequestID := inf.GetEntityDBIDs() + eID, err := inf.GetID() + if err != nil { + return fmt.Errorf("error getting entity ID: %w", err) + } logger := zerolog.Ctx(ctx).With(). Str("component", "EEA"). @@ -211,12 +214,7 @@ func (e *EEA) FlushMessageHandler(msg *message.Message) error { logger.Debug().Msg("flushing event") - _, err = e.querier.FlushCache(ctx, db.FlushCacheParams{ - Entity: entities.EntityTypeToDB(inf.Type), - RepositoryID: repoID, - ArtifactID: artifactID, - PullRequestID: pullRequestID, - }) + _, err = e.querier.FlushCache(ctx, eID) // Nothing to do here. If we can't flush the cache, it means // that the event has already been executed. if err != nil && errors.Is(err, sql.ErrNoRows) { diff --git a/internal/engine/eval_status.go b/internal/engine/eval_status.go index ede33195b5..a0fd201854 100644 --- a/internal/engine/eval_status.go +++ b/internal/engine/eval_status.go @@ -39,11 +39,16 @@ func (e *executor) createEvalStatusParams( rule *models.RuleInstance, ) (*engif.EvalStatusParams, error) { repoID, artID, prID := inf.GetEntityDBIDs() + eID, err := inf.GetID() + if err != nil { + return nil, fmt.Errorf("Error getting ID from entity info wrapper") + } params := &engif.EvalStatusParams{ Rule: rule, Profile: profile, EntityType: entities.EntityTypeToDB(inf.Type), + EntityID: eID, RepoID: repoID, ArtifactID: artID, PullRequestID: prID, diff --git a/internal/engine/executor.go b/internal/engine/executor.go index 927db9b4fe..0001cd9e04 100644 --- a/internal/engine/executor.go +++ b/internal/engine/executor.go @@ -246,11 +246,8 @@ func (e *executor) updateLockLease( zerolog.Ctx(ctx).With().Str("execution_id", executionID.String()).Logger()) if err := e.querier.UpdateLease(ctx, db.UpdateLeaseParams{ - Entity: params.EntityType, - RepositoryID: params.RepoID, - ArtifactID: params.ArtifactID, - PullRequestID: params.PullRequestID, - LockedBy: executionID, + LockedBy: executionID, + EntityInstanceID: params.EntityID, }); err != nil { logger.Err(err).Msg("error updating lock lease") return @@ -264,10 +261,18 @@ func (e *executor) releaseLockAndFlush( inf *entities.EntityInfoWrapper, ) { repoID, artID, prID := inf.GetEntityDBIDs() + eID, err := inf.GetID() + if err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("error getting entity id") + return + } logger := zerolog.Ctx(ctx).Info(). Str("entity_type", inf.Type.ToString()). - Str("execution_id", inf.ExecutionID.String()) + Str("execution_id", inf.ExecutionID.String()). + Str("entity_id", eID.String()) + + // TODO: change these to entity_id if repoID.Valid { logger = logger.Str("repo_id", repoID.UUID.String()) } @@ -280,11 +285,8 @@ func (e *executor) releaseLockAndFlush( } if err := e.querier.ReleaseLock(ctx, db.ReleaseLockParams{ - Entity: entities.EntityTypeToDB(inf.Type), - RepositoryID: repoID, - ArtifactID: artID, - PullRequestID: prID, - LockedBy: *inf.ExecutionID, + EntityInstanceID: eID, + LockedBy: *inf.ExecutionID, }); err != nil { logger.Err(err).Msg("error updating lock lease") } diff --git a/internal/engine/executor_test.go b/internal/engine/executor_test.go index ca85d3bf32..2410f2bb9f 100644 --- a/internal/engine/executor_test.go +++ b/internal/engine/executor_test.go @@ -256,24 +256,15 @@ default allow = true`, // Mock update lease for lock mockStore.EXPECT(). UpdateLease(gomock.Any(), db.UpdateLeaseParams{ - Entity: db.EntitiesRepository, - RepositoryID: uuid.NullUUID{ - UUID: repositoryID, - Valid: true, - }, - ArtifactID: uuid.NullUUID{}, - PullRequestID: uuid.NullUUID{}, - LockedBy: executionID, + EntityInstanceID: repositoryID, + LockedBy: executionID, }).Return(nil) // Mock release lock mockStore.EXPECT(). ReleaseLock(gomock.Any(), db.ReleaseLockParams{ - Entity: db.EntitiesRepository, - RepositoryID: uuid.NullUUID{UUID: repositoryID, Valid: true}, - ArtifactID: uuid.NullUUID{}, - PullRequestID: uuid.NullUUID{}, - LockedBy: executionID, + EntityInstanceID: repositoryID, + LockedBy: executionID, }).Return(nil) // -- end expectations diff --git a/internal/engine/interfaces/interface.go b/internal/engine/interfaces/interface.go index f6102c588b..f8a1198d12 100644 --- a/internal/engine/interfaces/interface.go +++ b/internal/engine/interfaces/interface.go @@ -117,6 +117,7 @@ type EvalStatusParams struct { TaskRunID uuid.UUID BuildID uuid.UUID EntityType db.Entities + EntityID uuid.UUID EvalStatusFromDb *db.ListRuleEvaluationsByProfileIdRow evalErr error actionsOnOff map[ActionType]models.ActionOpt