From f2c7539d763762c701348f6ee25ec595e5d3fb92 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Sun, 17 Jan 2021 22:47:00 -0500 Subject: [PATCH 01/22] better name for explicitly pinned extended attribute Also added explicitly_pinned to the list of known Xattributes --- fuse/util.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fuse/util.go b/fuse/util.go index d07fbd81..76e0a2d5 100644 --- a/fuse/util.go +++ b/fuse/util.go @@ -38,6 +38,7 @@ func listXattr(size uint32) []byte { resp = append(resp, "user.brig.hash\x00"...) resp = append(resp, "user.brig.content\x00"...) resp = append(resp, "user.brig.pinned\x00"...) + resp = append(resp, "user.brig.explicitly_pinned\x00"...) if uint32(len(resp)) > size { resp = resp[:size] @@ -65,7 +66,7 @@ func getXattr(cfs *catfs.FS, name, path string, size uint32) ([]byte, error) { } else { resp = []byte("no") } - case "user.brig.explicit": + case "user.brig.explicitly_pinned": if info.IsExplicit { resp = []byte("yes") } else { From 5e214f08ed9b1f239e209378979d41680ac0f6fe Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Mon, 18 Jan 2021 00:24:21 -0500 Subject: [PATCH 02/22] fuse read speed up with MaxReadahead set to 128 kB The default MaxReadahead is 4 kB. By changing it to 128 kB, we get fuse Read speed from 12.5MB to 33MB. Nice win!!! --- fuse/mount.go | 1 + 1 file changed, 1 insertion(+) diff --git a/fuse/mount.go b/fuse/mount.go index 3789d72f..dcf7f469 100644 --- a/fuse/mount.go +++ b/fuse/mount.go @@ -67,6 +67,7 @@ func NewMount(cfs *catfs.FS, mountpoint string, notifier Notifier, opts MountOpt fuse.FSName("brigfs"), fuse.Subtype("brig"), fuse.AllowNonEmptyMount(), + fuse.MaxReadahead(128*1024), // kernel uses at max 128kB = 131072B } if opts.ReadOnly { From ef581a08e28ac30444203b20231703bf467c7e85 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. 
Mikhailov" Date: Mon, 18 Jan 2021 00:27:01 -0500 Subject: [PATCH 03/22] fuse: Extra 3MB/s with reduced log spamming at fuse-Read I might be delusional, but logging every call to fuse-Read has a throughput penalty. Even with a maximum read size of 128kB, it is called quite often for large files. With the default MaxReadahead=4kB, that logging leads to a rather catastrophic speed drop. --- fuse/handle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fuse/handle.go b/fuse/handle.go index 7210c0fe..1a9e6f14 100644 --- a/fuse/handle.go +++ b/fuse/handle.go @@ -71,7 +71,7 @@ func (hd *Handle) loadData(path string) error { func (hd *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { hd.mu.Lock() defer hd.mu.Unlock() - defer logPanic("handle: read") + // defer logPanic("handle: read") // log.WithFields(log.Fields{ // "path": hd.fd.Path(), From 408245860834d1b72a5465f65618c6c4c0922519 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Mon, 18 Jan 2021 14:57:20 -0500 Subject: [PATCH 04/22] fuse: enabling WritebackCache speeds up Write operation 25MB/s -> 50MB/s This is because Write now happens in much larger chunks (128kB instead of 8kB), and the kernel sucks in the file to be written. Note: to get 50MB/s I need to disable some write-operation logging; it takes quite a lot of CPU overhead. See follow-up patches.
--- fuse/mount.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fuse/mount.go b/fuse/mount.go index dcf7f469..957537c1 100644 --- a/fuse/mount.go +++ b/fuse/mount.go @@ -67,7 +67,10 @@ func NewMount(cfs *catfs.FS, mountpoint string, notifier Notifier, opts MountOpt fuse.FSName("brigfs"), fuse.Subtype("brig"), fuse.AllowNonEmptyMount(), + // enabling MaxReadahead double or even triple Read throughput 12MB/s -> 25 or 33 MB/s fuse.MaxReadahead(128*1024), // kernel uses at max 128kB = 131072B + // enabling WritebackCache doubles write speed to buffer 12MB/s -> 24MB/s + fuse.WritebackCache(), // writes will happen in mach large blocks 128kB instead of 8kB } if opts.ReadOnly { From 58dbe037d468bee25e345a8d9b4ba915d68ae15a Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Mon, 18 Jan 2021 16:12:54 -0500 Subject: [PATCH 05/22] fuse: avoid calling cfs.Stat(path) before we are sure the Xattribute exists Apparently, when WritebackCache is enabled, the OS is quite spammy with requests to Getxattr for the attribute "security.capability". The old code used to call cfs.Stat even if the requested attribute is not one we can provide. The call costs time and decreases throughput by about 10MB/s. Now we check whether we can provide the attribute at all, and only then fetch the relevant info.
--- fuse/util.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/fuse/util.go b/fuse/util.go index 76e0a2d5..9e5b6f11 100644 --- a/fuse/util.go +++ b/fuse/util.go @@ -47,7 +47,22 @@ func listXattr(size uint32) []byte { return resp } +func isKnownAttribute(name string) bool { + reqSize := uint32(1024) + knownAttrs := bytes.Split(listXattr(reqSize), []byte{0}) + for _, attr := range(knownAttrs) { + if string(attr) == name { + return true + } + } + return false +} + func getXattr(cfs *catfs.FS, name, path string, size uint32) ([]byte, error) { + if !isKnownAttribute(name) { + return nil, fuse.ErrNoXattr + } + info, err := cfs.Stat(path) if err != nil { return nil, errorize("getxattr", err) From 73714fcb4198a19066ecdf86f2921fca1485be0f Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Mon, 18 Jan 2021 16:17:56 -0500 Subject: [PATCH 06/22] fuse: commenting out debug info in Getxattr gives about +10MB/s to throughput As I mentioned before, Write with WritebackCache enabled asks for Getxattr way too often. If we report every call, our throughput is lower by about 10MB/s. --- fuse/file.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fuse/file.go b/fuse/file.go index 5b36ca8f..db4e7cfd 100644 --- a/fuse/file.go +++ b/fuse/file.go @@ -165,9 +165,10 @@ func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { // Getxattr is called to get a single xattr (extended attribute) of a file.
func (fi *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - defer logPanic("file: getxattr") + // defer logPanic("file: getxattr") + // log.Debugf("fuse-file-getxattr %v for atribute %v", fi.path, req.Name) - debugLog("exec file getxattr: %v: %v", fi.path, req.Name) + // debugLog("exec file getxattr: %v: %v", fi.path, req.Name) xattrs, err := getXattr(fi.m.fs, req.Name, fi.path, req.Size) if err != nil { return err From 3a729e93e645cc4d2ac2c7f481c27c99c9796afe Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Tue, 19 Jan 2021 22:56:18 -0500 Subject: [PATCH 07/22] added missing import and minor comment edit --- fuse/util.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fuse/util.go b/fuse/util.go index 9e5b6f11..b15db589 100644 --- a/fuse/util.go +++ b/fuse/util.go @@ -3,6 +3,7 @@ package fuse import ( + "bytes" "time" "bazil.org/fuse" @@ -26,7 +27,7 @@ func errorize(name string, err error) error { } // logPanic logs any panics by being called in a defer. -// A rather inconvinient behaviour of fuse is to not report panics. +// A rather inconvenient behaviour of fuse is to not report panics. func logPanic(name string) { if err := recover(); err != nil { log.Errorf("bug: %s panicked: %v", name, err) From aed7d55534326e6134676574e4060193406af72f Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Tue, 19 Jan 2021 23:03:48 -0500 Subject: [PATCH 08/22] Putting back logPanic, it does not influence performance --- fuse/file.go | 2 +- fuse/handle.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fuse/file.go b/fuse/file.go index db4e7cfd..5a26a16a 100644 --- a/fuse/file.go +++ b/fuse/file.go @@ -165,7 +165,7 @@ func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { // Getxattr is called to get a single xattr (extended attribute) of a file. 
func (fi *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - // defer logPanic("file: getxattr") + defer logPanic("file: getxattr") // log.Debugf("fuse-file-getxattr %v for atribute %v", fi.path, req.Name) // debugLog("exec file getxattr: %v: %v", fi.path, req.Name) diff --git a/fuse/handle.go b/fuse/handle.go index 1a9e6f14..7210c0fe 100644 --- a/fuse/handle.go +++ b/fuse/handle.go @@ -71,7 +71,7 @@ func (hd *Handle) loadData(path string) error { func (hd *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { hd.mu.Lock() defer hd.mu.Unlock() - // defer logPanic("handle: read") + defer logPanic("handle: read") // log.WithFields(log.Fields{ // "path": hd.fd.Path(), From c24b8d3bcf2232bc888d260c02b246fc430a67a6 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Tue, 19 Jan 2021 23:15:32 -0500 Subject: [PATCH 09/22] added time profiling info code --- fuse/handle.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fuse/handle.go b/fuse/handle.go index 7210c0fe..cd2c2d72 100644 --- a/fuse/handle.go +++ b/fuse/handle.go @@ -36,6 +36,7 @@ func (hd *Handle) loadData(path string) error { log.Debug("fuse: loadData for ", path) hd.data = nil hd.wasModified = false + start := time.Now() // rewind to start newOff, err := hd.fd.Seek(0, io.SeekStart) if err != nil { @@ -64,6 +65,7 @@ func (hd *Handle) loadData(path string) error { } } hd.data = data + log.Infof("fuse: loadData buffered `%s` %d bytes with troughput %.2f MB/s", path, len(hd.data), float64(len(hd.data))/time.Since(start).Seconds()/1.0e6) return nil } @@ -155,10 +157,12 @@ func (hd *Handle) flush() error { if !hd.wasModified { return nil } + start := time.Now() r := bytes.NewReader(hd.data) if err := hd.m.fs.Stage(hd.fd.Path(), r); err != nil { return errorize("handle-flush", err) } + log.Infof("fuse: Staged `%s` %d bytes with troughput %.2f MB/s", hd.fd.Path(), len(hd.data), 
float64(len(hd.data))/time.Since(start).Seconds()/1.0e6) hd.wasModified = false notifyChange(hd.m, 500*time.Millisecond) From 1ae191f459e913aed338186b06807640a2f5271f Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Tue, 19 Jan 2021 23:41:11 -0500 Subject: [PATCH 10/22] better debugging: log with fields does not preserve fields order :( --- fuse/handle.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/fuse/handle.go b/fuse/handle.go index cd2c2d72..bdc85c7d 100644 --- a/fuse/handle.go +++ b/fuse/handle.go @@ -75,11 +75,12 @@ func (hd *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Re defer hd.mu.Unlock() defer logPanic("handle: read") - // log.WithFields(log.Fields{ - // "path": hd.fd.Path(), - // "offset": req.Offset, - // "size": req.Size, - // }).Debugf("fuse: handle: read") + log.Debugf( + "fuse-Read: %s (off: %d size: %d)", + hd.fd.Path(), + req.Offset, + req.Size, + ) // if we have writers we can supply response from the write data buffer if hd.writers != 0 { @@ -120,7 +121,7 @@ func (hd *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse. defer logPanic("handle: write") log.Debugf( - "fuse-write: %s (off: %d size: %d)", + "fuse-Write: %s (off: %d size: %d)", hd.fd.Path(), req.Offset, len(req.Data), From bbf29ae31e0d833fb686db231718194e6e4ef669 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Tue, 19 Jan 2021 23:48:22 -0500 Subject: [PATCH 11/22] more intensive debugging --- fuse/file.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/fuse/file.go b/fuse/file.go index 5a26a16a..7652e3cf 100644 --- a/fuse/file.go +++ b/fuse/file.go @@ -34,6 +34,7 @@ type File struct { // Attr is called to get the stat(2) attributes of a file. 
func (fi *File) Attr(ctx context.Context, attr *fuse.Attr) error { defer logPanic("file: attr") + log.Debugf("fuse-file-attr: %v", fi.path) info, err := fi.m.fs.Stat(fi.path) if err != nil { @@ -83,6 +84,7 @@ func (fi *File) Attr(ctx context.Context, attr *fuse.Attr) error { func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { defer logPanic("file: open") debugLog("fuse-open: %s", fi.path) + log.Debugf("fuse-file-open: %v with request %v", fi.path, req) // Check if the file is actually available locally. if fi.m.options.Offline { @@ -135,11 +137,11 @@ func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open // file, the size change is noticed here before Open() is called. func (fi *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { defer logPanic("file: setattr") + log.Debugf("fuse-file-setattr: request %v", req) // This is called when any attribute of the file changes, // most importantly the file size. For example it is called when truncating // the file to zero bytes with a size change of `0`. - debugLog("exec file setattr") switch { case req.Valid&fuse.SetattrSize != 0: if err := fi.hd.truncate(req.Size); err != nil { @@ -158,8 +160,8 @@ func (fi *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus // Currently, fsync is completely ignored. func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { defer logPanic("file: fsync") + log.Debugf("fuse-file-fsync: %v", fi.path) - debugLog("exec file fsync") return nil } @@ -181,8 +183,8 @@ func (fi *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f // Listxattr is called to list all xattrs of this file. 
func (fi *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { defer logPanic("file: listxattr") + log.Debugf("fuse-file-listxattr: %v", fi.path) - debugLog("exec file listxattr") resp.Xattr = listXattr(req.Size) return nil } @@ -190,6 +192,7 @@ func (fi *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp // Readlink reads a symbolic link. // This call is triggered when OS tries to see where symlink points func (fi *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) { + log.Debugf("fuse-file-readlink: %v", fi.path) info, err := fi.m.fs.Stat(fi.path) if err != nil { return "/brig/backend/ipfs/", err From 1f4ddedd2ebe032089b3e251da479558c9403984 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Tue, 19 Jan 2021 23:49:06 -0500 Subject: [PATCH 12/22] fuse: added resp.Flags |= fuse.OpenKeepCache This buffers files at the OS level, so consequent reads are instantaneous (as long as they are cached) --- fuse/file.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fuse/file.go b/fuse/file.go index 7652e3cf..0fabeea0 100644 --- a/fuse/file.go +++ b/fuse/file.go @@ -113,6 +113,7 @@ func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open if req.Flags.IsReadOnly() { // we don't need to track read-only handles // and no need to set handle `data` + resp.Flags |= fuse.OpenKeepCache return fi.hd, nil } @@ -129,6 +130,7 @@ func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open return nil, errorize(fi.path, ErrTooManyWriters) } fi.hd.writers++ + resp.Flags |= fuse.OpenKeepCache return fi.hd, nil } From 7c83f7d1018b478a2f13306095d97fe486c59a64 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. 
Mikhailov" Date: Thu, 21 Jan 2021 23:37:54 -0500 Subject: [PATCH 13/22] Overlay improvement: Do not call for seek if the pointer is already at the required position --- catfs/mio/overlay/overlay.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/catfs/mio/overlay/overlay.go b/catfs/mio/overlay/overlay.go index 9826d593..50630b61 100644 --- a/catfs/mio/overlay/overlay.go +++ b/catfs/mio/overlay/overlay.go @@ -342,6 +342,11 @@ func (l *Layer) Seek(offset int64, whence int) (int64, error) { l.limit = newPos } + if l.pos == newPos { + // very likely it sequent read/write request + // no need to seek since we already pointing to the right position + return l.pos, nil + } l.pos = newPos // Silence EOF: From 6b90be0478e1f4d1b4b0346dac9eaae59562c737 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Fri, 22 Jan 2021 22:14:32 -0500 Subject: [PATCH 14/22] added WriteAt method to avoid unneeded Seek during writes --- catfs/handle.go | 23 +++++++++++++++++++++++ catfs/mio/overlay/overlay.go | 18 ++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/catfs/handle.go b/catfs/handle.go index cbcaf074..d5e82ced 100644 --- a/catfs/handle.go +++ b/catfs/handle.go @@ -144,6 +144,29 @@ func (hdl *Handle) Write(buf []byte) (int, error) { return n, nil } +// Writes data from `buf` at offset `off` counted from the start (0 offset). +// Mimics `WriteAt` from `io` package https://golang.org/pkg/io/#WriterAt +func (hdl *Handle) WriteAt(buf []byte, off int64) (n int, err error) { + hdl.lock.Lock() + defer hdl.lock.Unlock() + + if hdl.readOnly { + return 0, ErrReadOnly + } + + if hdl.isClosed { + return 0, ErrIsClosed + } + + if err := hdl.initStreamIfNeeded(); err != nil { + return 0, err + } + + hdl.wasModified = true + n, err = hdl.layer.WriteAt(buf, off) + return n, err +} + // Seek will jump to the `offset` relative to `whence`. // There next read and write operation will start from this point.
func (hdl *Handle) Seek(offset int64, whence int) (int64, error) { diff --git a/catfs/mio/overlay/overlay.go b/catfs/mio/overlay/overlay.go index 50630b61..2a47fb29 100644 --- a/catfs/mio/overlay/overlay.go +++ b/catfs/mio/overlay/overlay.go @@ -228,6 +228,24 @@ func (l *Layer) Write(buf []byte) (int, error) { return len(buf), nil } +// Writes data from `buf` at offset `off` counted from the start (0 offset). +// Mimics `WriteAt` from `io` package https://golang.org/pkg/io/#WriterAt +func (l *Layer) WriteAt(buf []byte, off int64) (n int, err error) { + // Copy the buffer, since we cannot rely on it being valid forever. + modBuf := make([]byte, len(buf)) + copy(modBuf, buf) + + l.index.Add(&Modification{off, modBuf}) + end := off + int64(len(buf)) + if l.limit >= 0 && end > l.limit { + l.limit = end + } + + n = len(buf) + err = nil + return n, nil +} + // hasGaps checks if overlays occludes all bytes between `start` and `end` func hasGaps(overlays []Interval, start, end int64) bool { diff := end - start From d0b3f6497a5bf2d3ba1e32b9a611b9da6b738c92 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Fri, 22 Jan 2021 23:38:22 -0500 Subject: [PATCH 15/22] fuse uses overlay for writes Simplifies writing infrastructure by getting rid of internal write buffer. Everything is done with stream overlay. 
--- fuse/file.go | 27 +----------- fuse/handle.go | 117 +++++++++++++------------------------------------ 2 files changed, 33 insertions(+), 111 deletions(-) diff --git a/fuse/file.go b/fuse/file.go index 0fabeea0..b20dd418 100644 --- a/fuse/file.go +++ b/fuse/file.go @@ -56,11 +56,7 @@ func (fi *File) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Mode = os.ModeSymlink | filePerm } } - if fi.hd != nil && fi.hd.writers > 0 { - attr.Size = uint64(len(fi.hd.data)) - } else { - attr.Size = info.Size - } + attr.Size = info.Size attr.Mtime = info.ModTime attr.Inode = info.Inode @@ -105,31 +101,12 @@ func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Open fi.mu.Lock() if fi.hd == nil { - hd := Handle{fd: fd, m: fi.m, writers: 0, wasModified: false, currentFileReadOffset: -1} + hd := Handle{fd: fd, m: fi.m, wasModified: false, currentFileReadOffset: -1} fi.hd = &hd } fi.hd.fd = fd fi.mu.Unlock() - if req.Flags.IsReadOnly() { - // we don't need to track read-only handles - // and no need to set handle `data` - resp.Flags |= fuse.OpenKeepCache - return fi.hd, nil - } - // for writers we need to copy file data to the handle `data` - fi.hd.mu.Lock() - defer fi.hd.mu.Unlock() - if fi.hd.writers == 0 { - err = fi.hd.loadData(fi.path) - if err != nil { - return nil, errorize("file-open-loadData", err) - } - } - if fi.hd.writers == (^uint(0)) { // checks against writers overflow - return nil, errorize(fi.path, ErrTooManyWriters) - } - fi.hd.writers++ resp.Flags |= fuse.OpenKeepCache return fi.hd, nil } diff --git a/fuse/handle.go b/fuse/handle.go index bdc85c7d..f5a525aa 100644 --- a/fuse/handle.go +++ b/fuse/handle.go @@ -3,7 +3,6 @@ package fuse import ( - "bytes" "io" "sync" "syscall" @@ -13,62 +12,19 @@ import ( "bazil.org/fuse" "bazil.org/fuse/fs" - "bazil.org/fuse/fuseutil" "github.com/sahib/brig/catfs" log "github.com/sirupsen/logrus" ) // Handle is an open Entry. 
type Handle struct { - mu sync.Mutex - fd *catfs.Handle - m *Mount - // number of write-capable handles currently open - writers uint - // only valid if writers > 0, data used as a buffer for write operations - data []byte + mu sync.Mutex + fd *catfs.Handle + m *Mount wasModified bool currentFileReadOffset int64 } -func (hd *Handle) loadData(path string) error { - // Reads the whole file into the memory buffer - log.Debug("fuse: loadData for ", path) - hd.data = nil - hd.wasModified = false - start := time.Now() - // rewind to start - newOff, err := hd.fd.Seek(0, io.SeekStart) - if err != nil { - return errorize("handle-loadData-seek", err) - } - if newOff != 0 { - log.Warningf("seek offset differs (want %d, got %d)", 0, newOff) - return errorize("handle-loadData-seek", err) - } - // TODO make large buffer, right now there is a problem - // with the underlying Read(buf) which seems to be - // limitedStream with the default size of 64kB. - // so there is no way to read file faster than in 64kB chunks - var bufSize int = 64 * 1024 - buf := make([]byte, bufSize) - var data []byte - for { - n, err := hd.fd.Read(buf) - isEOF := (err == io.ErrUnexpectedEOF || err == io.EOF) - if err != nil && !isEOF { - return errorize("file-loadData", err) - } - data = append(data, buf[:n]...) - if isEOF && n == 0 { - break - } - } - hd.data = data - log.Infof("fuse: loadData buffered `%s` %d bytes with troughput %.2f MB/s", path, len(hd.data), float64(len(hd.data))/time.Since(start).Seconds()/1.0e6) - return nil -} - // Read is called to read a block of data at a certain offset. 
func (hd *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { hd.mu.Lock() @@ -82,13 +38,6 @@ func (hd *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Re req.Size, ) - // if we have writers we can supply response from the write data buffer - if hd.writers != 0 { - fuseutil.HandleRead(req, resp, hd.data) - return nil - } - - // otherwise we will read from the brig file system directly newOff := hd.currentFileReadOffset if req.Offset != hd.currentFileReadOffset { var err error @@ -116,6 +65,9 @@ const maxInt = int(^uint(0) >> 1) // Write is called to write a block of data at a certain offset. func (hd *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { + start := time.Now() + // Note: even when underlying process makes consequent writes, + // the kernel might send blocks out of order!!! hd.mu.Lock() defer hd.mu.Unlock() defer logPanic("handle: write") @@ -127,21 +79,33 @@ func (hd *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse. len(req.Data), ) - // expand the buffer if necessary - newLen := req.Offset + int64(len(req.Data)) - if newLen > int64(maxInt) { - return fuse.Errno(syscall.EFBIG) + // Offset seems to be always provided from the start (i.e. 0) + n, err := hd.WriteAt(req.Data, req.Offset) + resp.Size = n + if err != nil { + return errorize("handle-write-io", err) } - if newLen := int(newLen); newLen > len(hd.data) { - hd.data = append(hd.data, make([]byte, newLen-len(hd.data))...) + if n != len(req.Data) { + log.Panicf("written amount %d is not equal to requested %d", n, len(req.Data)) + return err } - - n := copy(hd.data[req.Offset:], req.Data) + log.Infof("fuse: Write time %v for %d bytes", time.Since(start), n) hd.wasModified = true - resp.Size = n return nil } +// Writes data from `buf` at offset `off` counted from the start (0 offset). 
+// Mimics `WriteAt` from `io` package https://golang.org/pkg/io/#WriterAt +// Main idea is not bother with Seek pointer, since underlying `overlay` works +// with intervals in memory and we do not need to `Seek` the backend which is very time expensive. +func (hd *Handle) WriteAt(buf []byte, off int64) (n int, err error) { + n, err = hd.fd.WriteAt(buf, off) + if n != len(buf) || err != nil { + log.Errorf("fuse: were not able to save %d bytes at offset %d", len(buf), off) + } + return n, err +} + // Flush is called to make sure all written contents get synced to disk. func (hd *Handle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return hd.flush() @@ -159,11 +123,10 @@ func (hd *Handle) flush() error { return nil } start := time.Now() - r := bytes.NewReader(hd.data) - if err := hd.m.fs.Stage(hd.fd.Path(), r); err != nil { + if err := hd.fd.Flush(); err != nil { return errorize("handle-flush", err) } - log.Infof("fuse: Staged `%s` %d bytes with troughput %.2f MB/s", hd.fd.Path(), len(hd.data), float64(len(hd.data))/time.Since(start).Seconds()/1.0e6) + log.Infof("fuse: Flashed `%s` in %v", hd.fd.Path(), time.Since(start)) hd.wasModified = false notifyChange(hd.m, 500*time.Millisecond) @@ -185,13 +148,6 @@ func (hd *Handle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { return errorize("handle-release", err) } - hd.mu.Lock() - defer hd.mu.Unlock() - - hd.writers-- - if hd.writers == 0 { - hd.data = nil - } return nil } @@ -199,20 +155,9 @@ func (hd *Handle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { func (hd *Handle) truncate(size uint64) error { log.Debugf("fuse-truncate: %v to size %d", hd.fd.Path(), size) defer logPanic("handle: truncate") + err := hd.fd.Truncate(size) - if size > uint64(maxInt) { - return fuse.Errno(syscall.EFBIG) - } - newLen := int(size) - switch { - case newLen > len(hd.data): - hd.data = append(hd.data, make([]byte, newLen-len(hd.data))...) 
- hd.wasModified = true - case newLen < len(hd.data): - hd.data = hd.data[:newLen] - hd.wasModified = true - } - return nil + return err } // Poll checks that the handle is ready for I/O or not From 45f264d5f9fb91ab718592266bb9d42b21d057b4 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Sat, 23 Jan 2021 00:01:26 -0500 Subject: [PATCH 16/22] gofmt fixes --- fuse/mount.go | 2 +- fuse/util.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fuse/mount.go b/fuse/mount.go index 957537c1..9a885775 100644 --- a/fuse/mount.go +++ b/fuse/mount.go @@ -68,7 +68,7 @@ func NewMount(cfs *catfs.FS, mountpoint string, notifier Notifier, opts MountOpt fuse.Subtype("brig"), fuse.AllowNonEmptyMount(), // enabling MaxReadahead double or even triple Read throughput 12MB/s -> 25 or 33 MB/s - fuse.MaxReadahead(128*1024), // kernel uses at max 128kB = 131072B + fuse.MaxReadahead(128 * 1024), // kernel uses at max 128kB = 131072B // enabling WritebackCache doubles write speed to buffer 12MB/s -> 24MB/s fuse.WritebackCache(), // writes will happen in mach large blocks 128kB instead of 8kB } diff --git a/fuse/util.go b/fuse/util.go index b15db589..4854a1e9 100644 --- a/fuse/util.go +++ b/fuse/util.go @@ -51,7 +51,7 @@ func listXattr(size uint32) []byte { func isKnownAttribute(name string) bool { reqSize := uint32(1024) knownAttrs := bytes.Split(listXattr(reqSize), []byte{0}) - for _, attr := range(knownAttrs) { + for _, attr := range knownAttrs { if string(attr) == name { return true } From 03e48e6425b1df2809a5fee8d0ae40c1e3392701 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. 
Mikhailov" Date: Sat, 23 Jan 2021 14:01:05 -0500 Subject: [PATCH 17/22] WriteAt -> writeAt to indicate that it is not called by `fuse` but is inner helper --- fuse/handle.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fuse/handle.go b/fuse/handle.go index f5a525aa..e551b81e 100644 --- a/fuse/handle.go +++ b/fuse/handle.go @@ -80,7 +80,7 @@ func (hd *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse. ) // Offset seems to be always provided from the start (i.e. 0) - n, err := hd.WriteAt(req.Data, req.Offset) + n, err := hd.writeAt(req.Data, req.Offset) resp.Size = n if err != nil { return errorize("handle-write-io", err) @@ -98,7 +98,7 @@ func (hd *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse. // Mimics `WriteAt` from `io` package https://golang.org/pkg/io/#WriterAt // Main idea is not bother with Seek pointer, since underlying `overlay` works // with intervals in memory and we do not need to `Seek` the backend which is very time expensive. -func (hd *Handle) WriteAt(buf []byte, off int64) (n int, err error) { +func (hd *Handle) writeAt(buf []byte, off int64) (n int, err error) { n, err = hd.fd.WriteAt(buf, off) if n != len(buf) || err != nil { log.Errorf("fuse: were not able to save %d bytes at offset %d", len(buf), off) From d3aad7af412cfc5f58ccf8bdbe7766eb6fc39adc Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Sat, 23 Jan 2021 14:06:08 -0500 Subject: [PATCH 18/22] improved comment about Write request not coming in `fifo` order --- fuse/handle.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fuse/handle.go b/fuse/handle.go index e551b81e..ba189cae 100644 --- a/fuse/handle.go +++ b/fuse/handle.go @@ -64,10 +64,12 @@ func (hd *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Re const maxInt = int(^uint(0) >> 1) // Write is called to write a block of data at a certain offset. 
+// Note: do not assume that Write requests come in `fifo` order from the OS level!!! +// I.e. during `cp largeFile /brig-fuse-mount/newFile` +// the kernel might occasionally send write requests with blocks out of order!!! +// In other words stream-like optimizations are not possible . func (hd *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { start := time.Now() - // Note: even when underlying process makes consequent writes, - // the kernel might send blocks out of order!!! hd.mu.Lock() defer hd.mu.Unlock() defer logPanic("handle: write") From 984107a946f730fd9a1fab086cc76d0be655d383 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Sat, 23 Jan 2021 17:15:17 -0500 Subject: [PATCH 19/22] Removed unused maxInt constant --- fuse/handle.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/fuse/handle.go b/fuse/handle.go index ba189cae..6fb3e846 100644 --- a/fuse/handle.go +++ b/fuse/handle.go @@ -61,8 +61,6 @@ func (hd *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Re return nil } -const maxInt = int(^uint(0) >> 1) - // Write is called to write a block of data at a certain offset. // Note: do not assume that Write requests come in `fifo` order from the OS level!!! // I.e. during `cp largeFile /brig-fuse-mount/newFile` From ce915cef093ee6ceb9ed54dde532036b72347ea9 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Sat, 23 Jan 2021 18:38:41 -0500 Subject: [PATCH 20/22] bug fix: the response to Xattribute must take as much as needed!!! Pretty much 1st call always comes with req.Size=0. Our job to respond with all we have. If response resp.Xattr does not fit to a specified buffer req.Size, fuse will internally truncate it (see https://github.com/bazil/fuse/blob/fb710f7dfd05053a3bc9516dd5a7a8f84ead8aab/fuse.go#L1656 and similar https://github.com/bazil/fuse/blob/fb710f7dfd05053a3bc9516dd5a7a8f84ead8aab/fuse.go#L1618) and relay it to the caller with ERANGE error (buffer is too small). 
The caller should ask again with a larger buffer. On the other hand, truncation to zero is a disaster, since the caller then assumes that there are no Xattributes. Note that apparently, even if the call comes with the allowed req.Size=0, internally it has some small buffer. So our first reply often fits and the call ends without error. --- fuse/directory.go | 6 ++++-- fuse/file.go | 4 +++- fuse/util.go | 16 ++++------------ 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/fuse/directory.go b/fuse/directory.go index 043525de..0a70c308 100644 --- a/fuse/directory.go +++ b/fuse/directory.go @@ -211,12 +211,14 @@ func (dir *Directory) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, r return nil } -// Listxattr is called to list all xattrs of this file. +// Listxattr is called to list all xattrs of this dir func (dir *Directory) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { defer logPanic("dir: listxattr") debugLog("exec dir listxattr") - resp.Xattr = listXattr(req.Size) + // Do not worry about req.Size + // fuse will cut it to allowed size and report to the caller that buffer need to be larger + resp.Xattr = listXattr() return nil } diff --git a/fuse/file.go b/fuse/file.go index b20dd418..09388574 100644 --- a/fuse/file.go +++ b/fuse/file.go @@ -164,7 +164,9 @@ func (fi *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp defer logPanic("file: listxattr") log.Debugf("fuse-file-listxattr: %v", fi.path) - resp.Xattr = listXattr(req.Size) + // Do not worry about req.Size + // fuse will cut it to allowed size and report to the caller that buffer need to be larger + resp.Xattr = listXattr() return nil } diff --git a/fuse/util.go b/fuse/util.go index 4854a1e9..22a1a926 100644 --- a/fuse/util.go +++ b/fuse/util.go @@ -34,23 +34,18 @@ func logPanic(name string) { } } -func listXattr(size uint32) []byte { +func listXattr() []byte { resp := []byte{} resp = append(resp, "user.brig.hash\x00"...)
resp = append(resp, "user.brig.content\x00"...) resp = append(resp, "user.brig.pinned\x00"...) resp = append(resp, "user.brig.explicitly_pinned\x00"...) - if uint32(len(resp)) > size { - resp = resp[:size] - } - return resp } func isKnownAttribute(name string) bool { - reqSize := uint32(1024) - knownAttrs := bytes.Split(listXattr(reqSize), []byte{0}) + knownAttrs := bytes.Split(listXattr(), []byte{0}) for _, attr := range knownAttrs { if string(attr) == name { return true @@ -92,11 +87,8 @@ func getXattr(cfs *catfs.FS, name, path string, size uint32) ([]byte, error) { return nil, fuse.ErrNoXattr } - // Truncate if less bytes were requested for some reason: - if uint32(len(resp)) > size { - resp = resp[:size] - } - + // Do not worry about req.Size + // fuse will cut it to allowed size and report to the caller that buffer need to be larger return resp, nil } From c940779f19b78ab168b220426d1c657e7aa731eb Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Sat, 23 Jan 2021 22:32:30 -0500 Subject: [PATCH 21/22] redone getXattr and listXattr via xattrMap, also change hash related names --- fuse/directory.go | 5 ++- fuse/file.go | 5 ++- fuse/util.go | 77 +++++++++++++++++++++-------------------------- 3 files changed, 42 insertions(+), 45 deletions(-) diff --git a/fuse/directory.go b/fuse/directory.go index 0a70c308..e10331a2 100644 --- a/fuse/directory.go +++ b/fuse/directory.go @@ -202,7 +202,10 @@ func (dir *Directory) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, r defer logPanic("dir: getxattr") debugLog("exec dir getxattr: %v: %v", dir.path, req.Name) - xattrs, err := getXattr(dir.m.fs, req.Name, dir.path, req.Size) + + // Do not worry about req.Size + // fuse will cut it to allowed size and report to the caller that buffer need to be larger + xattrs, err := getXattr(dir.m.fs, req.Name, dir.path) if err != nil { return err } diff --git a/fuse/file.go b/fuse/file.go index 09388574..f71437a9 100644 --- a/fuse/file.go +++ b/fuse/file.go @@ -150,7 
+150,10 @@ func (fi *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f // log.Debugf("fuse-file-getxattr %v for atribute %v", fi.path, req.Name) // debugLog("exec file getxattr: %v: %v", fi.path, req.Name) - xattrs, err := getXattr(fi.m.fs, req.Name, fi.path, req.Size) + + // Do not worry about req.Size + // fuse will cut it to allowed size and report to the caller that buffer need to be larger + xattrs, err := getXattr(fi.m.fs, req.Name, fi.path) if err != nil { return err } diff --git a/fuse/util.go b/fuse/util.go index 22a1a926..e19acd58 100644 --- a/fuse/util.go +++ b/fuse/util.go @@ -3,7 +3,6 @@ package fuse import ( - "bytes" "time" "bazil.org/fuse" @@ -34,28 +33,45 @@ func logPanic(name string) { } } +type xattrGetter func(cfs *catfs.FS, info *catfs.StatInfo) ([]byte, error) + +var xattrMap = map[string]xattrGetter{ + "user.brig.hash.content": func(cfs *catfs.FS, info *catfs.StatInfo) ([]byte, error) { + return []byte(info.ContentHash.B58String()), nil + }, + "user.brig.hash.tree": func(cfs *catfs.FS, info *catfs.StatInfo) ([]byte, error) { + return []byte(info.TreeHash.B58String()), nil + }, + "user.brig.hash.backend": func(cfs *catfs.FS, info *catfs.StatInfo) ([]byte, error) { + return []byte(info.BackendHash.B58String()), nil + }, + "user.brig.pinned": func(cfs *catfs.FS, info *catfs.StatInfo) ([]byte, error) { + if info.IsPinned { + return []byte("yes"), nil + } + return []byte("no"), nil + }, + "user.brig.explicitly_pinned": func(cfs *catfs.FS, info *catfs.StatInfo) ([]byte, error) { + if info.IsExplicit { + return []byte("yes"), nil + } + return []byte("no"), nil + }, +} + func listXattr() []byte { resp := []byte{} - resp = append(resp, "user.brig.hash\x00"...) - resp = append(resp, "user.brig.content\x00"...) - resp = append(resp, "user.brig.pinned\x00"...) - resp = append(resp, "user.brig.explicitly_pinned\x00"...) + for k, _ := range xattrMap { + resp = append(resp, k...) 
+ resp = append(resp, '\x00') + } return resp } -func isKnownAttribute(name string) bool { - knownAttrs := bytes.Split(listXattr(), []byte{0}) - for _, attr := range knownAttrs { - if string(attr) == name { - return true - } - } - return false -} - -func getXattr(cfs *catfs.FS, name, path string, size uint32) ([]byte, error) { - if !isKnownAttribute(name) { +func getXattr(cfs *catfs.FS, name, path string) ([]byte, error) { + handler, ok := xattrMap[name] + if !ok { return nil, fuse.ErrNoXattr } @@ -64,32 +80,7 @@ func getXattr(cfs *catfs.FS, name, path string, size uint32) ([]byte, error) { return nil, errorize("getxattr", err) } - resp := []byte{} - - switch name { - case "user.brig.hash": - resp = []byte(info.TreeHash.B58String()) - case "user.brig.content": - resp = []byte(info.ContentHash.B58String()) - case "user.brig.pinned": - if info.IsPinned { - resp = []byte("yes") - } else { - resp = []byte("no") - } - case "user.brig.explicitly_pinned": - if info.IsExplicit { - resp = []byte("yes") - } else { - resp = []byte("no") - } - default: - return nil, fuse.ErrNoXattr - } - - // Do not worry about req.Size - // fuse will cut it to allowed size and report to the caller that buffer need to be larger - return resp, nil + return handler(cfs, info) } func notifyChange(m *Mount, d time.Duration) { From 05b5737016fdc223b892d8a624611813f4182ef5 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. 
Mikhailov" Date: Sat, 23 Jan 2021 23:44:18 -0500 Subject: [PATCH 22/22] added test for extended attributes --- fuse/fuse_test.go | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/fuse/fuse_test.go b/fuse/fuse_test.go index 5bfab539..38df2129 100644 --- a/fuse/fuse_test.go +++ b/fuse/fuse_test.go @@ -12,6 +12,7 @@ import ( "net/http" "os" "path/filepath" + "syscall" "testing" "github.com/sahib/brig/catfs" @@ -338,6 +339,55 @@ func TestRead(t *testing.T) { }) } +func TestFileXattr(t *testing.T) { + withMount(t, MountOptions{}, func(ctx context.Context, control *spawntest.Control, mount *mountInfo) { + size := int64(4) + helloData := testutil.CreateDummyBuf(size) + + // Add a simple file: + catfsFilePath := fmt.Sprintf("/hello_from_catfs_%d", size) + req := catfsPayload{Path: catfsFilePath, Data: helloData} + require.NoError(t, control.JSON("/catfsStage").Call(ctx, req, ¬hing{})) + checkCatfsFileContent(ctx, t, control, catfsFilePath, helloData) + + fuseFilePath := filepath.Join(mount.Dir, catfsFilePath) + + // no let's see all the extended attributes list + response := make([]byte, 1024*4) // large buffer to fit everything + sz, err := syscall.Listxattr(fuseFilePath, response) + require.NoError(t, err) + response = response[:sz] + receivedAttrs := bytes.Split(response, []byte{0}) + // every response should belong to valid attributes + for _, attr := range receivedAttrs { + if len(attr) == 0 { + // protecting against empty chunk after split delimiter + continue + } + _, ok := xattrMap[string(attr)] + require.Truef(t, ok, "Invalid extended attribute '%s'", attr) + } + // every valid attribute should be in received Attrs list + for attr, _ := range xattrMap { + require.Containsf(t, receivedAttrs, []uint8(attr), "Received attributes are missing '%s'", attr) + } + // now let's check some attributes values + // Note hashes are hard without direct access to catfs + // which is accessed in different process + response = 
make([]byte, 64) // large buffer to fit everything + sz, err = syscall.Getxattr(fuseFilePath, "user.brig.pinned", response) + require.NoError(t, err) + response = response[:sz] + require.Equal(t, "yes", string(response)) + + response = make([]byte, 64) // large buffer to fit everything + sz, err = syscall.Getxattr(fuseFilePath, "user.brig.explicitly_pinned", response) + require.NoError(t, err) + response = response[:sz] + require.Equal(t, "no", string(response)) + }) +} + func TestWrite(t *testing.T) { withMount(t, MountOptions{}, func(ctx context.Context, control *spawntest.Control, mount *mountInfo) { for _, size := range DataSizes {