Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 45 additions & 43 deletions internal/cmd/cli/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,31 @@ type Apply struct {
FleetClient
BundleInputArgs
OutputArgsNoDefault
Label map[string]string `usage:"Labels to apply to created bundles" short:"l"`
TargetsFile string `usage:"Addition source of targets and restrictions to be append"`
Compress bool `usage:"Force all resources to be compress" short:"c"`
ServiceAccount string `usage:"Service account to assign to bundle created" short:"a"`
SyncGeneration int `usage:"Generation number used to force sync the deployment"`
TargetNamespace string `usage:"Ensure this bundle goes to this target namespace"`
Paused bool `usage:"Create bundles in a paused state"`
Commit string `usage:"Commit to assign to the bundle" env:"COMMIT"`
Username string `usage:"Basic auth username for helm repo" env:"HELM_USERNAME"`
PasswordFile string `usage:"Path of file containing basic auth password for helm repo"`
CACertsFile string `usage:"Path of custom cacerts for helm repo" name:"cacerts-file"`
SSHPrivateKeyFile string `usage:"Path of ssh-private-key for helm repo" name:"ssh-privatekey-file"`
HelmRepoURLRegex string `usage:"Helm credentials will be used if the helm repo matches this regex. Credentials will always be used if this is empty or not provided" name:"helm-repo-url-regex"`
KeepResources bool `usage:"Keep resources created after the GitRepo or Bundle is deleted" name:"keep-resources"`
DeleteNamespace bool `usage:"Delete GitRepo target namespace after the GitRepo or Bundle is deleted" name:"delete-namespace"`
HelmCredentialsByPathFile string `usage:"Path of file containing helm credentials for paths" name:"helm-credentials-by-path-file"`
HelmBasicHTTP bool `usage:"Uses plain HTTP connections when downloading from helm repositories" name:"helm-basic-http"`
HelmInsecureSkipTLS bool `usage:"Skip TLS verification when downloading from helm repositories" name:"helm-insecure-skip-tls"`
CorrectDrift bool `usage:"Rollback any change made from outside of Fleet" name:"correct-drift"`
CorrectDriftForce bool `usage:"Use --force when correcting drift. Resources can be deleted and recreated" name:"correct-drift-force"`
CorrectDriftKeepFailHistory bool `usage:"Keep helm history for failed rollbacks" name:"correct-drift-keep-fail-history"`
OCIRegistrySecret string `usage:"OCI storage registry secret name" name:"oci-registry-secret"`
DrivenScan bool `usage:"Use driven scan. Bundles are defined by the user" name:"driven-scan"`
DrivenScanSeparator string `usage:"Separator to use for bundle folder and options file" name:"driven-scan-sep" default:":"`
Label map[string]string `usage:"Labels to apply to created bundles" short:"l"`
TargetsFile string `usage:"Addition source of targets and restrictions to be append"`
Compress bool `usage:"Force all resources to be compress" short:"c"`
ServiceAccount string `usage:"Service account to assign to bundle created" short:"a"`
SyncGeneration int `usage:"Generation number used to force sync the deployment"`
TargetNamespace string `usage:"Ensure this bundle goes to this target namespace"`
Paused bool `usage:"Create bundles in a paused state"`
Commit string `usage:"Commit to assign to the bundle" env:"COMMIT"`
Username string `usage:"Basic auth username for helm repo" env:"HELM_USERNAME"`
PasswordFile string `usage:"Path of file containing basic auth password for helm repo"`
CACertsFile string `usage:"Path of custom cacerts for helm repo" name:"cacerts-file"`
SSHPrivateKeyFile string `usage:"Path of ssh-private-key for helm repo" name:"ssh-privatekey-file"`
HelmRepoURLRegex string `usage:"Helm credentials will be used if the helm repo matches this regex. Credentials will always be used if this is empty or not provided" name:"helm-repo-url-regex"`
KeepResources bool `usage:"Keep resources created after the GitRepo or Bundle is deleted" name:"keep-resources"`
DeleteNamespace bool `usage:"Delete GitRepo target namespace after the GitRepo or Bundle is deleted" name:"delete-namespace"`
HelmCredentialsByPathFile string `usage:"Path of file containing helm credentials for paths" name:"helm-credentials-by-path-file"`
HelmBasicHTTP bool `usage:"Uses plain HTTP connections when downloading from helm repositories" name:"helm-basic-http"`
HelmInsecureSkipTLS bool `usage:"Skip TLS verification when downloading from helm repositories" name:"helm-insecure-skip-tls"`
CorrectDrift bool `usage:"Rollback any change made from outside of Fleet" name:"correct-drift"`
CorrectDriftForce bool `usage:"Use --force when correcting drift. Resources can be deleted and recreated" name:"correct-drift-force"`
CorrectDriftKeepFailHistory bool `usage:"Keep helm history for failed rollbacks" name:"correct-drift-keep-fail-history"`
OCIRegistrySecret string `usage:"OCI storage registry secret name" name:"oci-registry-secret"`
DrivenScan bool `usage:"Use driven scan. Bundles are defined by the user" name:"driven-scan"`
DrivenScanSeparator string `usage:"Separator to use for bundle folder and options file" name:"driven-scan-sep" default:":"`
BundleCreationMaxConcurrency int `usage:"Maximum number of concurrent bundle creation routines" name:"bundle-creation-max-concurrency" default:"4" env:"FLEET_BUNDLE_CREATION_MAX_CONCURRENCY"`
}

