Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func getFuseMountConfig(fsName string, newConfig *cfg.Config) *fuse.MountConfig
// Enables ReadDirPlus, allowing the kernel to retrieve directory entries and their
// attributes in a single operation.
EnableReaddirplus: newConfig.FileSystem.ExperimentalEnableReaddirplus,
UseVectoredRead: true,
}

// GCSFuse to Jacobsa Fuse Log Level mapping:
Expand Down
68 changes: 68 additions & 0 deletions internal/block/prefetch_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"syscall"
)

Expand Down Expand Up @@ -45,6 +46,11 @@ type PrefetchBlock interface {
// Here, off is relative to the start of the block.
ReadAt(p []byte, off int64) (n int, err error)

// ReadAtSlice returns a slice of the block's buffer if the read can be
// satisfied. It avoids a copy. The returned slice should not be modified.
// Here, off is relative to the start of the block.
ReadAtSlice(off int64, size int) (p []byte, err error)

// AbsStartOff returns the absolute start offset of the block.
// Panics if the absolute start offset is not set.
AbsStartOff() int64
Expand All @@ -64,6 +70,21 @@ type PrefetchBlock interface {
// - BlockStatusDownloaded: Download of this block is complete.
// - BlockStatusDownloadFailed: Download of this block has failed.
NotifyReady(val BlockStatus)

// IncrementRef increments the reference count of the block.
IncrementRef()

// DecrementRef decrements the reference count of the block.
DecrementRef() int32

// RefCount returns the current reference count of the block.
RefCount() int32

// WasEvicted returns true if the block was evicted from the retired cache.
WasEvicted() bool

// SetWasEvicted sets the wasEvicted flag.
SetWasEvicted(val bool)
}

type prefetchMemoryBlock struct {
Expand All @@ -77,6 +98,12 @@ type prefetchMemoryBlock struct {

// Stores the absolute start offset of the block-segment in the file.
absStartOff int64

// refCount tracks the number of active references to the block.
refCount atomic.Int32

// wasEvicted is true if the block was evicted from the retired cache.
wasEvicted atomic.Bool
}

func (pmb *prefetchMemoryBlock) Reuse() {
Expand All @@ -85,6 +112,8 @@ func (pmb *prefetchMemoryBlock) Reuse() {
pmb.notification = make(chan BlockStatus, 1)
pmb.status = BlockStatus{State: BlockStateInProgress}
pmb.absStartOff = -1
pmb.refCount.Store(0)
pmb.wasEvicted.Store(false)
}

// createPrefetchBlock creates a new PrefetchBlock.
Expand Down Expand Up @@ -126,6 +155,25 @@ func (pmb *prefetchMemoryBlock) ReadAt(p []byte, off int64) (n int, err error) {
return n, nil
}

// ReadAtSlice returns a view of size bytes of the block's data starting at
// off, without copying. The offset is relative to the start of the block.
//
// It returns an error when off is negative or not smaller than the block
// size, when size is negative, or when size exceeds the data remaining after
// off. The returned slice aliases the block's buffer, so callers must not
// modify it.
func (pmb *prefetchMemoryBlock) ReadAtSlice(off int64, size int) (p []byte, err error) {
	blockSize := pmb.Size()
	if off < 0 || off >= blockSize {
		return nil, fmt.Errorf("prefetchMemoryBlock.ReadAtSlice: offset %d is out of bounds for block size %d", off, blockSize)
	}

	// A negative size would produce an inverted slice expression below and
	// panic, so reject it explicitly.
	if size < 0 {
		return nil, fmt.Errorf("prefetchMemoryBlock.ReadAtSlice: requested size %d is negative", size)
	}

	remData := blockSize - off
	if int64(size) > remData {
		return nil, fmt.Errorf("prefetchMemoryBlock.ReadAtSlice: requested size %d is larger than remaining block size %d", size, remData)
	}

	dataStart := pmb.offset.start + off
	dataEnd := dataStart + int64(size)

	// Cap the capacity at dataEnd so that an append on the returned slice
	// allocates instead of writing into the block's buffer past the view.
	return pmb.buffer[dataStart:dataEnd:dataEnd], nil
}

func (pmb *prefetchMemoryBlock) AbsStartOff() int64 {
if pmb.absStartOff < 0 {
panic("AbsStartOff is not set, it should be set before calling this method.")
Expand Down Expand Up @@ -178,3 +226,23 @@ func (pmb *prefetchMemoryBlock) NotifyReady(val BlockStatus) {
panic("Expected to notify only once, but got multiple notifications.")
}
}

// IncrementRef adds one to the block's reference count.
func (pmb *prefetchMemoryBlock) IncrementRef() {
	// The updated count is intentionally discarded.
	_ = pmb.refCount.Add(1)
}

// DecrementRef subtracts one from the block's reference count and returns
// the count after the subtraction.
func (pmb *prefetchMemoryBlock) DecrementRef() int32 {
	remaining := pmb.refCount.Add(-1)
	return remaining
}

// RefCount reports the block's current reference count.
func (pmb *prefetchMemoryBlock) RefCount() int32 {
	current := pmb.refCount.Load()
	return current
}

// WasEvicted reports the current value of the block's wasEvicted flag.
func (pmb *prefetchMemoryBlock) WasEvicted() bool {
	evicted := pmb.wasEvicted.Load()
	return evicted
}

// SetWasEvicted stores val as the block's wasEvicted flag.
func (pmb *prefetchMemoryBlock) SetWasEvicted(val bool) {
	flag := &pmb.wasEvicted
	flag.Store(val)
}
40 changes: 35 additions & 5 deletions internal/bufferedread/buffered_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ type BufferedReader struct {
// of blocks that can be allocated across all BufferedReader instances.
// GUARDED by (mu)
blockPool *block.GenBlockPool[block.PrefetchBlock]

inflightCallbackWg sync.WaitGroup
}

// NewBufferedReader returns a new bufferedReader instance.
Expand Down Expand Up @@ -143,6 +145,8 @@ func NewBufferedReader(object *gcs.MinObject, bucket gcs.Bucket, config *Buffere
metricHandle: metricHandle,
prefetchMultiplier: defaultPrefetchMultiplier,
randomReadsThreshold: config.RandomSeekThreshold,
readHandle: nil,
inflightCallbackWg: sync.WaitGroup{},
}

reader.ctx, reader.cancelFunc = context.WithCancel(context.Background())
Expand Down Expand Up @@ -299,6 +303,7 @@ func (p *BufferedReader) ReadAt(ctx context.Context, inputBuf []byte, off int64)
}

prefetchTriggered := false
var callBackList []block.PrefetchBlock

for bytesRead < len(inputBuf) {
p.prepareQueueForOffset(off)
Expand Down Expand Up @@ -344,12 +349,21 @@ func (p *BufferedReader) ReadAt(ctx context.Context, inputBuf []byte, off int64)
}

relOff := off - blk.AbsStartOff()
n, readErr := blk.ReadAt(inputBuf[bytesRead:], relOff)
bytesRead += n
off += int64(n)
blkSlice, sliceErr := blk.ReadAtSlice(relOff, min(len(inputBuf), int(blk.Size()-relOff)))
if sliceErr != nil {
err = fmt.Errorf("BufferedReader.ReadAt: block.ReadAtSlice: %w", sliceErr)
break
}
blk.IncrementRef()
resp.DataBufs = append(resp.DataBufs, blkSlice)
resp.VectoredRead = true
callBackList = append(callBackList, blk)

if readErr != nil && !errors.Is(readErr, io.EOF) {
err = fmt.Errorf("BufferedReader.ReadAt: block.ReadAt: %w", readErr)
bytesRead += len(blkSlice)
off += int64(len(blkSlice))

if sliceErr != nil && !errors.Is(sliceErr, io.EOF) {
err = fmt.Errorf("BufferedReader.ReadAt: block.ReadAt: %w", sliceErr)
break
}

Expand All @@ -358,6 +372,7 @@ func (p *BufferedReader) ReadAt(ctx context.Context, inputBuf []byte, off int64)
}

if off >= blk.AbsStartOff()+blk.Size() {
blk.SetWasEvicted(true)
p.blockQueue.Pop()
p.blockPool.Release(blk)

Expand All @@ -376,10 +391,25 @@ func (p *BufferedReader) ReadAt(ctx context.Context, inputBuf []byte, off int64)
}
}

resp.VectoredRead = true
resp.ReadCB = func() {
p.CallbackHelper(callBackList)
}
resp.Size = bytesRead
return resp, err
}

// CallbackHelper drops one reference from every block in blocks. A block
// whose reference count reaches zero and whose wasEvicted flag is set is
// released to the reader's block pool. ReadAt installs this as the body of
// the response's ReadCB callback.
func (p *BufferedReader) CallbackHelper(blocks []block.PrefetchBlock) {
	// NOTE(review): the WaitGroup is incremented only once the callback is
	// already running, so a Wait that starts earlier may not account for this
	// callback — confirm whether Add should move to where ReadCB is created.
	p.inflightCallbackWg.Add(1)
	defer p.inflightCallbackWg.Done()
	for _, b := range blocks {
		// Decide using the value returned by the decrement itself. Re-reading
		// the counter afterwards would let two concurrent callbacks both
		// observe zero and release the same block twice.
		if b.DecrementRef() == 0 && b.WasEvicted() {
			p.blockPool.Release(b)
		}
	}
}

// prefetch schedules the next set of blocks for prefetching starting from
// the nextBlockIndexToPrefetch.
// LOCKS_REQUIRED(p.mu)
Expand Down
25 changes: 24 additions & 1 deletion internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2831,6 +2831,11 @@ func (fs *fileSystem) OpenFile(
return
}

// oneMBBuffer is a package-level buffer of 1 MiB plus 100 bytes, allocated
// once during package initialization. ReadFile slices it to op.Size and uses
// the result as op.Dst.
// NOTE(review): the buffer is shared by every caller — confirm that reads
// cannot run concurrently and that op.Size never exceeds its length.
var oneMBBuffer = make([]byte, 1024*1024+100)

// LOCKS_EXCLUDED(fs.mu)
func (fs *fileSystem) ReadFile(
ctx context.Context,
Expand Down Expand Up @@ -2870,10 +2875,28 @@ func (fs *fileSystem) ReadFile(
return err
}
}

op.Dst = oneMBBuffer[:op.Size]

// Serve the read.

if fs.newConfig.EnableNewReader {
op.Dst, op.BytesRead, err = fh.ReadWithReadManager(ctx, op.Dst, op.Offset, fs.sequentialReadSizeMb)
var readerResponse *gcsx.ReaderResponse
readerResponse, err = fh.ReadWithReadManager(ctx, op.Dst, op.Offset, fs.sequentialReadSizeMb)
if readerResponse == nil || err != nil {
err = fmt.Errorf("ReadWithReadManager: %w", err)
return
}

logger.Info("Data len: %v, Vectored: %v, BytesRead: %v", len(readerResponse.DataBufs), readerResponse.VectoredRead, readerResponse.Size)

if readerResponse.VectoredRead {
op.Data = readerResponse.DataBufs
op.BytesRead = readerResponse.Size
} else {
op.Dst = readerResponse.DataBufs[0]
op.BytesRead = readerResponse.Size
}
} else {
op.Dst, op.BytesRead, err = fh.Read(ctx, op.Dst, op.Offset, fs.sequentialReadSizeMb)
}
Expand Down
15 changes: 8 additions & 7 deletions internal/fs/handle/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,18 @@ func (fh *FileHandle) unlockHandleAndInode(rLock bool) {
//
// LOCKS_REQUIRED(fh.inode.mu)
// UNLOCK_FUNCTION(fh.inode.mu)
func (fh *FileHandle) ReadWithReadManager(ctx context.Context, dst []byte, offset int64, sequentialReadSizeMb int32) ([]byte, int, error) {
func (fh *FileHandle) ReadWithReadManager(ctx context.Context, dst []byte, offset int64, sequentialReadSizeMb int32) (*gcsx.ReaderResponse, error) {
// If content cache enabled, CacheEnsureContent forces the file handler to fall through to the inode
// and fh.inode.SourceGenerationIsAuthoritative() will return false
if err := fh.inode.CacheEnsureContent(ctx); err != nil {
return nil, 0, fmt.Errorf("failed to ensure inode content: %w", err)
return &gcsx.ReaderResponse{}, fmt.Errorf("failed to ensure inode content: %w", err)
}

if !fh.inode.SourceGenerationIsAuthoritative() {
// Read from the inode if the source generation is not authoritative.
defer fh.inode.Unlock()
n, err := fh.inode.Read(ctx, dst, offset)
return dst, n, err
_, err := fh.inode.Read(ctx, dst, offset)
return &gcsx.ReaderResponse{DataBufs: [][]byte{dst}}, err
}

fh.lockHandleAndRelockInode(true)
Expand Down Expand Up @@ -218,6 +218,7 @@ func (fh *FileHandle) ReadWithReadManager(ctx context.Context, dst []byte, offse
fh.mu.RLock()
}

logger.Tracef("FileHandle.ReadWithReadManager: Using readManager to read %d bytes at offset %d", len(dst), offset)
// Use the readManager to read data.
var readerResponse gcsx.ReaderResponse
var err error
Expand All @@ -227,13 +228,13 @@ func (fh *FileHandle) ReadWithReadManager(ctx context.Context, dst []byte, offse
if err != io.EOF {
logger.Warnf("Unexpected EOF error encountered while reading, err: %v type: %T ", err, err)
}
return nil, 0, io.EOF
return nil, io.EOF

case err != nil:
return nil, 0, fmt.Errorf("fh.readManager.ReadAt: %w", err)
return nil, fmt.Errorf("fh.readManager.ReadAt: %w", err)
}

return readerResponse.DataBuf, readerResponse.Size, nil
return &readerResponse, nil
}

// Equivalent to locking fh.Inode() and calling fh.Inode().Read, but may be
Expand Down
8 changes: 8 additions & 0 deletions internal/gcsx/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,16 @@ type ReaderResponse struct {
// DataBuf contains the bytes read from the object.
DataBuf []byte

DataBufs [][]byte

// VectoredRead reports whether the data is returned as multiple buffers in DataBufs. When false, use DataBufs[0].
VectoredRead bool

// Size indicates how many bytes were read into DataBuf.
Size int

// Called by jacobsa/fuse once read is complete.
ReadCB func()
}

type Reader interface {
Expand Down
Loading