Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/fluxcd/pkg/http/fetch v0.19.0
github.com/fluxcd/pkg/kustomize v1.22.0
github.com/fluxcd/pkg/runtime v0.86.0
github.com/fluxcd/pkg/ssa v0.57.0
github.com/fluxcd/pkg/ssa v0.58.0
github.com/fluxcd/pkg/tar v0.14.0
github.com/fluxcd/pkg/testserver v0.13.0
github.com/fluxcd/source-controller/api v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ github.com/fluxcd/pkg/runtime v0.86.0 h1:q7aBSerJwt0N9hpurPVElG+HWpVhZcs6t96bcNQ
github.com/fluxcd/pkg/runtime v0.86.0/go.mod h1:Wt9mUzQgMPQMu2D/wKl5pG4zh5vu/tfF5wq9pPobxOQ=
github.com/fluxcd/pkg/sourceignore v0.14.0 h1:ZiZzbXtXb/Qp7I7JCStsxOlX8ri8rWwCvmvIrJ0UzQQ=
github.com/fluxcd/pkg/sourceignore v0.14.0/go.mod h1:E3zKvyTyB+oQKqm/2I/jS6Rrt3B7fNuig/4bY2vi3bg=
github.com/fluxcd/pkg/ssa v0.57.0 h1:G2cKyeyOtEdOdLeMBWZe0XT+J0rBWSBzy9xln2myTaI=
github.com/fluxcd/pkg/ssa v0.57.0/go.mod h1:iN/QDMqdJaVXKkqwbXqGa4PyWQwtyIy2WkeM2+9kfXA=
github.com/fluxcd/pkg/ssa v0.58.0 h1:W7m2LQFsZxPN9nn3lfGVDwXsZnIgCWWJ/+/K5hpzW+k=
github.com/fluxcd/pkg/ssa v0.58.0/go.mod h1:iN/QDMqdJaVXKkqwbXqGa4PyWQwtyIy2WkeM2+9kfXA=
github.com/fluxcd/pkg/tar v0.14.0 h1:9Gku8FIvPt2bixKldZnzXJ/t+7SloxePlzyVGOK8GVQ=
github.com/fluxcd/pkg/tar v0.14.0/go.mod h1:+rOWYk93qLEJ8WwmkvJOkB8i0dna1mrwJFybE8i9Udo=
github.com/fluxcd/pkg/testserver v0.13.0 h1:xEpBcEYtD7bwvZ+i0ZmChxKkDo/wfQEV3xmnzVybSSg=
Expand Down
45 changes: 39 additions & 6 deletions internal/controller/kustomization_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ type KustomizationReconciler struct {

// Feature gates

AdditiveCELDependencyCheck bool
AllowExternalArtifact bool
FailFast bool
GroupChangeLog bool
StrictSubstitutions bool
AdditiveCELDependencyCheck bool
AllowExternalArtifact bool
CancelHealthCheckOnNewRevision bool
FailFast bool
GroupChangeLog bool
StrictSubstitutions bool
}

func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
Expand Down Expand Up @@ -983,7 +984,39 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
}

// Check the health with a default timeout of 30sec shorter than the reconciliation interval.
if err := manager.WaitForSet(toCheck, ssa.WaitOptions{
healthCtx := ctx
if r.CancelHealthCheckOnNewRevision {
// Create a cancellable context for health checks that monitors for new revisions
var cancel context.CancelFunc
healthCtx, cancel = context.WithCancel(ctx)
defer cancel()

// Start monitoring for new revisions to allow early cancellation
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-healthCtx.Done():
return
case <-ticker.C:
// Get the latest source artifact
latestSrc, err := r.getSource(ctx, obj)
if err == nil && latestSrc.GetArtifact() != nil {
if newRevision := latestSrc.GetArtifact().Revision; newRevision != revision {
const msg = "New revision detected during health check, cancelling"
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
ctrl.LoggerFrom(ctx).Info(msg, "current", revision, "new", newRevision)
cancel()
return
}
}
}
}
}()
}
if err := manager.WaitForSetWithContext(healthCtx, toCheck, ssa.WaitOptions{
Interval: 5 * time.Second,
Timeout: obj.GetTimeout(),
FailFast: r.FailFast,
Expand Down
162 changes: 162 additions & 0 deletions internal/controller/kustomization_wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,165 @@ func TestKustomizationReconciler_RESTMapper(t *testing.T) {
g.Expect(err).To(HaveOccurred())
})
}

