Feature/fuse improvements by evgmik · Pull Request #88 · sahib/brig · GitHub
Feature/fuse improvements #88

Merged: 22 commits, Jan 24, 2021
22 commits
f2c7539
better name for explicitly pinned extended attribute
evgmik Jan 18, 2021
5e214f0
fuse read speed up with MaxReadahead set to 128 kB
evgmik Jan 18, 2021
ef581a0
fuse: Extra 3MB/S with reduced log spaming at fuse-Read
evgmik Jan 18, 2021
4082458
fuse enabling WritebackCache speeds up Write operation 25MB/s ->50MB/s
evgmik Jan 18, 2021
58dbe03
fuse avoiding call to cfs.Stat(path) before we sure the Xattribute exist
evgmik Jan 18, 2021
73714fc
fuse: commenting out debug info in Getxattr gives about +10MB/s to th…
evgmik Jan 18, 2021
3a729e9
added missing import and minor comment edit
evgmik Jan 20, 2021
aed7d55
Putting back logPanic, it does not influence performance
evgmik Jan 20, 2021
c24b8d3
added time profiling info code
evgmik Jan 20, 2021
1ae191f
better debugging: log with fields does not preserve fields order :(
evgmik Jan 20, 2021
bbf29ae
more intensive debugging
evgmik Jan 20, 2021
1f4dded
fuse: added resp.Flags |= fuse.OpenKeepCache
evgmik Jan 20, 2021
7c83f7d
Overlay improvement: Do not call for seek if the pointer already at t…
evgmik Jan 22, 2021
6b90be0
added WritaAt method to avoid unneeded Seek during writes
evgmik Jan 23, 2021
d0b3f64
fuse uses overlay for writes
evgmik Jan 23, 2021
45f264d
gofmt fixes
evgmik Jan 23, 2021
03e48e6
WriteAt -> writeAt to indicate that it is not called by `fuse` but is…
evgmik Jan 23, 2021
d3aad7a
improved comment about Write request not coming in `fifo` order
evgmik Jan 23, 2021
984107a
Removed unused maxInt constant
evgmik Jan 23, 2021
ce915ce
bug fix: the response to Xattribute must take as much as needed!!!
evgmik Jan 23, 2021
c940779
redone getXattr and listXattr via xattrMap, also change hash related …
evgmik Jan 24, 2021
05b5737
added test for extended attributes
evgmik Jan 24, 2021
23 changes: 23 additions & 0 deletions catfs/handle.go
@@ -144,6 +144,29 @@ func (hdl *Handle) Write(buf []byte) (int, error) {
return n, nil
}

// Writes data from `buf` at offset `off` counted from the start (0 offset).
// Mimics `WriteAt` from `io` package https://golang.org/pkg/io/#WriterAt
func (hdl *Handle) WriteAt(buf []byte, off int64) (n int, err error) {
hdl.lock.Lock()
defer hdl.lock.Unlock()

if hdl.readOnly {
return 0, ErrReadOnly
}

if hdl.isClosed {
return 0, ErrIsClosed
}

if err := hdl.initStreamIfNeeded(); err != nil {
return 0, err
}

hdl.wasModified = true
n, err = hdl.layer.WriteAt(buf, off)
return n, err
}

// Seek will jump to the `offset` relative to `whence`.
// The next read or write operation will start from this point.
func (hdl *Handle) Seek(offset int64, whence int) (int64, error) {
23 changes: 23 additions & 0 deletions catfs/mio/overlay/overlay.go
@@ -228,6 +228,24 @@ func (l *Layer) Write(buf []byte) (int, error) {
return len(buf), nil
}

// Writes data from `buf` at offset `off` counted from the start (0 offset).
// Mimics `WriteAt` from `io` package https://golang.org/pkg/io/#WriterAt
func (l *Layer) WriteAt(buf []byte, off int64) (n int, err error) {
// Copy the buffer, since we cannot rely on it being valid forever.
modBuf := make([]byte, len(buf))
Owner:

Potential improvement here: use a sync.Pool to save a few allocations when writing many buffers. This also applies to the regular Write method and is probably more relevant there. Care must be taken that the buffer from pool.Get() has a suitable length.

Collaborator Author:

Not sure that I follow here. modBuf is somewhat permanent (it sits in the overlay until flushed). Pool implies that the data would soon be deleted. Besides, the manual says "Any item stored in the Pool may be removed automatically at any time without notification." This sounds very scary.

I guess I do not know the internals of the GC well enough to really see the difference.

Owner:

That documentation is a little misleading. Nothing gets garbage collected while you still use it (i.e. you hold a reference to it). modBuf can be Put() back to the pool after flush and on L235 above you can do a pool.Get() to fetch a buffer that was either used before or is newly allocated. Go is free to do whatever it likes in the time between Put and Get.

Collaborator Author:

I see now. Can we move it to a separate issue?
From a quick reading, a pool is not suited for objects of different sizes. So if len(buf) differs from the length of what Get returns, we would have to put it back. But I see no way to iterate over the pool to see if there is a suitable candidate. Besides, it would require fully rethinking how the overlay works.

Owner:

You would just get an item from the pool and check if it's reasonably sized (if larger, cut it), if not allocate a new one the normal way and use that. But sure, we can do it once overlay is touched in general.
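
The pool idea from this thread could be sketched roughly as follows. This is only an illustration and not code from the PR: `bufPool`, `getBuf`, `putBuf`, and `copyForOverlay` are hypothetical names, and the sketch assumes a buffer is handed back only once the overlay no longer references it (for example after a flush).

```go
package main

import (
	"fmt"
	"sync"
)

// bufPool is a hypothetical pool of byte slices; it is not part of brig.
var bufPool = sync.Pool{}

// getBuf returns a slice of exactly n bytes. It reuses a pooled buffer when
// that buffer has enough capacity (cutting it down to n) and otherwise
// allocates a fresh one. A pooled buffer that is too small is simply dropped.
func getBuf(n int) []byte {
	if p, ok := bufPool.Get().(*[]byte); ok && cap(*p) >= n {
		return (*p)[:n]
	}
	return make([]byte, n)
}

// putBuf hands a buffer back to the pool. The caller must not use b afterwards.
func putBuf(b []byte) {
	b = b[:cap(b)]
	bufPool.Put(&b)
}

// copyForOverlay mirrors the copy step of WriteAt, but takes its
// destination buffer from the pool instead of calling make directly.
func copyForOverlay(buf []byte) []byte {
	modBuf := getBuf(len(buf))
	copy(modBuf, buf)
	return modBuf
}

func main() {
	first := copyForOverlay([]byte("hello world"))
	fmt.Println(string(first), len(first))

	// Pretend the overlay was flushed: the buffer can be recycled now.
	putBuf(first)

	second := copyForOverlay([]byte("hi"))
	fmt.Println(string(second), len(second))
}
```

Whether the second call actually reuses the first buffer is up to the runtime; the code is correct either way, which is the point made above about Go being free to do whatever it likes between Put and Get.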

copy(modBuf, buf)

l.index.Add(&Modification{off, modBuf})
end := off + int64(len(buf))
if l.limit >= 0 && end > l.limit {
l.limit = end
}

n = len(buf)
err = nil
return n, nil
}

// hasGaps checks if overlays occludes all bytes between `start` and `end`
func hasGaps(overlays []Interval, start, end int64) bool {
diff := end - start
@@ -342,6 +360,11 @@ func (l *Layer) Seek(offset int64, whence int) (int64, error) {
l.limit = newPos
}

if l.pos == newPos {
// Very likely this is a sequential read/write request;
// no need to seek since we are already pointing to the right position.
return l.pos, nil
}
Owner:

👍

l.pos = newPos

// Silence EOF:
11 changes: 8 additions & 3 deletions fuse/directory.go
@@ -202,7 +202,10 @@ func (dir *Directory) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, r
defer logPanic("dir: getxattr")

debugLog("exec dir getxattr: %v: %v", dir.path, req.Name)
xattrs, err := getXattr(dir.m.fs, req.Name, dir.path, req.Size)

// Do not worry about req.Size
// fuse will cut it to allowed size and report to the caller that buffer need to be larger
xattrs, err := getXattr(dir.m.fs, req.Name, dir.path)
if err != nil {
return err
}
@@ -211,12 +214,14 @@ func (dir *Directory) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, r
return nil
}

// Listxattr is called to list all xattrs of this file.
// Listxattr is called to list all xattrs of this dir
func (dir *Directory) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
defer logPanic("dir: listxattr")

debugLog("exec dir listxattr")
resp.Xattr = listXattr(req.Size)
// Do not worry about req.Size
// fuse will cut it to allowed size and report to the caller that buffer need to be larger
resp.Xattr = listXattr()
return nil
}

48 changes: 18 additions & 30 deletions fuse/file.go
@@ -34,6 +34,7 @@ type File struct {
// Attr is called to get the stat(2) attributes of a file.
func (fi *File) Attr(ctx context.Context, attr *fuse.Attr) error {
defer logPanic("file: attr")
log.Debugf("fuse-file-attr: %v", fi.path)

info, err := fi.m.fs.Stat(fi.path)
if err != nil {
@@ -55,11 +56,7 @@ func (fi *File) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Mode = os.ModeSymlink | filePerm
}
}
if fi.hd != nil && fi.hd.writers > 0 {
attr.Size = uint64(len(fi.hd.data))
} else {
attr.Size = info.Size
}
attr.Size = info.Size
Owner:

I'm not entirely sure, but info.Size might not be correct. When we truncate or extend a file then catfs still remembers the old size (it gets updated only after Stage(), i.e. on Flush()). We might need to use the limits of the overlay to determine the size at that point. At least that's what I remember, in any case we should check the file size on this test:

# file should have a size of 1MB.
$ dd if=/dev/zero of=/fuse/file.txt bs=1M count=1
# file size should increase with each write happening.
# (probably easier to test in a unit test where you can
#  control when the file is closed). 
$ dd if=/dev/zero of=/fuse/file.txt bs=1M count=20

Owner:

This might also explain why your test produced that much metadata: every fuse write called catfs.Handle.Write(), which stages the file to update the file size. That's actually something very dumb to do, and we should retrieve the current file size from the handle and not from catfs (where it will be changed on flush).
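
One way to read the suggestion above is that the handle keeps its own running size, updated on every write and truncate, so that Attr never has to consult catfs before a flush. The sketch below is an assumption about what that could look like, not code from the PR; `sizeTracker` and its methods are hypothetical names. The update rule is the same one the overlay's WriteAt uses for its limit.

```go
package main

import "fmt"

// sizeTracker is a hypothetical helper that remembers the current file size
// of an open handle, independent of what the backing store last committed.
type sizeTracker struct {
	size int64
}

// newSizeTracker starts from the size the backing store reported at open time.
func newSizeTracker(committed int64) *sizeTracker {
	return &sizeTracker{size: committed}
}

// noteWrite records a write of n bytes at offset off. The size only grows
// when the write ends past the current end of the file.
func (s *sizeTracker) noteWrite(off int64, n int) {
	if end := off + int64(n); end > s.size {
		s.size = end
	}
}

// truncate sets the size directly; it may shrink or extend the file.
func (s *sizeTracker) truncate(size int64) {
	s.size = size
}

func main() {
	const chunk = 1 << 20 // 1 MiB, as in the dd example above

	st := newSizeTracker(0)
	for i := int64(0); i < 3; i++ {
		st.noteWrite(i*chunk, chunk)
		fmt.Println("size after write:", st.size)
	}

	st.truncate(0)
	fmt.Println("size after truncate:", st.size)
}
```

With something like this in place, the size check from the dd test could be done in a unit test by comparing the tracked size after each write, without closing the file.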

attr.Mtime = info.ModTime
attr.Inode = info.Inode

@@ -83,6 +80,7 @@ func (fi *File) Attr(ctx context.Context, attr *fuse.Attr) error {
func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
defer logPanic("file: open")
debugLog("fuse-open: %s", fi.path)
log.Debugf("fuse-file-open: %v with request %v", fi.path, req)

// Check if the file is actually available locally.
if fi.m.options.Offline {
@@ -103,30 +101,13 @@ func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open

fi.mu.Lock()
if fi.hd == nil {
hd := Handle{fd: fd, m: fi.m, writers: 0, wasModified: false, currentFileReadOffset: -1}
hd := Handle{fd: fd, m: fi.m, wasModified: false, currentFileReadOffset: -1}
fi.hd = &hd
}
fi.hd.fd = fd
fi.mu.Unlock()
if req.Flags.IsReadOnly() {
// we don't need to track read-only handles
// and no need to set handle `data`
return fi.hd, nil
}

// for writers we need to copy file data to the handle `data`
fi.hd.mu.Lock()
defer fi.hd.mu.Unlock()
if fi.hd.writers == 0 {
err = fi.hd.loadData(fi.path)
if err != nil {
return nil, errorize("file-open-loadData", err)
}
}
if fi.hd.writers == (^uint(0)) { // checks against writers overflow
return nil, errorize(fi.path, ErrTooManyWriters)
}
fi.hd.writers++
resp.Flags |= fuse.OpenKeepCache
return fi.hd, nil
}

@@ -135,11 +116,11 @@ func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open
// file, the size change is noticed here before Open() is called.
func (fi *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
defer logPanic("file: setattr")
log.Debugf("fuse-file-setattr: request %v", req)

// This is called when any attribute of the file changes,
// most importantly the file size. For example it is called when truncating
// the file to zero bytes with a size change of `0`.
debugLog("exec file setattr")
switch {
case req.Valid&fuse.SetattrSize != 0:
if err := fi.hd.truncate(req.Size); err != nil {
@@ -158,17 +139,21 @@ func (fi *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
// Currently, fsync is completely ignored.
func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
defer logPanic("file: fsync")
log.Debugf("fuse-file-fsync: %v", fi.path)

debugLog("exec file fsync")
return nil
}

// Getxattr is called to get a single xattr (extended attribute) of a file.
func (fi *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
defer logPanic("file: getxattr")
// log.Debugf("fuse-file-getxattr %v for atribute %v", fi.path, req.Name)

// debugLog("exec file getxattr: %v: %v", fi.path, req.Name)

debugLog("exec file getxattr: %v: %v", fi.path, req.Name)
xattrs, err := getXattr(fi.m.fs, req.Name, fi.path, req.Size)
// Do not worry about req.Size
// fuse will cut it to allowed size and report to the caller that buffer need to be larger
xattrs, err := getXattr(fi.m.fs, req.Name, fi.path)
if err != nil {
return err
}
@@ -180,15 +165,18 @@ func (fi *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f
// Listxattr is called to list all xattrs of this file.
func (fi *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
defer logPanic("file: listxattr")
log.Debugf("fuse-file-listxattr: %v", fi.path)

debugLog("exec file listxattr")
resp.Xattr = listXattr(req.Size)
// Do not worry about req.Size
// fuse will cut it to allowed size and report to the caller that buffer need to be larger
resp.Xattr = listXattr()
return nil
}

// Readlink reads a symbolic link.
// This call is triggered when OS tries to see where symlink points
func (fi *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) {
log.Debugf("fuse-file-readlink: %v", fi.path)
info, err := fi.m.fs.Stat(fi.path)
if err != nil {
return "/brig/backend/ipfs/", err
50 changes: 50 additions & 0 deletions fuse/fuse_test.go
@@ -12,6 +12,7 @@ import (
"net/http"
"os"
"path/filepath"
"syscall"
"testing"

"github.com/sahib/brig/catfs"
@@ -338,6 +339,55 @@ func TestRead(t *testing.T) {
})
}

func TestFileXattr(t *testing.T) {
withMount(t, MountOptions{}, func(ctx context.Context, control *spawntest.Control, mount *mountInfo) {
size := int64(4)
helloData := testutil.CreateDummyBuf(size)

// Add a simple file:
catfsFilePath := fmt.Sprintf("/hello_from_catfs_%d", size)
req := catfsPayload{Path: catfsFilePath, Data: helloData}
require.NoError(t, control.JSON("/catfsStage").Call(ctx, req, &nothing{}))
checkCatfsFileContent(ctx, t, control, catfsFilePath, helloData)

fuseFilePath := filepath.Join(mount.Dir, catfsFilePath)

// now let's see the full list of extended attributes
response := make([]byte, 1024*4) // large buffer to fit everything
sz, err := syscall.Listxattr(fuseFilePath, response)
require.NoError(t, err)
response = response[:sz]
receivedAttrs := bytes.Split(response, []byte{0})
// every response should belong to valid attributes
for _, attr := range receivedAttrs {
if len(attr) == 0 {
// protecting against empty chunk after split delimiter
continue
}
_, ok := xattrMap[string(attr)]
require.Truef(t, ok, "Invalid extended attribute '%s'", attr)
}
// every valid attribute should be in received Attrs list
for attr, _ := range xattrMap {
require.Containsf(t, receivedAttrs, []uint8(attr), "Received attributes are missing '%s'", attr)
}
// now let's check some attributes values
// Note hashes are hard without direct access to catfs
// which is accessed in different process
response = make([]byte, 64) // large buffer to fit everything
sz, err = syscall.Getxattr(fuseFilePath, "user.brig.pinned", response)
require.NoError(t, err)
response = response[:sz]
require.Equal(t, "yes", string(response))

response = make([]byte, 64) // large buffer to fit everything
sz, err = syscall.Getxattr(fuseFilePath, "user.brig.explicitly_pinned", response)
require.NoError(t, err)
response = response[:sz]
require.Equal(t, "no", string(response))
})
}
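
The list returned by Listxattr is a run of NUL-terminated names, which is why the test above splits on a zero byte and skips the empty chunk left after the last terminator. A small standalone sketch of that parsing step is shown below; `splitXattrList` is a hypothetical helper name, not a function from brig, and the sample input simply reuses two attribute names that appear in the test.

```go
package main

import (
	"bytes"
	"fmt"
)

// splitXattrList turns the raw NUL-separated buffer filled in by a
// listxattr call into a slice of attribute names. Empty chunks, such as
// the one after the final NUL terminator, are dropped.
func splitXattrList(raw []byte) []string {
	var names []string
	for _, chunk := range bytes.Split(raw, []byte{0}) {
		if len(chunk) == 0 {
			continue
		}
		names = append(names, string(chunk))
	}
	return names
}

func main() {
	// Sample buffer in the format described above.
	raw := []byte("user.brig.pinned\x00user.brig.explicitly_pinned\x00")
	for _, name := range splitXattrList(raw) {
		fmt.Println(name)
	}
}
```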

func TestWrite(t *testing.T) {
withMount(t, MountOptions{}, func(ctx context.Context, control *spawntest.Control, mount *mountInfo) {
for _, size := range DataSizes {