Skip to content

Commit 917949e

Browse files
committed
fix
1 parent 76b148b commit 917949e

File tree

5 files changed

+61
-10
lines changed

5 files changed

+61
-10
lines changed

internal/fs/fs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -903,7 +903,8 @@ func (fs *fileSystem) lookUpOrCreateInodeIfNotStale(ic inode.Core) (in inode.Ino
903903
}
904904

905905
// Handle implicit directories.
906-
if ic.MinObject == nil {
906+
if ic.MinObject.ImplicitDir {
907+
logger.Infof("In Implicit Dir")
907908
return fs.createDirInode(ic, fs.implicitDirInodes)
908909
}
909910

@@ -2185,6 +2186,7 @@ func (fs *fileSystem) RmDir(
21852186
// Delete the backing object.
21862187
fs.mu.Lock()
21872188
_, isImplicitDir := fs.implicitDirInodes[child.Name()]
2189+
logger.Infof("Is it Implicit Dir: ", isImplicitDir)
21882190
fs.mu.Unlock()
21892191
parent.Lock()
21902192
err = parent.DeleteChildDir(ctx, op.Name, isImplicitDir, childDir)

internal/fs/inode/dir.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,9 @@ func findDirInode(ctx context.Context, bucket *gcsx.SyncerBucket, name Name) (*C
404404
}
405405

406406
req := &gcs.ListObjectsRequest{
407-
Prefix: name.GcsObjectName(),
408-
MaxResults: 1,
407+
Prefix: name.GcsObjectName(),
408+
MaxResults: 1,
409+
ForceFetchFromCache: true,
409410
}
410411
listing, err := bucket.ListObjects(ctx, req)
411412
if err != nil {

internal/storage/caching/fast_stat_bucket.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,38 @@ func (b *fastStatBucket) insertMultiple(objs []*gcs.Object) {
9292
}
9393

9494
// LOCKS_EXCLUDED(b.mu)
95-
func (b *fastStatBucket) insertMultipleMinObjects(minObjs []*gcs.MinObject) {
95+
func (b *fastStatBucket) insertMultipleMinObjects(listing *gcs.Listing) {
9696
b.mu.Lock()
9797
defer b.mu.Unlock()
9898

99+
minObjectNames := make(map[string]struct{})
99100
expiration := b.clock.Now().Add(b.primaryCacheTTL)
100-
for _, o := range minObjs {
101+
102+
for _, o := range listing.MinObjects {
101103
b.cache.Insert(o, expiration)
104+
minObjectNames[o.Name] = struct{}{}
102105
}
106+
107+
for _, p := range listing.CollapsedRuns {
108+
// If a MinObject with the same name as the CollapsedRun already exists,
109+
// we don't need to insert it again as a Folder.
110+
if _, ok := minObjectNames[p]; ok {
111+
fmt.Println("MinObjects")
112+
continue
113+
}
114+
if !strings.HasSuffix(p, "/") {
115+
// log the error for incorrect prefix but don't fail the operation
116+
logger.Errorf("error in prefix name: %s", p)
117+
} else {
118+
f := &gcs.MinObject{
119+
Name: p,
120+
ImplicitDir: true,
121+
}
122+
fmt.Println("Cache implicit dir: ", f)
123+
b.cache.Insert(f, expiration)
124+
}
125+
}
126+
103127
}
104128

105129
// LOCKS_EXCLUDED(b.mu)
@@ -117,15 +141,20 @@ func (b *fastStatBucket) insertHierarchicalListing(listing *gcs.Listing) {
117141
b.mu.Lock()
118142
defer b.mu.Unlock()
119143

144+
minObjectNames := make(map[string]struct{})
120145
expiration := b.clock.Now().Add(b.primaryCacheTTL)
121146

122147
for _, o := range listing.MinObjects {
123-
if !strings.HasSuffix(o.Name, "/") {
124-
b.cache.Insert(o, expiration)
125-
}
148+
b.cache.Insert(o, expiration)
149+
minObjectNames[o.Name] = struct{}{}
126150
}
127151

128152
for _, p := range listing.CollapsedRuns {
153+
// If a MinObject with the same name as the CollapsedRun already exists,
154+
// we don't need to insert it again as a Folder.
155+
if _, ok := minObjectNames[p]; ok {
156+
continue
157+
}
129158
if !strings.HasSuffix(p, "/") {
130159
// log the error for incorrect prefix but don't fail the operation
131160
logger.Errorf("error in prefix name: %s", p)
@@ -145,7 +174,11 @@ func (b *fastStatBucket) insert(o *gcs.Object) {
145174
}
146175

147176
func (b *fastStatBucket) insertMinObject(o *gcs.MinObject) {
148-
b.insertMultipleMinObjects([]*gcs.MinObject{o})
177+
b.mu.Lock()
178+
defer b.mu.Unlock()
179+
180+
expiration := b.clock.Now().Add(b.primaryCacheTTL)
181+
b.cache.Insert(o, expiration)
149182
}
150183

151184
// LOCKS_EXCLUDED(b.mu)
@@ -357,6 +390,18 @@ func (b *fastStatBucket) StatObject(
357390
func (b *fastStatBucket) ListObjects(
358391
ctx context.Context,
359392
req *gcs.ListObjectsRequest) (listing *gcs.Listing, err error) {
393+
// If ForceFetchFromCache is true, we will try to serve listing from cache.
394+
if req.ForceFetchFromCache {
395+
fmt.Println("In force fetch", req.Prefix)
396+
if hit, entry := b.lookUp(req.Prefix); hit {
397+
// Otherwise, return MinObject and nil ExtendedObjectAttributes.
398+
listing = &gcs.Listing{
399+
MinObjects: []*gcs.MinObject{entry},
400+
}
401+
return
402+
}
403+
}
404+
360405
// Fetch the listing.
361406
listing, err = b.wrapped.ListObjects(ctx, req)
362407
if err != nil {
@@ -369,7 +414,7 @@ func (b *fastStatBucket) ListObjects(
369414
}
370415

371416
// note anything we found.
372-
b.insertMultipleMinObjects(listing.MinObjects)
417+
b.insertMultipleMinObjects(listing)
373418
return
374419
}
375420

internal/storage/gcs/object.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ type MinObject struct {
9191
Metadata map[string]string
9292
ContentEncoding string
9393
CRC32C *uint32 // Missing for CMEK buckets
94+
ImplicitDir bool
9495
}
9596

9697
// ExtendedObjectAttributes contains the missing attributes of Object which are not present in MinObject.

internal/storage/gcs/request.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ type ListObjectsRequest struct {
320320
// the current flow, default value will be full and callers can override it
321321
// using this param.
322322
ProjectionVal Projection
323+
324+
ForceFetchFromCache bool
323325
}
324326

325327
// Listing contains a set of objects and delimter-based collapsed runs returned

0 commit comments

Comments
 (0)