// TestKustomizationReconciler_CancelHealthCheckOnNewRevision exercises the
// CancelHealthCheckOnNewRevision feature gate end to end.
//
// Scenario:
//  1. A healthy revision (a ConfigMap) is reconciled until the Kustomization
//     is Ready.
//  2. A revision containing a Deployment with an unpullable image is
//     published, so the health check (Wait: true) cannot succeed.
//  3. Shortly afterwards a fixed revision is published.
//
// The Kustomization timeout (5 minutes) is deliberately longer than the
// polling timeout used by this test (60 seconds). The test expects the fixed
// revision to be attempted within that polling window and the cancellation
// event to be recorded.
func TestKustomizationReconciler_CancelHealthCheckOnNewRevision(t *testing.T) {
	g := NewWithT(t)
	id := "cancel-" + randStringRunes(5)
	resultK := &kustomizev1.Kustomization{}
	timeout := 60 * time.Second

	// Must match the message emitted by the controller when it cancels the
	// health check.
	const cancelMsg = "New revision detected during health check, cancelling"

	// The feature gate lives on the shared reconciler; switch it on for this
	// test only and restore the default when the test finishes.
	reconciler.CancelHealthCheckOnNewRevision = true
	t.Cleanup(func() { reconciler.CancelHealthCheckOnNewRevision = false })

	err := createNamespace(id)
	g.Expect(err).NotTo(HaveOccurred(), "failed to create test namespace")

	err = createKubeConfigSecret(id)
	g.Expect(err).NotTo(HaveOccurred(), "failed to create kubeconfig secret")

	// Create initial successful manifests
	successManifests := []testserver.File{
		{
			Name: "configmap.yaml",
			Body: fmt.Sprintf(`apiVersion: v1
kind: ConfigMap
metadata:
  name: test-config
  namespace: %s
data:
  foo: bar`, id),
		},
	}
	artifact, err := testServer.ArtifactFromFiles(successManifests)
	g.Expect(err).ToNot(HaveOccurred())

	repositoryName := types.NamespacedName{
		Name:      fmt.Sprintf("cancel-%s", randStringRunes(5)),
		Namespace: id,
	}

	err = applyGitRepository(repositoryName, artifact, "main/"+artifact)
	g.Expect(err).NotTo(HaveOccurred())

	kustomization := &kustomizev1.Kustomization{}
	kustomization.Name = id
	kustomization.Namespace = id
	kustomization.Spec = kustomizev1.KustomizationSpec{
		Interval: metav1.Duration{Duration: 10 * time.Minute},
		Path:     "./",
		Wait:     true,
		// Longer than the test's own polling timeout on purpose, see the
		// function comment.
		Timeout: &metav1.Duration{Duration: 5 * time.Minute},
		SourceRef: kustomizev1.CrossNamespaceSourceReference{
			Name:      repositoryName.Name,
			Kind:      sourcev1.GitRepositoryKind,
			Namespace: id,
		},
		KubeConfig: &meta.KubeConfigReference{
			SecretRef: &meta.SecretKeyReference{
				Name: "kubeconfig",
			},
		},
	}

	err = k8sClient.Create(context.Background(), kustomization)
	g.Expect(err).NotTo(HaveOccurred())

	// Wait for initial reconciliation to succeed.
	// The Get error is intentionally ignored in the polls below: the
	// condition is evaluated on whatever resultK holds and Eventually keeps
	// retrying until the timeout.
	g.Eventually(func() bool {
		_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
		return conditions.IsReady(resultK)
	}, timeout, time.Second).Should(BeTrue())

	// Create failing manifests (deployment with bad image that will timeout)
	failingManifests := []testserver.File{
		{
			Name: "deployment.yaml",
			Body: fmt.Sprintf(`apiVersion: apps/v1
kind: Deployment
metadata:
  name: failing-deployment
  namespace: %s
spec:
  replicas: 1
  selector:
    matchLabels:
      app: failing-app
  template:
    metadata:
      labels:
        app: failing-app
    spec:
      containers:
      - name: app
        image: nonexistent.registry/badimage:latest
        ports:
        - containerPort: 8080`, id),
		},
	}

	// Apply failing revision
	failingArtifact, err := testServer.ArtifactFromFiles(failingManifests)
	g.Expect(err).ToNot(HaveOccurred())

	err = applyGitRepository(repositoryName, failingArtifact, "main/"+failingArtifact)
	g.Expect(err).NotTo(HaveOccurred())

	// Wait for reconciliation to start on failing revision
	g.Eventually(func() bool {
		_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
		return resultK.Status.LastAttemptedRevision == "main/"+failingArtifact
	}, timeout, time.Second).Should(BeTrue())

	// Now quickly apply a fixed revision while health check should be in progress
	fixedManifests := []testserver.File{
		{
			Name: "deployment.yaml",
			Body: fmt.Sprintf(`apiVersion: apps/v1
kind: Deployment
metadata:
  name: working-deployment
  namespace: %s
spec:
  replicas: 1
  selector:
    matchLabels:
      app: working-app
  template:
    metadata:
      labels:
        app: working-app
    spec:
      containers:
      - name: app
        image: nginx:latest
        ports:
        - containerPort: 80`, id),
		},
	}

	fixedArtifact, err := testServer.ArtifactFromFiles(fixedManifests)
	g.Expect(err).ToNot(HaveOccurred())

	// Apply the fixed revision shortly after the failing one
	time.Sleep(2 * time.Second) // Give some time for health check to start
	err = applyGitRepository(repositoryName, fixedArtifact, "main/"+fixedArtifact)
	g.Expect(err).NotTo(HaveOccurred())

	// The key test: verify that the fixed revision gets attempted
	// and that the health check cancellation worked
	g.Eventually(func() bool {
		_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
		return resultK.Status.LastAttemptedRevision == "main/"+fixedArtifact
	}, timeout, time.Second).Should(BeTrue())

	// Check cancellation event was emitted
	events := getEvents(resultK.GetName(), nil)
	var found bool
	for _, e := range events {
		if e.Message == cancelMsg {
			found = true
			break
		}
	}
	g.Expect(found).To(BeTrue(), "did not find event for health check cancellation")
}
12 changes: 12 additions & 0 deletions internal/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ const (

// ExternalArtifact controls whether the ExternalArtifact source type is enabled.
ExternalArtifact = "ExternalArtifact"

// CancelHealthCheckOnNewRevision controls whether ongoing health checks
// should be cancelled when a new source revision becomes available.
//
// When enabled, if a new revision is detected while waiting for resources
// to become ready, the current health check will be cancelled to allow
// immediate processing of the new revision. This can help avoid getting
// stuck on failing deployments when fixes are available.
CancelHealthCheckOnNewRevision = "CancelHealthCheckOnNewRevision"
)

