Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/cache/data/file_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ type FileInfo struct {
ObjectGeneration int64
Offset uint64
FileSize uint64
FilePtr *os.File
}

func (fi FileInfo) Size() uint64 {
return fi.FileSize
}

type FileSpec struct {
FilePtr *os.File
Path string
FilePerm os.FileMode
DirPerm os.FileMode
Expand Down
9 changes: 8 additions & 1 deletion internal/cache/file/cache_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,19 @@ func (chr *CacheHandler) createLocalFileReadHandle(objectName string, bucketName
// and deletes the file in cache.
func (chr *CacheHandler) cleanUpEvictedFile(fileInfo *data.FileInfo) error {
key := fileInfo.Key
_, err := key.Key()
keyString, err := key.Key()
if err != nil {
return fmt.Errorf("cleanUpEvictedFile: while creating key: %w", err)
}

chr.jobManager.InvalidateAndRemoveJob(key.ObjectName, key.BucketName)
fileInfoCacheEntry := chr.fileInfoCache.LookUpWithoutChangingOrder(keyString)
if fileInfoCacheEntry != nil {
err := fileInfo.FilePtr.Close()
if err != nil {
return fmt.Errorf("error while closing cached file: %v", err)
}
}

localFilePath := util.GetDownloadPath(chr.cacheDir, util.GetObjectPath(key.BucketName, key.ObjectName))
err = util.TruncateAndRemoveFile(localFilePath)
Expand Down
16 changes: 9 additions & 7 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ func (job *Job) updateStatusOffset(downloadedOffset int64) (err error) {

updatedFileInfo := data.FileInfo{
Key: fileInfoKey, ObjectGeneration: job.object.Generation,
FilePtr: job.fileSpec.FilePtr,
FileSize: job.object.Size, Offset: uint64(downloadedOffset),
}

Expand Down Expand Up @@ -418,13 +419,14 @@ func (job *Job) downloadObjectAsync() {
job.handleError(err)
return
}
defer func() {
err = cacheFile.Close()
if err != nil {
err = fmt.Errorf("downloadObjectAsync: error while closing cache file: %w", err)
job.handleError(err)
}
}()
job.fileSpec.FilePtr = cacheFile
//defer func() {
// err = cacheFile.Close()
// if err != nil {
// err = fmt.Errorf("downloadObjectAsync: error while closing cache file: %w", err)
// job.handleError(err)
// }
//}()

// Both parallel and non-parallel download functions support cancellation in
// case of job's cancellation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestMain(m *testing.M) {
// Run tests for testBucket
// 4. Build the flag sets dynamically from the config.
cfg.CloudProfiler[0].Configs[0].Flags[0] = strings.ReplaceAll(cfg.CloudProfiler[0].Configs[0].Flags[0], "--cloud-profiler-label=", fmt.Sprintf("--cloud-profiler-label=%s", testServiceVersion))
flags := setup.BuildFlagSets(cfg.CloudProfiler[0], bucketType)
flags := setup.BuildFlagSets(cfg.CloudProfiler[0], bucketType, "")

setup.SetUpTestDirForTestBucket(&cfg.CloudProfiler[0])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestMain(m *testing.M) {
}()

// 3. Build the flag sets dynamically from the config.
flags := setup.BuildFlagSets(cfg.ExplicitDir[0], bucketType)
flags := setup.BuildFlagSets(cfg.ExplicitDir[0], bucketType, "")

// 4. Run tests with the dynamically generated flags.
successCode := implicit_and_explicit_dir_setup.RunTestsForExplicitAndImplicitDir(&cfg.ExplicitDir[0], flags, m)
Expand Down
2 changes: 1 addition & 1 deletion tools/integration_tests/gzip/gzip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func TestMain(m *testing.M) {

// Run tests for testBucket.
// 4. Build the flag sets dynamically from the config.
flags := setup.BuildFlagSets(cfg.Gzip[0], bucketType)
flags := setup.BuildFlagSets(cfg.Gzip[0], bucketType, "")

setup.SetUpTestDirForTestBucket(&cfg.Gzip[0])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestMain(m *testing.M) {
}()

// 3. Build the flag sets dynamically from the config.
flags := setup.BuildFlagSets(cfg.ImplicitDir[0], bucketType)
flags := setup.BuildFlagSets(cfg.ImplicitDir[0], bucketType, "")

// 4. Run tests with the dynamically generated flags.
successCode := implicit_and_explicit_dir_setup.RunTestsForExplicitAndImplicitDir(&cfg.ImplicitDir[0], flags, m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestMain(m *testing.M) {

// Run tests for testBucket
// 4. Build the flag sets dynamically from the config.
flags := setup.BuildFlagSets(cfg.ListLargeDir[0], bucketType)
flags := setup.BuildFlagSets(cfg.ListLargeDir[0], bucketType, "")

setup.SetUpTestDirForTestBucket(&cfg.ListLargeDir[0])

Expand Down
2 changes: 1 addition & 1 deletion tools/integration_tests/local_file/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestMain(m *testing.M) {

// Run tests for testBucket.
// 4. Build the flag sets dynamically from the config.
flags := setup.BuildFlagSets(cfg.LocalFile[0], bucketType)
flags := setup.BuildFlagSets(cfg.LocalFile[0], bucketType, "")

setup.SetUpTestDirForTestBucket(&cfg.LocalFile[0])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,28 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package read_cache

import (
"context"
"fmt"
"log"
"path"
"os"
"testing"

"cloud.google.com/go/storage"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/client"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/log_parser/json_parser/read_logs"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/setup"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

Expand All @@ -37,19 +37,30 @@ type cacheFileForExcludeRegexTest struct {
flags []string
storageClient *storage.Client
ctx context.Context
baseTestName string
suite.Suite
}

func (s *cacheFileForExcludeRegexTest) SetupSuite() {
setupLogFileAndCacheDir(s.baseTestName)
mountGCSFuseAndSetupTestDir(s.flags, s.ctx, s.storageClient)
}

func (s *cacheFileForExcludeRegexTest) SetupTest() {
setupForMountedDirectoryTests()
//Truncate log file created.
err := os.Truncate(testEnv.cfg.LogFile, 0)
require.NoError(s.T(), err)
// Clean up the cache directory path as gcsfuse don't clean up on mounting.
operations.RemoveDir(cacheDirPath)
mountGCSFuseAndSetupTestDir(s.flags, s.ctx, s.storageClient)
operations.RemoveDir(testEnv.cacheDirPath)
testEnv.testDirPath = client.SetupTestDirectory(s.ctx, s.storageClient, testDirName)
}

func (s *cacheFileForExcludeRegexTest) TearDownTest() {
setup.SaveGCSFuseLogFileInCaseOfFailure(s.T())
setup.UnmountGCSFuseAndDeleteLogFile(rootDir)
}

func (s *cacheFileForExcludeRegexTest) TearDownSuite() {
setup.UnmountGCSFuseWithConfig(testEnv.cfg)
}

////////////////////////////////////////////////////////////////////////
Expand All @@ -73,98 +84,26 @@ func (s *cacheFileForExcludeRegexTest) TestReadsForExcludedFile() {
////////////////////////////////////////////////////////////////////////

func TestCacheFileForExcludeRegexTest(t *testing.T) {
ts := &cacheFileForExcludeRegexTest{ctx: context.Background()}
// Create storage client before running tests.
closeStorageClient := client.CreateStorageClientWithCancel(&ts.ctx, &ts.storageClient)
defer func() {
err := closeStorageClient()
if err != nil {
t.Errorf("closeStorageClient failed: %v", err)
}
}()

// Run tests for mounted directory if the flag is set.
if setup.AreBothMountedDirectoryAndTestBucketFlagsSet() {
ts := &cacheFileForExcludeRegexTest{
ctx: context.Background(),
storageClient: testEnv.storageClient,
baseTestName: t.Name(),
}
// Run tests for mounted directory if the flag is set. This assumes that run flag is properly passed by GKE team as per the config.
if testEnv.cfg.GKEMountedDirectory != "" && testEnv.cfg.TestBucket != "" {
suite.Run(t, ts)
return
}

// Run with cache directory pointing to RAM based dir
ramCacheDir := path.Join("/dev/shm", cacheDirName)

tests := []struct {
flags gcsfuseTestFlags
onlyDirTest bool
}{
{
flags: gcsfuseTestFlags{
cliFlags: []string{"--implicit-dirs", "--file-cache-exclude-regex=."},
cacheSize: cacheCapacityForRangeReadTestInMiB,
cacheFileForRangeRead: false,
fileName: configFileName,
enableParallelDownloads: false,
enableODirect: false,
cacheDirPath: getDefaultCacheDirPathForTests(),
},
},
{
flags: gcsfuseTestFlags{
cliFlags: []string{"--file-cache-exclude-regex=."},
cacheSize: cacheCapacityForRangeReadTestInMiB,
cacheFileForRangeRead: false,
fileName: configFileName,
enableParallelDownloads: false,
enableODirect: false,
cacheDirPath: ramCacheDir,
},
},
{
flags: gcsfuseTestFlags{
cliFlags: []string{"--file-cache-exclude-regex=."},
cacheSize: cacheCapacityForRangeReadTestInMiB,
cacheFileForRangeRead: true,
fileName: configFileName,
enableParallelDownloads: false,
enableODirect: false,
cacheDirPath: ramCacheDir,
},
},
{
flags: gcsfuseTestFlags{
// Exclude regex is set to bucket name as the prefix of the string, so should exclude all objects.
cliFlags: []string{fmt.Sprintf("--file-cache-exclude-regex=^%s/", setup.TestBucket())},
cacheSize: cacheCapacityForRangeReadTestInMiB,
cacheFileForRangeRead: true,
fileName: configFileName,
enableParallelDownloads: false,
enableODirect: false,
cacheDirPath: ramCacheDir,
},
},
{
flags: gcsfuseTestFlags{
// Exclude regex is set to the only-dir value which is not present in local paths, but should be present in all cloud paths.
cliFlags: []string{fmt.Sprintf("--file-cache-exclude-regex=^%s/%s/", setup.TestBucket(), onlyDirMounted)},
cacheSize: cacheCapacityForRangeReadTestInMiB,
cacheFileForRangeRead: true,
fileName: configFileName,
enableParallelDownloads: false,
enableODirect: false,
cacheDirPath: ramCacheDir,
},
onlyDirTest: true,
},
// Run tests for GCE environment otherwise.
flagsSet := setup.BuildFlagSets(*testEnv.cfg, testEnv.bucketType, t.Name())
if setup.OnlyDirMounted() != "" {
flagsSet = append(flagsSet,
[]string{fmt.Sprintf("--file-cache-exclude-regex=^%s/%s/", setup.TestBucket(), onlyDirMounted), "--file-cache-max-size-mb=50", "--file-cache-cache-file-for-range-read=true", "--file-cache-enable-parallel-downloads=false", "--file-cache-enable-o-direct=false", fmt.Sprintf("--cache-dir=%s/gcsfuse-tmp/TestCacheFileForExcludeRegexTest", setup.TestDir()), fmt.Sprintf("--log-file=%s/gcsfuse-tmp/TestCacheFileForExcludeRegexTest.log", setup.TestDir()), "--log-severity=TRACE"},
[]string{fmt.Sprintf("--file-cache-exclude-regex=^%s/%s/", setup.TestBucket(), onlyDirMounted), "--file-cache-max-size-mb=50", "--file-cache-cache-file-for-range-read=true", "--file-cache-enable-parallel-downloads=false", "--file-cache-enable-o-direct=false", fmt.Sprintf("--cache-dir=%s/gcsfuse-tmp/TestCacheFileForExcludeRegexTest", setup.TestDir()), fmt.Sprintf("--log-file=%s/gcsfuse-tmp/TestCacheFileForExcludeRegexTest.log", setup.TestDir()), "--log-severity=TRACE", "--client-protocol=grpc"},
)
}
for _, test := range tests {
test.flags = appendClientProtocolConfigToFlagSet([]gcsfuseTestFlags{test.flags})[0]
if test.onlyDirTest && setup.OnlyDirMounted() == "" {
continue
}
configFilePath := createConfigFile(&test.flags)
ts.flags = []string{"--config-file=" + configFilePath}
if test.flags.cliFlags != nil {
ts.flags = append(ts.flags, test.flags.cliFlags...)
}
for _, ts.flags = range flagsSet {
log.Printf("Running tests with flags: %s", ts.flags)
suite.Run(t, ts)
}
Expand Down
Loading
Loading