Add option to skip unpacking the same layer in parallel by dcantah · Pull Request #33 · kevpar/containerd

Add option to skip unpacking the same layer in parallel #33


Merged
merged 1 commit into from
Mar 8, 2022
4 changes: 4 additions & 0 deletions client.go
@@ -352,6 +352,10 @@ type RemoteContext struct {
	// AllMetadata downloads all manifests and known-configuration files
	AllMetadata bool

	// DisableSameLayerUnpack prevents unpacking the same image layer in parallel; later callers wait for the first
	// in-flight unpack to complete and then receive either its error or ErrAlreadyExists.
	DisableSameLayerUnpack bool

	// ChildLabelMap sets the labels used to reference child objects in the content
	// store. By default, all GC reference labels will be set for all fetched content.
	ChildLabelMap func(ocispec.Descriptor) []string
11 changes: 11 additions & 0 deletions client_opts.go
@@ -235,3 +235,14 @@ func WithAllMetadata() RemoteOpt {
		return nil
	}
}

// WithDisableSameLayerUnpack sets the option that prevents containerd from unpacking the same layer in parallel.
// This helps de-duplicate work when pulling multiple images that share layers.
func WithDisableSameLayerUnpack() RemoteOpt {
	return func(client *Client, c *RemoteContext) error {
		c.DisableSameLayerUnpack = true
		c.SnapshotterOpts = append(c.SnapshotterOpts,
			snapshots.WithLabels(map[string]string{"containerd.io/snapshot/disable-same-unpack": ""}))
		return nil
	}
}
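For orientation, this is how a caller would opt in from the client side. The snippet below is a minimal sketch and not part of this PR: the socket path, namespace, and image reference are placeholder values, and error handling is trimmed to log.Fatal.

package main

import (
	"context"
	"log"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/namespaces"
)

func main() {
	// Connect to a local containerd daemon (placeholder socket path).
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx := namespaces.WithNamespace(context.Background(), "example")

	// Pull and unpack; WithDisableSameLayerUnpack makes concurrent pulls of
	// images that share layers wait on a single extraction per layer instead
	// of unpacking the same layer more than once.
	if _, err := client.Pull(ctx, "docker.io/library/nginx:latest",
		containerd.WithPullUnpack,
		containerd.WithDisableSameLayerUnpack(),
	); err != nil {
		log.Fatal(err)
	}
}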
144 changes: 134 additions & 10 deletions metadata/snapshot.go
@@ -37,15 +37,77 @@ import (
)

const (
	inheritedLabelsPrefix  = "containerd.io/snapshot/"
	labelSnapshotRef       = "containerd.io/snapshot.ref"
	labelDisableSameUnpack = "containerd.io/snapshot/disable-same-unpack"
)

type inProgress struct {
	entries sync.Map
}

func newInProgress() *inProgress {
	return &inProgress{}
}

func (inp *inProgress) load(key string) (*broadCaster, bool) {
	val, ok := inp.entries.Load(key)
	if ok {
		return val.(*broadCaster), true
	}
	return nil, false
}

func (inp *inProgress) loadOrStore(key string) (*broadCaster, bool) {
	bc, ok := inp.entries.LoadOrStore(key, newBroadcaster())
	return bc.(*broadCaster), ok
}

func (inp *inProgress) delete(key string) {
	inp.entries.Delete(key)
}

type broadCaster struct {
	c        *sync.Cond
	finished bool
	err      error
}

func newBroadcaster() *broadCaster {
	return &broadCaster{
		c: sync.NewCond(&sync.Mutex{}),
	}
}

func (br *broadCaster) wait() error {
	br.c.L.Lock()
	defer br.c.L.Unlock()
	// Check whether the broadcaster has been "finished", which boils down to either `Commit` or `Remove` having been reached. It's a safeguard
	// for a waiter that enters wait shortly after the broadcast has already fired.
	for !br.finished {
		br.c.Wait()
	}
	return br.err
}

// broadcast invokes 'work' and then broadcasts to all waiters to wake up
func (br *broadCaster) broadcast(work func()) {
	br.c.L.Lock()
	defer br.c.L.Unlock()

	work()
	// Alert all waiters.
	br.c.Broadcast()
}

type snapshotter struct {
	snapshots.Snapshotter
	name string
	db   *DB
	l    sync.RWMutex
	// inp holds all active extraction snapshots before becoming committed/removed. This can be used to wait on the
	// result of an unpack instead of doing the same work twice if one is already underway.
	inp *inProgress
}

// newSnapshotter returns a new Snapshotter which namespaces the given snapshot
@@ -55,6 +117,7 @@ func newSnapshotter(db *DB, name string, sn snapshots.Snapshotter) *snapshotter {
		Snapshotter: sn,
		name:        name,
		db:          db,
		inp:         newInProgress(),
	}
}

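To make the coordination easier to follow, here is a small, self-contained sketch of the same pattern the inProgress map and broadCaster implement; it is illustrative only and not code from this PR. The first goroutine to claim a key does the unpack and broadcasts the outcome, and every later goroutine waits and reuses that result.

package main

import (
	"fmt"
	"sync"
)

// result mirrors broadCaster: a cond var guarding a "finished" flag and a stored error.
type result struct {
	c        *sync.Cond
	finished bool
	err      error
}

func main() {
	var (
		inflight sync.Map // mirrors inProgress.entries, keyed by extraction snapshot key
		wg       sync.WaitGroup
	)

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			v, loaded := inflight.LoadOrStore("extract-layer", &result{c: sync.NewCond(&sync.Mutex{})})
			r := v.(*result)
			if loaded {
				// Losers: wait until the winner marks the work finished, then reuse its outcome.
				r.c.L.Lock()
				for !r.finished {
					r.c.Wait()
				}
				err := r.err
				r.c.L.Unlock()
				fmt.Printf("worker %d reused result, err=%v\n", i, err)
				return
			}
			// Winner: do the work once, record the outcome, and wake all waiters.
			r.c.L.Lock()
			r.err = nil // pretend the unpack succeeded
			r.finished = true
			r.c.Broadcast()
			r.c.L.Unlock()
			fmt.Printf("worker %d did the unpack\n", i)
		}(i)
	}
	wg.Wait()
}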
@@ -281,35 +344,68 @@ func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snap
	return s.createSnapshot(ctx, key, parent, true, opts)
}

func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, readonly bool, opts []snapshots.Opt) (_ []mount.Mount, err error) {
	s.l.RLock()

	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		s.l.RUnlock()
		return nil, err
	}

	var base snapshots.Info
	for _, opt := range opts {
		if err := opt(&base); err != nil {
			s.l.RUnlock()
			return nil, err
		}
	}

	if err := validateSnapshot(&base); err != nil {
		s.l.RUnlock()
		return nil, err
	}

	var (
		target               = base.Labels[labelSnapshotRef]
		_, disableSameUnpack = base.Labels[labelDisableSameUnpack]
		bparent              string
		bkey                 string
		bopts                = []snapshots.Opt{
			snapshots.WithLabels(snapshots.FilterInheritedLabels(base.Labels)),
		}
	)

	if disableSameUnpack {
		bc, ok := s.inp.loadOrStore(key)
		if ok {
			// Wait for someone to broadcast that an active snapshot with the same key was either committed or removed.
			s.l.RUnlock()
			err := bc.wait()
			// If the extraction snapshot we were waiting on wasn't removed, then it either:
			// 1. Succeeded, and we return ErrAlreadyExists, as the unpacker special cases this error and will check if it can find the committed
			// snapshot.
			// 2. Failed, and we return the error as is.
			if err == nil {
				return nil, errdefs.ErrAlreadyExists
			}
			return nil, err
		} else {
			// Else this is the first snapshot with this key; make sure to broadcast any error if we
			// error out in this call.
			defer func() {
				if err != nil {
					bc.broadcast(func() {
						bc.err = err
						bc.finished = true
						s.inp.delete(key)
					})
				}
			}()
		}
	}
	defer s.l.RUnlock()

	if err := update(ctx, s.db, func(tx *bolt.Tx) error {
		bkt, err := createSnapshotterBucket(tx, ns, s.name)
		if err != nil {
@@ -492,10 +588,23 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
	return m, nil
}

func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) (err error) {
	s.l.RLock()
	defer s.l.RUnlock()

	defer func() {
		bc, ok := s.inp.load(key)
		if ok {
			bc.broadcast(func() {
				// Set the error that Commit will return and then broadcast to all waiters that the commit has
				// completed or failed.
				bc.finished = true
				bc.err = err
				s.inp.delete(key)
			})
		}
	}()

	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return err
@@ -623,10 +732,25 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap

}

func (s *snapshotter) Remove(ctx context.Context, key string) (err error) {
	s.l.RLock()
	defer s.l.RUnlock()

	defer func() {
		// Broadcast to the waiters so they can return and try again. The unpacker logic special cases ErrAlreadyExists to handle remote
		// snapshotters, but it makes sure that the snapshot actually does exist by statting it. If the stat fails, it will
		// retry two more times. Whichever waiter ends up back in Prepare first will store the key and the others will go back to waiting
		// until success/failure.
		bc, ok := s.inp.load(key)
		if ok {
			bc.broadcast(func() {
				bc.err = errdefs.ErrAlreadyExists
				bc.finished = true
				s.inp.delete(key)
			})
		}
	}()

	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return err
15 changes: 14 additions & 1 deletion unpacker.go
@@ -137,8 +137,15 @@ EachLayer:
		)

		for try := 1; try <= 3; try++ {
			// If we want to avoid unpacking the same layer in parallel, we need to use a key format that will
			// intentionally cause collisions for the same layer. This is how we'll know an active snapshot is
			// already underway, along with the intentional tracking in the metadata snapshotter layer.
			if rCtx.DisableSameLayerUnpack {
				key = fmt.Sprintf(snapshots.UnpackKeyPrefix+"-%s", chainID)
			} else {
				key = fmt.Sprintf(snapshots.UnpackKeyFormat, uniquePart(), chainID)
			}
			// Prepare snapshot from parent, label as root
			mounts, err = sn.Prepare(ctx, key, parent.String(), opts...)
			if err != nil {
				if errdefs.IsAlreadyExists(err) {
@@ -189,9 +196,15 @@

		select {
		case <-ctx.Done():
			if rCtx.DisableSameLayerUnpack {
				abort()
			}
			return ctx.Err()
		case err := <-fetchErr:
			if err != nil {
				if rCtx.DisableSameLayerUnpack {
					abort()
				}
				return err
			}
		case <-fetchC[i-fetchOffset]:
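For a concrete feel for the key change above: with the option off, every attempt gets a unique key, while with the option on the key is derived from the chain ID alone, so concurrent unpacks of the same layer intentionally collide and coordinate in the metadata snapshotter. The standalone sketch below is illustrative only; it assumes the usual snapshots package values (UnpackKeyPrefix "extract", UnpackKeyFormat "extract-%s %s" — worth verifying against the snapshots package) and mimics the unexported uniquePart helper in unpacker.go.

package main

import (
	"crypto/rand"
	"encoding/base64"
	"fmt"
	"time"
)

// Assumed to mirror the snapshots package constants; verify against
// snapshots.UnpackKeyPrefix and snapshots.UnpackKeyFormat.
const (
	unpackKeyPrefix = "extract"
	unpackKeyFormat = unpackKeyPrefix + "-%s %s"
)

// uniquePart mimics the unexported helper in unpacker.go: a nanosecond
// timestamp plus a few random bytes.
func uniquePart() string {
	var b [3]byte
	rand.Read(b[:]) // best effort; a failure only reduces uniqueness
	return fmt.Sprintf("%d-%s", time.Now().Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
}

func main() {
	chainID := "sha256:94e5f06ff8e3d4441dc3cd8b090ff38dc911bfa8adbe2a82e88666f48a59c382"

	// Default behaviour: a fresh key per attempt, so concurrent pulls never collide.
	fmt.Println(fmt.Sprintf(unpackKeyFormat, uniquePart(), chainID))
	// e.g. extract-123456789-aWJj sha256:94e5f0...

	// DisableSameLayerUnpack: the key depends only on the chain ID, so concurrent
	// pulls of the same layer collide on purpose and wait on one extraction.
	fmt.Println(fmt.Sprintf(unpackKeyPrefix+"-%s", chainID))
	// extract-sha256:94e5f0...
}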