var features = map[string]bool{
Expand All @@ -83,6 +92,9 @@ var features = map[string]bool{
// ExternalArtifact
// opt-in from v1.7
ExternalArtifact: false,
// CancelHealthCheckOnNewRevision
// opt-in from v1.7
CancelHealthCheckOnNewRevision: false,
}

func init() {
Expand Down
53 changes: 30 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ func main() {
os.Exit(1)
}

cancelHealthCheckOnNewRevision, err := features.Enabled(features.CancelHealthCheckOnNewRevision)
if err != nil {
setupLog.Error(err, "unable to check feature gate "+features.CancelHealthCheckOnNewRevision)
os.Exit(1)
}

var tokenCache *pkgcache.TokenCache
if tokenCacheOptions.MaxSize > 0 {
var err error
Expand All @@ -307,29 +313,30 @@ func main() {
}

if err = (&controller.KustomizationReconciler{
AdditiveCELDependencyCheck: additiveCELDependencyCheck,
AllowExternalArtifact: allowExternalArtifact,
APIReader: mgr.GetAPIReader(),
ArtifactFetchRetries: httpRetry,
Client: mgr.GetClient(),
ClusterReader: clusterReader,
ConcurrentSSA: concurrentSSA,
ControllerName: controllerName,
DefaultServiceAccount: defaultServiceAccount,
DependencyRequeueInterval: requeueDependency,
DisallowedFieldManagers: disallowedFieldManagers,
EventRecorder: eventRecorder,
FailFast: failFast,
GroupChangeLog: groupChangeLog,
KubeConfigOpts: kubeConfigOpts,
Mapper: restMapper,
Metrics: metricsH,
NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs,
NoRemoteBases: noRemoteBases,
SOPSAgeSecret: sopsAgeSecret,
StatusManager: fmt.Sprintf("gotk-%s", controllerName),
StrictSubstitutions: strictSubstitutions,
TokenCache: tokenCache,
AdditiveCELDependencyCheck: additiveCELDependencyCheck,
AllowExternalArtifact: allowExternalArtifact,
CancelHealthCheckOnNewRevision: cancelHealthCheckOnNewRevision,
APIReader: mgr.GetAPIReader(),
ArtifactFetchRetries: httpRetry,
Client: mgr.GetClient(),
ClusterReader: clusterReader,
ConcurrentSSA: concurrentSSA,
ControllerName: controllerName,
DefaultServiceAccount: defaultServiceAccount,
DependencyRequeueInterval: requeueDependency,
DisallowedFieldManagers: disallowedFieldManagers,
EventRecorder: eventRecorder,
FailFast: failFast,
GroupChangeLog: groupChangeLog,
KubeConfigOpts: kubeConfigOpts,
Mapper: restMapper,
Metrics: metricsH,
NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs,
NoRemoteBases: noRemoteBases,
SOPSAgeSecret: sopsAgeSecret,
StatusManager: fmt.Sprintf("gotk-%s", controllerName),
StrictSubstitutions: strictSubstitutions,
TokenCache: tokenCache,
}).SetupWithManager(ctx, mgr, controller.KustomizationReconcilerOptions{
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
WatchConfigsPredicate: watchConfigsPredicate,
Expand Down