func (r *Apply) PersistentPre(_ *cobra.Command, _ []string) error {
Expand Down Expand Up @@ -108,25 +109,26 @@ func (a *Apply) run(cmd *cobra.Command, args []string) error {

name := ""
opts := apply.Options{
Namespace: a.Namespace,
BundleFile: a.BundleFile,
Output: writer.NewDefaultNone(a.Output),
Compress: a.Compress,
ServiceAccount: a.ServiceAccount,
Labels: a.Label,
TargetsFile: a.TargetsFile,
TargetNamespace: a.TargetNamespace,
Paused: a.Paused,
SyncGeneration: int64(a.SyncGeneration),
HelmRepoURLRegex: a.HelmRepoURLRegex,
KeepResources: a.KeepResources,
DeleteNamespace: a.DeleteNamespace,
CorrectDrift: a.CorrectDrift,
CorrectDriftForce: a.CorrectDriftForce,
CorrectDriftKeepFailHistory: a.CorrectDriftKeepFailHistory,
DrivenScan: a.DrivenScan,
DrivenScanSeparator: a.DrivenScanSeparator,
OCIRegistrySecret: a.OCIRegistrySecret,
Namespace: a.Namespace,
BundleFile: a.BundleFile,
Output: writer.NewDefaultNone(a.Output),
Compress: a.Compress,
ServiceAccount: a.ServiceAccount,
Labels: a.Label,
TargetsFile: a.TargetsFile,
TargetNamespace: a.TargetNamespace,
Paused: a.Paused,
SyncGeneration: int64(a.SyncGeneration),
HelmRepoURLRegex: a.HelmRepoURLRegex,
KeepResources: a.KeepResources,
DeleteNamespace: a.DeleteNamespace,
CorrectDrift: a.CorrectDrift,
CorrectDriftForce: a.CorrectDriftForce,
CorrectDriftKeepFailHistory: a.CorrectDriftKeepFailHistory,
DrivenScan: a.DrivenScan,
DrivenScanSeparator: a.DrivenScanSeparator,
OCIRegistrySecret: a.OCIRegistrySecret,
BundleCreationMaxConcurrency: a.BundleCreationMaxConcurrency,
}

knownHostsPath, err := writeTmpKnownHosts()
Expand Down
105 changes: 61 additions & 44 deletions internal/cmd/cli/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ var (
)

const (
JSONOutputEnvVar = "FLEET_JSON_OUTPUT"
JobNameEnvVar = "JOB_NAME"
FleetApplyConflictRetriesEnv = "FLEET_APPLY_CONFLICT_RETRIES"
defaultApplyConflictRetries = 1
JSONOutputEnvVar = "FLEET_JSON_OUTPUT"
JobNameEnvVar = "JOB_NAME"
FleetApplyConflictRetriesEnv = "FLEET_APPLY_CONFLICT_RETRIES"
BundleCreationMaxConcurrencyEnv = "FLEET_BUNDLE_CREATION_MAX_CONCURRENCY"
defaultApplyConflictRetries = 1
defaultBundleCreationMaxConcurrency = 4
)

type Getter interface {
Expand All @@ -64,30 +66,31 @@ type OCIRegistrySpec struct {
}

type Options struct {
Namespace string
BundleFile string
TargetsFile string
Compress bool
BundleReader io.Reader
Output io.Writer
ServiceAccount string
TargetNamespace string
Paused bool
Labels map[string]string
SyncGeneration int64
Auth bundlereader.Auth
HelmRepoURLRegex string
KeepResources bool
DeleteNamespace bool
AuthByPath map[string]bundlereader.Auth
CorrectDrift bool
CorrectDriftForce bool
CorrectDriftKeepFailHistory bool
OCIRegistry OCIRegistrySpec
OCIRegistrySecret string
DrivenScan bool
DrivenScanSeparator string
JobNameEnvVar string
Namespace string
BundleFile string
TargetsFile string
Compress bool
BundleReader io.Reader
Output io.Writer
ServiceAccount string
TargetNamespace string
Paused bool
Labels map[string]string
SyncGeneration int64
Auth bundlereader.Auth
HelmRepoURLRegex string
KeepResources bool
DeleteNamespace bool
AuthByPath map[string]bundlereader.Auth
CorrectDrift bool
CorrectDriftForce bool
CorrectDriftKeepFailHistory bool
OCIRegistry OCIRegistrySpec
OCIRegistrySecret string
DrivenScan bool
DrivenScanSeparator string
JobNameEnvVar string
BundleCreationMaxConcurrency int
}

func globDirs(baseDir string) (result []string, err error) {
Expand All @@ -106,7 +109,12 @@ func globDirs(baseDir string) (result []string, err error) {
return
}

const bundleCreationMaxConcurrency = 4
func getEffectiveMaxConcurrency(configured int) int {
if configured <= 0 {
return defaultBundleCreationMaxConcurrency
}
return configured
}

// CreateBundles creates bundles from the baseDirs, their names are prefixed with
// repoName. Depending on opts.Output the bundles are created in the cluster or
Expand All @@ -116,13 +124,15 @@ func CreateBundles(pctx context.Context, client client.Client, r record.EventRec
baseDirs = []string{"."}
}

maxConcurrency := getEffectiveMaxConcurrency(opts.BundleCreationMaxConcurrency)

// Using an errgroup to manage concurrency
// 1. Goroutines will be launched, honouring the concurrency limit, and eventually block trying to write to `bundlesChan`.
// 2. The main function will read from `bundlesChan`, hence unblocking the goroutines. This will continue to read from `bundlesChan` until it is closed.
// 3. We use another goroutine to wait for all goroutines to finish, then close `bundlesChan`, finally unblocking the main function.
bundlesChan := make(chan *fleet.Bundle)
eg, ctx := errgroup.WithContext(pctx)
eg.SetLimit(bundleCreationMaxConcurrency + 1) // extra goroutine for WalkDir loop
eg.SetLimit(maxConcurrency + 1) // extra goroutine for WalkDir loop
eg.Go(func() error {
for _, baseDir := range baseDirs {
matches, err := globDirs(baseDir)
Expand Down Expand Up @@ -218,18 +228,19 @@ func CreateBundlesDriven(pctx context.Context, client client.Client, r record.Ev
baseDirs = []string{"."}
}

maxConcurrency := getEffectiveMaxConcurrency(opts.BundleCreationMaxConcurrency)

// Using an errgroup to manage concurrency
// 1. Goroutines will be launched, honouring the concurrency limit, and eventually block trying to write to `bundlesChan`.
// 2. The main function will read from `bundlesChan`, hence unblocking the goroutines. This will continue to read from `bundlesChan` until it is closed.
// 3. We use another goroutine to wait for all goroutines to finish, then close `bundlesChan`, finally unblocking the main function.
bundlesChan := make(chan *fleet.Bundle)
eg, ctx := errgroup.WithContext(pctx)
eg.SetLimit(bundleCreationMaxConcurrency + 1) // extra goroutine for WalkDir loop
eg.SetLimit(maxConcurrency + 1) // extra goroutine for scanning loop
eg.Go(func() error {
for _, baseDir := range baseDirs {
opts := opts
eg.Go(func() error {
// verify if it also defines a fleetFile
var err error
baseDir, opts.BundleFile, err = getPathAndFleetYaml(baseDir, opts.DrivenScanSeparator)
if err != nil {
Expand Down Expand Up @@ -817,18 +828,24 @@ func setAuthByPath(opts *Options, path string) error {
return nil
}

func GetOnConflictRetries() (int, error) {
s := os.Getenv(FleetApplyConflictRetriesEnv)
if s != "" {
// check if we have a valid value
// it must be an integer
r, err := strconv.Atoi(s)
if err != nil {
return defaultApplyConflictRetries, err
} else {
return r, nil
}
// getIntEnvVar reads an integer from an environment variable, returning the default if unset or invalid.
func getIntEnvVar(envVarName string, defaultValue int) (int, error) {
s := os.Getenv(envVarName)
if s == "" {
return defaultValue, nil
}

return defaultApplyConflictRetries, nil
val, err := strconv.Atoi(s)
if err != nil {
return defaultValue, err
}
return val, nil
}

func GetOnConflictRetries() (int, error) {
return getIntEnvVar(FleetApplyConflictRetriesEnv, defaultApplyConflictRetries)
}

func GetBundleCreationMaxConcurrency() (int, error) {
return getIntEnvVar(BundleCreationMaxConcurrencyEnv, defaultBundleCreationMaxConcurrency)
}
95 changes: 95 additions & 0 deletions internal/cmd/cli/apply/apply_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package apply

import (
"os"
"testing"
)

func TestGetEffectiveMaxConcurrency(t *testing.T) {
tests := map[string]struct {
input int
expected int
}{
"zero defaults to 4": {0, 4},
"negative defaults to 4": {-1, 4},
"custom value 8": {8, 8},
"custom value 16": {16, 16},
"custom value 12": {12, 12},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
if got := getEffectiveMaxConcurrency(tt.input); got != tt.expected {
t.Errorf("expected %d, got %d", tt.expected, got)
}
})
}
}

func TestGetBundleCreationMaxConcurrency(t *testing.T) {
tests := []struct {
name string
envValue string
expectedValue int
expectedError bool
}{
{
name: "default when env var not set",
envValue: "",
expectedValue: 4,
expectedError: false,
},
{
name: "custom value 8",
envValue: "8",
expectedValue: 8,
expectedError: false,
},
{
name: "custom value 16",
envValue: "16",
expectedValue: 16,
expectedError: false,
},
{
name: "invalid value returns error",
envValue: "not_a_number",
expectedValue: 4,
expectedError: true,
},
{
name: "zero is valid but caller handles default",
envValue: "0",
expectedValue: 0,
expectedError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Save and restore the environment variable
oldVal, wasSet := os.LookupEnv(BundleCreationMaxConcurrencyEnv)
defer func() {
if wasSet {
os.Setenv(BundleCreationMaxConcurrencyEnv, oldVal)
} else {
os.Unsetenv(BundleCreationMaxConcurrencyEnv)
}
}()

if tt.envValue != "" {
os.Setenv(BundleCreationMaxConcurrencyEnv, tt.envValue)
} else {
os.Unsetenv(BundleCreationMaxConcurrencyEnv)
}

got, err := GetBundleCreationMaxConcurrency()
if (err != nil) != tt.expectedError {
t.Errorf("expected error %v, got %v", tt.expectedError, err != nil)
}
if got != tt.expectedValue {
t.Errorf("expected %d, got %d", tt.expectedValue, got)
}
})
}
}
Loading