From 3d922a6e8b42c3ad4590c237f549cff793d7ff99 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Tue, 13 Jan 2026 18:09:10 -0700 Subject: [PATCH] chore: add reconcile hook for env progression dependencies --- .../environment_progression_action.go | 122 ++++++++++ .../pkg/workspace/store/release_targets.go | 27 +++ .../pkg/workspace/workspace.go | 26 +- ...ine_policy_environment_progression_test.go | 227 ++++++++++++++++++ 4 files changed, 385 insertions(+), 17 deletions(-) create mode 100644 apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environment_progression_action.go create mode 100644 apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go diff --git a/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environment_progression_action.go b/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environment_progression_action.go new file mode 100644 index 000000000..718ac7e39 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environment_progression_action.go @@ -0,0 +1,122 @@ +package environmentprogression + +import ( + "context" + "fmt" + + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/selector" + "workspace-engine/pkg/workspace/releasemanager/action" + "workspace-engine/pkg/workspace/store" +) + +type ReconcileFn func(ctx context.Context, target *oapi.ReleaseTarget) error + +type EnvironmentProgressionAction struct { + store *store.Store + reconcileFn ReconcileFn +} + +func NewEnvironmentProgressionAction(store *store.Store, reconcileFn ReconcileFn) *EnvironmentProgressionAction { + return &EnvironmentProgressionAction{ + store: store, + reconcileFn: reconcileFn, + } +} + +func (a *EnvironmentProgressionAction) Name() string { + return "environmentprogression" +} + +func (a *EnvironmentProgressionAction) Execute(ctx context.Context, trigger action.ActionTrigger, actx action.ActionContext) error { + if trigger != action.TriggerJobSuccess { + return nil + } + + environment := a.getEnvironment(actx.Release.ReleaseTarget.EnvironmentId) + if environment == nil { + return nil + } + + progressionDependentPolicies, err := a.getProgressionDependentPolicies(ctx, environment) + if err != nil { + return fmt.Errorf("failed to get progression dependent policies: %w", err) + } + + if len(progressionDependentPolicies) == 0 { + return nil + } + + progressionDependentTargets, err := a.getProgressionDependentTargets(ctx, progressionDependentPolicies) + if err != nil { + return fmt.Errorf("failed to get progression dependent targets: %w", err) + } + + if len(progressionDependentTargets) == 0 { + return nil + } + + return a.reconcileTargets(ctx, progressionDependentTargets) +} + +func (a *EnvironmentProgressionAction) getEnvironment(envId string) *oapi.Environment { + env, ok := a.store.Environments.Get(envId) + if !ok { + return nil + } + return env +} + +func (a *EnvironmentProgressionAction) getProgressionDependentPolicies(ctx context.Context, environment *oapi.Environment) ([]*oapi.Policy, error) { + policies := make([]*oapi.Policy, 0) + for _, policy := range a.store.Policies.Items() { + for _, rule := range policy.Rules { + if rule.EnvironmentProgression == nil { + continue + } + + dependsOnSelector := rule.EnvironmentProgression.DependsOnEnvironmentSelector + + matched, err := selector.Match(ctx, &dependsOnSelector, *environment) + if err != nil { + return nil, fmt.Errorf("failed to match selector: %w", err) + } + + if matched { + policies = append(policies, policy) + } + + break + } + } + + return policies, nil +} + +func (a *EnvironmentProgressionAction) getProgressionDependentTargets(ctx context.Context, policies []*oapi.Policy) ([]*oapi.ReleaseTarget, error) { + targetMap := make(map[string]*oapi.ReleaseTarget) + for _, policy := range policies { + policyTargets, err := a.store.ReleaseTargets.GetForPolicy(ctx, policy) + if err != nil { + return nil, fmt.Errorf("failed to get release targets for policy %s: %w", policy.Id, err) + } + for _, target := range policyTargets { + targetMap[target.Key()] = target + } + } + + targetList := make([]*oapi.ReleaseTarget, 0, len(targetMap)) + for _, target := range targetMap { + targetList = append(targetList, target) + } + return targetList, nil +} + +func (a *EnvironmentProgressionAction) reconcileTargets(ctx context.Context, targets []*oapi.ReleaseTarget) error { + for _, target := range targets { + if err := a.reconcileFn(ctx, target); err != nil { + return fmt.Errorf("failed to reconcile target %s: %w", target.Key(), err) + } + } + return nil +} diff --git a/apps/workspace-engine/pkg/workspace/store/release_targets.go b/apps/workspace-engine/pkg/workspace/store/release_targets.go index 1d8173e87..029c01eab 100644 --- a/apps/workspace-engine/pkg/workspace/store/release_targets.go +++ b/apps/workspace-engine/pkg/workspace/store/release_targets.go @@ -186,6 +186,33 @@ func (r *ReleaseTargets) GetForSystem(ctx context.Context, systemId string) ([]* return releaseTargets, nil } +func (r *ReleaseTargets) GetForPolicy(ctx context.Context, policy *oapi.Policy) (map[string]*oapi.ReleaseTarget, error) { + targetMap := make(map[string]*oapi.ReleaseTarget) + + allReleaseTargets := r.releaseTargets.Items() + for _, releaseTarget := range allReleaseTargets { + environment, ok := r.store.Environments.Get(releaseTarget.EnvironmentId) + if !ok { + continue + } + deployment, ok := r.store.Deployments.Get(releaseTarget.DeploymentId) + if !ok { + continue + } + resource, ok := r.store.Resources.Get(releaseTarget.ResourceId) + if !ok { + continue + } + + isMatch := selector.MatchPolicy(ctx, policy, selector.NewResolvedReleaseTarget(environment, deployment, resource)) + if isMatch { + targetMap[releaseTarget.Key()] = releaseTarget + } + } + + return targetMap, nil +} + func (r *ReleaseTargets) RemoveForResource(ctx context.Context, resourceId string) { for _, releaseTarget := range r.GetForResource(ctx, resourceId) { if releaseTarget.ResourceId == resourceId { diff --git a/apps/workspace-engine/pkg/workspace/workspace.go b/apps/workspace-engine/pkg/workspace/workspace.go index d54b9d8eb..289655c92 100644 --- a/apps/workspace-engine/pkg/workspace/workspace.go +++ b/apps/workspace-engine/pkg/workspace/workspace.go @@ -10,6 +10,7 @@ import ( verificationaction "workspace-engine/pkg/workspace/releasemanager/action/verification" "workspace-engine/pkg/workspace/releasemanager/deployment/jobs" "workspace-engine/pkg/workspace/releasemanager/policy/evaluator/deploymentdependency" + "workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression" "workspace-engine/pkg/workspace/releasemanager/trace" "workspace-engine/pkg/workspace/releasemanager/trace/spanstore" "workspace-engine/pkg/workspace/store" @@ -33,25 +34,16 @@ func New(ctx context.Context, id string, options ...WorkspaceOption) *Workspace // Create release manager with trace store (will panic if nil) ws.releasemanager = releasemanager.New(s, ws.traceStore) + reconcileFn := func(ctx context.Context, target *oapi.ReleaseTarget) error { + return ws.releasemanager.ReconcileTarget(ctx, target, releasemanager.WithTrigger(trace.TriggerJobSuccess)) + } + ws.actionOrchestrator = action. NewOrchestrator(s). - RegisterAction( - verificationaction.NewVerificationAction( - ws.releasemanager.VerificationManager(), - ), - ).RegisterAction( - deploymentdependency.NewDeploymentDependencyAction( - s, - func(ctx context.Context, target *oapi.ReleaseTarget) error { - return ws.releasemanager.ReconcileTarget(ctx, target, releasemanager.WithTrigger(trace.TriggerJobSuccess)) - }, - ), - ).RegisterAction( - rollback.NewRollbackAction( - s, - jobs.NewDispatcher(s, ws.releasemanager.VerificationManager()), - ), - ) + RegisterAction(verificationaction.NewVerificationAction(ws.releasemanager.VerificationManager())). + RegisterAction(deploymentdependency.NewDeploymentDependencyAction(s, reconcileFn)). + RegisterAction(environmentprogression.NewEnvironmentProgressionAction(s, reconcileFn)). + RegisterAction(rollback.NewRollbackAction(s, jobs.NewDispatcher(s, ws.releasemanager.VerificationManager()))) return ws } diff --git a/apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go b/apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go new file mode 100644 index 000000000..5a0f33a92 --- /dev/null +++ b/apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go @@ -0,0 +1,227 @@ +package e2e + +import ( + "context" + "testing" + "time" + "workspace-engine/pkg/events/handler" + "workspace-engine/pkg/oapi" + "workspace-engine/test/integration" + c "workspace-engine/test/integration/creators" + + "github.com/stretchr/testify/assert" +) + +func TestEngine_PolicyEnvironmentProgression_TriggersGradualRollout(t *testing.T) { + jobAgentID := "job-agent-1" + deploymentID := "deployment-1" + qaEnvironmentID := "qa-environment" + prodEnvironmentID := "prod-environment" + resourceID := "resource-1" + policyID := "policy-1" + + minSuccessPercentage := float32(100) + + engine := integration.NewTestWorkspace(t, + integration.WithJobAgent( + integration.JobAgentID(jobAgentID), + ), + integration.WithSystem( + integration.WithDeployment( + integration.DeploymentID(deploymentID), + integration.DeploymentName("app"), + integration.DeploymentJobAgent(jobAgentID), + integration.DeploymentCelResourceSelector("true"), + ), + integration.WithEnvironment( + integration.EnvironmentID(qaEnvironmentID), + integration.EnvironmentName("qa"), + integration.EnvironmentCelResourceSelector("true"), + ), + integration.WithEnvironment( + integration.EnvironmentID(prodEnvironmentID), + integration.EnvironmentName("prod"), + integration.EnvironmentCelResourceSelector("true"), + ), + ), + integration.WithResource( + integration.ResourceID(resourceID), + ), + integration.WithPolicy( + integration.PolicyID(policyID), + integration.PolicyName("prod-depends-on-qa"), + integration.WithPolicyTargetSelector( + integration.PolicyTargetCelEnvironmentSelector("environment.name == 'prod'"), + integration.PolicyTargetCelDeploymentSelector("true"), + integration.PolicyTargetCelResourceSelector("true"), + ), + integration.WithPolicyRule( + integration.WithRuleEnvironmentProgression( + integration.EnvironmentProgressionDependsOnEnvironmentSelector("environment.name == 'qa'"), + integration.EnvironmentProgressionMinimumSuccessPercentage(minSuccessPercentage), + ), + ), + ), + ) + + ctx := context.Background() + + qaReleaseTarget := &oapi.ReleaseTarget{ + ResourceId: resourceID, + EnvironmentId: qaEnvironmentID, + DeploymentId: deploymentID, + } + prodReleaseTarget := &oapi.ReleaseTarget{ + ResourceId: resourceID, + EnvironmentId: prodEnvironmentID, + DeploymentId: deploymentID, + } + + version := c.NewDeploymentVersion() + version.DeploymentId = deploymentID + engine.PushEvent(ctx, handler.DeploymentVersionCreate, version) + + qaJobs := engine.Workspace().Jobs().GetJobsForReleaseTarget(qaReleaseTarget) + prodJobs := engine.Workspace().Jobs().GetJobsForReleaseTarget(prodReleaseTarget) + + assert.Equal(t, 1, len(qaJobs), "expected 1 qa job after version creation") + assert.Equal(t, 0, len(prodJobs), "expected 0 prod jobs before qa succeeds") + + var qaJob *oapi.Job + for _, j := range qaJobs { + qaJob = j + break + } + + qaJobCopy := *qaJob + qaJobCopy.Status = oapi.JobStatusSuccessful + completedAt := time.Now() + qaJobCopy.CompletedAt = &completedAt + jobUpdateEvent := &oapi.JobUpdateEvent{ + Id: &qaJobCopy.Id, + Job: qaJobCopy, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + }, + } + engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent) + + prodJobs = engine.Workspace().Jobs().GetJobsForReleaseTarget(prodReleaseTarget) + + assert.Equal(t, 1, len(prodJobs), "expected 1 prod job after qa succeeds (environment progression should trigger reconciliation)") +} + +func TestEngine_PolicyEnvironmentProgression_TriggersGradualRolloutStart(t *testing.T) { + jobAgentID := "job-agent-1" + deploymentID := "deployment-1" + qaEnvironmentID := "qa-environment" + prodEnvironmentID := "prod-environment" + resourceID := "resource-1" + envProgressionPolicyID := "env-progression-policy" + gradualRolloutPolicyID := "gradual-rollout-policy" + + minSuccessPercentage := float32(100) + timeScaleInterval := int32(60) + + engine := integration.NewTestWorkspace(t, + integration.WithJobAgent( + integration.JobAgentID(jobAgentID), + ), + integration.WithSystem( + integration.WithDeployment( + integration.DeploymentID(deploymentID), + integration.DeploymentName("app"), + integration.DeploymentJobAgent(jobAgentID), + integration.DeploymentCelResourceSelector("true"), + ), + integration.WithEnvironment( + integration.EnvironmentID(qaEnvironmentID), + integration.EnvironmentName("qa"), + integration.EnvironmentCelResourceSelector("true"), + ), + integration.WithEnvironment( + integration.EnvironmentID(prodEnvironmentID), + integration.EnvironmentName("prod"), + integration.EnvironmentCelResourceSelector("true"), + ), + ), + integration.WithResource( + integration.ResourceID(resourceID), + ), + integration.WithPolicy( + integration.PolicyID(envProgressionPolicyID), + integration.PolicyName("prod-depends-on-qa"), + integration.WithPolicyTargetSelector( + integration.PolicyTargetCelEnvironmentSelector("environment.name == 'prod'"), + integration.PolicyTargetCelDeploymentSelector("true"), + integration.PolicyTargetCelResourceSelector("true"), + ), + integration.WithPolicyRule( + integration.WithRuleEnvironmentProgression( + integration.EnvironmentProgressionDependsOnEnvironmentSelector("environment.name == 'qa'"), + integration.EnvironmentProgressionMinimumSuccessPercentage(minSuccessPercentage), + ), + ), + ), + integration.WithPolicy( + integration.PolicyID(gradualRolloutPolicyID), + integration.PolicyName("prod-gradual-rollout"), + integration.WithPolicyTargetSelector( + integration.PolicyTargetCelEnvironmentSelector("environment.name == 'prod'"), + integration.PolicyTargetCelDeploymentSelector("true"), + integration.PolicyTargetCelResourceSelector("true"), + ), + integration.WithPolicyRule( + integration.WithRuleGradualRollout(timeScaleInterval), + ), + ), + ) + + ctx := context.Background() + + qaReleaseTarget := &oapi.ReleaseTarget{ + ResourceId: resourceID, + EnvironmentId: qaEnvironmentID, + DeploymentId: deploymentID, + } + prodReleaseTarget := &oapi.ReleaseTarget{ + ResourceId: resourceID, + EnvironmentId: prodEnvironmentID, + DeploymentId: deploymentID, + } + + version := c.NewDeploymentVersion() + version.DeploymentId = deploymentID + engine.PushEvent(ctx, handler.DeploymentVersionCreate, version) + + qaJobs := engine.Workspace().Jobs().GetJobsForReleaseTarget(qaReleaseTarget) + prodJobs := engine.Workspace().Jobs().GetJobsForReleaseTarget(prodReleaseTarget) + + assert.Equal(t, 1, len(qaJobs), "expected 1 qa job after version creation") + assert.Equal(t, 0, len(prodJobs), "expected 0 prod jobs before qa succeeds (env progression blocking)") + + var qaJob *oapi.Job + for _, j := range qaJobs { + qaJob = j + break + } + + qaJobCopy := *qaJob + qaJobCopy.Status = oapi.JobStatusSuccessful + completedAt := time.Now() + qaJobCopy.CompletedAt = &completedAt + jobUpdateEvent := &oapi.JobUpdateEvent{ + Id: &qaJobCopy.Id, + Job: qaJobCopy, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + }, + } + engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent) + + prodJobs = engine.Workspace().Jobs().GetJobsForReleaseTarget(prodReleaseTarget) + + assert.Equal(t, 1, len(prodJobs), "expected 1 prod job after qa succeeds (gradual rollout should start)") +}