diff --git a/cmd/engine/main.go b/cmd/engine/main.go index e525525481..b49d639dea 100644 --- a/cmd/engine/main.go +++ b/cmd/engine/main.go @@ -324,6 +324,9 @@ func main() { //nolint:gocyclo logLevel = config.LevelDebug } } + fmt.Printf("ACB setting trace log\n") + logLevel = config.LevelExtraDebug + if logLevel != "" { slogLevel, err := logLevel.ToSlogLevel() if err != nil { diff --git a/core/container.go b/core/container.go index d34e32ed50..173d9755c9 100644 --- a/core/container.go +++ b/core/container.go @@ -659,7 +659,7 @@ func (container *Container) WithDirectory(ctx context.Context, subdir string, sr }) } -func (container *Container) WithFile(ctx context.Context, destPath string, src *File, permissions *int, owner string) (*Container, error) { +func (container *Container) WithFile(ctx context.Context, srv *dagql.Server, destPath string, src *File, permissions *int, owner string) (*Container, error) { container = container.Clone() dir, file := filepath.Split(filepath.Clean(destPath)) @@ -669,17 +669,18 @@ func (container *Container) WithFile(ctx context.Context, destPath string, src * return nil, err } - return dir.WithFile(ctx, file, src, permissions, ownership) + return dir.WithFile(ctx, srv, file, src, permissions, ownership) }) } -func (container *Container) WithoutPaths(ctx context.Context, destPaths ...string) (*Container, error) { +func (container *Container) WithoutPaths(ctx context.Context, srv *dagql.Server, destPaths ...string) (*Container, error) { container = container.Clone() + fmt.Printf("ACB WithoutPaths %v\n", destPaths) for _, destPath := range destPaths { var err error container, err = container.writeToPath(ctx, path.Dir(destPath), func(dir *Directory) (*Directory, error) { - return dir.Without(ctx, path.Base(destPath)) + return dir.Without(ctx, srv, path.Base(destPath)) }) if err != nil { return nil, err @@ -688,7 +689,7 @@ func (container *Container) WithoutPaths(ctx context.Context, destPaths ...strin return container, nil } -func (container *Container) WithFiles(ctx context.Context, destDir string, src []*File, permissions *int, owner string) (*Container, error) { +func (container *Container) WithFiles(ctx context.Context, srv *dagql.Server, destDir string, src []*File, permissions *int, owner string) (*Container, error) { container = container.Clone() dir, file := filepath.Split(filepath.Clean(destDir)) @@ -698,7 +699,7 @@ func (container *Container) WithFiles(ctx context.Context, destDir string, src [ return nil, err } - return dir.WithFiles(ctx, file, src, permissions, ownership) + return dir.WithFiles(ctx, srv, file, src, permissions, ownership) }) } @@ -1215,6 +1216,12 @@ func (container *Container) writeToPath(ctx context.Context, subdir string, fn f return nil, err } + if mount == nil { + dir.Result = container.FSResult + } else { + dir.Result = mount.Result + } + dir, err = fn(dir) if err != nil { return nil, err @@ -1695,6 +1702,7 @@ func (container *Container) WithoutExposedPort(port int, protocol NetworkProtoco func (container *Container) WithServiceBinding(ctx context.Context, svc dagql.Instance[*Service], alias string) (*Container, error) { container = container.Clone() + fmt.Printf("ACB WithServiceBinding\n") host, err := svc.Self.Hostname(ctx, svc.ID()) if err != nil { @@ -1714,6 +1722,7 @@ func (container *Container) WithServiceBinding(ctx context.Context, svc dagql.In }, }) + fmt.Printf("ACB WithServiceBinding returning\n") return container, nil } diff --git a/core/dagop.go b/core/dagop.go index 24254c7b38..f2c9e75025 100644 --- a/core/dagop.go +++ b/core/dagop.go @@ -310,6 +310,7 @@ func NewContainerDagOp( id *call.ID, ctr *Container, extraInputs []llb.State, + skipMeta bool, ) (*Container, error) { mounts, inputs, outputCount, err := getAllContainerMounts(ctr) if err != nil { @@ -338,7 +339,7 @@ func NewContainerDagOp( } ctr = ctr.Clone() - err = dagop.setAllContainerMounts(ctx, ctr, sts) + err = dagop.setAllContainerMounts(ctx, ctr, sts, skipMeta) if err != nil { return nil, err } @@ -598,7 +599,7 @@ func getAllContainerMounts(container *Container) (mounts []*pb.Mount, states []l // setAllContainerMounts is the reverse of getAllContainerMounts, and rewrites // the container mounts to the given states. -func (op *ContainerDagOp) setAllContainerMounts(ctx context.Context, container *Container, outputs []llb.State) error { +func (op *ContainerDagOp) setAllContainerMounts(ctx context.Context, container *Container, outputs []llb.State, skipMeta bool) error { for mountIdx, mount := range op.Mounts { if mount.Output == pb.SkipOutput { continue @@ -613,7 +614,9 @@ func (op *ContainerDagOp) setAllContainerMounts(ctx context.Context, container * case 0: container.FS = def.ToPB() case 1: - container.Meta = def.ToPB() + if !skipMeta { + container.Meta = def.ToPB() + } default: container.Mounts[mountIdx-2].Source = def.ToPB() } diff --git a/core/directory.go b/core/directory.go index 04e7cbcee4..e06eba4724 100644 --- a/core/directory.go +++ b/core/directory.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "io" "io/fs" "os" "path" @@ -13,6 +14,7 @@ import ( "time" containerdfs "github.com/containerd/continuity/fs" + fscopy "github.com/dagger/dagger/engine/sources/local/copy" bkcache "github.com/moby/buildkit/cache" bkclient "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" @@ -386,6 +388,7 @@ func (dir *Directory) Glob(ctx context.Context, pattern string) ([]string, error func (dir *Directory) WithNewFile(ctx context.Context, dest string, content []byte, permissions fs.FileMode, ownership *Ownership) (*Directory, error) { dir = dir.Clone() + fmt.Printf("ACB WithNewFile %v\n", dest) err := validateFileName(dest) if err != nil { @@ -530,8 +533,82 @@ func (dir *Directory) WithDirectory(ctx context.Context, destDir string, src *Di return dir, nil } +func copyFile(srcPath, dstPath string) (err error) { + srcStat, err := os.Stat(srcPath) + if err != nil { + return err + } + srcPerm := srcStat.Mode().Perm() + src, err := os.Open(srcPath) + if err != nil { + return err + } + defer src.Close() + + dst, err := os.OpenFile(dstPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, srcPerm) + if err != nil { + return err + } + defer func() { + if err != nil { + _ = os.Remove(dstPath) + } + }() + defer func() { + if dst != nil { + dst.Close() + } + }() + _, err = io.Copy(dst, src) + if err != nil { + return err + } + err = dst.Close() + if err != nil { + return err + } + dst = nil + + modTime := srcStat.ModTime() + return os.Chtimes(dstPath, modTime, modTime) +} + +func isDir(path string) (bool, error) { + fi, err := os.Stat(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, err + } + return fi.Mode().IsDir(), nil +} + +type hack interface { + Evaluate(context.Context) (*buildkit.Result, error) +} + +func getRefOrEvaluate(ctx context.Context, ref bkcache.ImmutableRef, t hack) (bkcache.ImmutableRef, error) { + if ref != nil { + return ref, nil + } + res, err := t.Evaluate(ctx) + if err != nil { + return nil, err + } + cacheRef, err := res.SingleRef() + if err != nil { + return nil, err + } + if cacheRef == nil { + return nil, nil + } + return cacheRef.CacheRef(ctx) +} + func (dir *Directory) WithFile( ctx context.Context, + srv *dagql.Server, destPath string, src *File, permissions *int, @@ -539,37 +616,82 @@ func (dir *Directory) WithFile( ) (*Directory, error) { dir = dir.Clone() - destSt, err := dir.State() + srcCacheRef, err := getRefOrEvaluate(ctx, src.Result, src) if err != nil { return nil, err } - srcSt, err := src.State() + dirCacheRef, err := getRefOrEvaluate(ctx, dir.Result, dir) if err != nil { return nil, err } - if err := dir.SetState(ctx, mergeStates(mergeStateInput{ - Dest: destSt, - DestDir: path.Join(dir.Dir, path.Dir(destPath)), - DestFileName: path.Base(destPath), - Src: srcSt, - SrcDir: path.Dir(src.File), - SrcFileName: path.Base(src.File), - Permissions: permissions, - Owner: owner, - })); err != nil { + query, err := CurrentQuery(ctx) + if err != nil { return nil, err } - dir.Services.Merge(src.Services) - + destPath = path.Join(dir.Dir, destPath) + newRef, err := query.BuildkitCache().New(ctx, dirCacheRef, nil, bkcache.WithRecordType(bkclient.UsageRecordTypeRegular), + bkcache.WithDescription(fmt.Sprintf("withfile %s %s", destPath, src.File))) + if err != nil { + return nil, err + } + err = MountRef(ctx, newRef, nil, func(dirRoot string) error { + destPath, err := containerdfs.RootPath(dirRoot, destPath) + if err != nil { + return err + } + destIsDir, err := isDir(destPath) + if err != nil { + return err + } + if destIsDir { + _, srcFilename := filepath.Split(src.File) + destPath = path.Join(destPath, srcFilename) + } + destPathDir, _ := filepath.Split(destPath) + err = os.MkdirAll(filepath.Dir(destPathDir), 0755) + if err != nil { + return err + } + err = MountRef(ctx, srcCacheRef, nil, func(srcRoot string) error { + srcPath, err := containerdfs.RootPath(srcRoot, src.File) + if err != nil { + return err + } + return copyFile(srcPath, destPath) + }) + if err != nil { + return err + } + if permissions != nil { + if err := os.Chmod(destPath, os.FileMode(*permissions)); err != nil { + return fmt.Errorf("failed to set chmod %s: err", destPath) + } + } + if owner != nil { + if err := os.Chown(destPath, owner.UID, owner.GID); err != nil { + return fmt.Errorf("failed to set chown %s: err", destPath) + } + } + return nil + }) + if err != nil { + return nil, err + } + snap, err := newRef.Commit(ctx) + if err != nil { + return nil, err + } + dir.Result = snap return dir, nil } // TODO: address https://github.com/dagger/dagger/pull/6556/files#r1482830091 func (dir *Directory) WithFiles( ctx context.Context, + srv *dagql.Server, destDir string, src []*File, permissions *int, @@ -581,6 +703,7 @@ func (dir *Directory) WithFiles( for _, file := range src { dir, err = dir.WithFile( ctx, + srv, path.Join(destDir, path.Base(file.File)), file, permissions, @@ -753,28 +876,57 @@ func (dir *Directory) Diff(ctx context.Context, other *Directory) (*Directory, e return dir, nil } -func (dir *Directory) Without(ctx context.Context, paths ...string) (*Directory, error) { +func (dir *Directory) Without(ctx context.Context, srv *dagql.Server, paths ...string) (*Directory, error) { dir = dir.Clone() - st, err := dir.State() + parentRef, err := getRefOrEvaluate(ctx, dir.Result, dir) if err != nil { return nil, err } - var action *llb.FileAction - for _, path := range paths { - path = filepath.Join(dir.Dir, path) - if action == nil { - action = llb.Rm(path, llb.WithAllowWildcard(true), llb.WithAllowNotFound(true)) - } else { - action = action.Rm(path, llb.WithAllowWildcard(true), llb.WithAllowNotFound(true)) + query, err := CurrentQuery(ctx) + if err != nil { + return nil, err + } + + newRef, err := query.BuildkitCache().New(ctx, parentRef, nil, bkcache.WithRecordType(bkclient.UsageRecordTypeRegular), + bkcache.WithDescription(fmt.Sprintf("without TODO"))) + if err != nil { + return nil, err + } + err = MountRef(ctx, newRef, nil, func(root string) error { + for _, p := range paths { + p = path.Join(dir.Dir, p) + var matches []string + if strings.Contains(p, "*") { + matches, err = fscopy.ResolveWildcards(root, p, true) + if err != nil { + return err + } + } else { + matches = []string{p} + } + for _, m := range matches { + fullPath, err := containerdfs.RootPath(root, m) + if err != nil { + return err + } + err = os.RemoveAll(fullPath) + if err != nil { + return err + } + } } + return nil + }) + if err != nil { + return nil, err } - err = dir.SetState(ctx, st.File(action)) + snap, err := newRef.Commit(ctx) if err != nil { return nil, err } - + dir.Result = snap return dir, nil } @@ -833,24 +985,11 @@ func (dir *Directory) Root() (*Directory, error) { func (dir *Directory) WithSymlink(ctx context.Context, srv *dagql.Server, target, linkName string) (*Directory, error) { dir = dir.Clone() - res, err := dir.Evaluate(ctx) - if err != nil { - return nil, err - } - - ref, err := res.SingleRef() + parentRef, err := getRefOrEvaluate(ctx, dir.Result, dir) if err != nil { return nil, err } - var parentRef bkcache.ImmutableRef - if ref != nil { - parentRef, err = ref.CacheRef(ctx) - if err != nil { - return nil, err - } - } - query, err := CurrentQuery(ctx) if err != nil { return nil, err diff --git a/core/integration/directory_test.go b/core/integration/directory_test.go index 9675c69cff..45cf316cfa 100644 --- a/core/integration/directory_test.go +++ b/core/integration/directory_test.go @@ -435,6 +435,42 @@ func (DirectorySuite) TestWithFile(ctx context.Context, t *testctx.T) { require.NoError(t, err) require.Contains(t, stdout2, "rw-r--r--") }) + + t.Run("dir reference is kept", func(ctx context.Context, t *testctx.T) { + f := c.Directory().WithNewFile("some-file", "data").File("some-file") + + d2 := c.Directory(). + WithNewFile("some-other-file", "other-data"). + WithNewDirectory("some-dir"). + Directory("/some-dir"). + WithFile("f", f) + + // this should no longer be available, since dir.Dir should now be "/dir1" + _, err := d2.File("some-other-file").Contents(ctx) + require.Error(t, err) + + s, err := d2.File("f").Contents(ctx) + require.NoError(t, err) + require.Equal(t, "data", s) + }) + + for _, dst := range []string{".", "", "/"} { + t.Run(fmt.Sprintf("src filename is used dst is a directory referenced by %s", dst), func(ctx context.Context, t *testctx.T) { + f := c.Directory().WithNewFile("some-file", "data").File("some-file") + d := c.Directory().WithFile(dst, f) + s, err := d.File("some-file").Contents(ctx) + require.NoError(t, err) + require.Equal(t, "data", s) + }) + } + + t.Run("src filename (and not directory names) is used dst is empty", func(ctx context.Context, t *testctx.T) { + f := c.Directory().WithNewFile("sub/subterrain/some-file", "data").File("sub/subterrain/some-file") + d := c.Directory().WithFile("", f) + s, err := d.File("some-file").Contents(ctx) + require.NoError(t, err) + require.Equal(t, "data", s) + }) } func (DirectorySuite) TestWithFiles(ctx context.Context, t *testctx.T) { diff --git a/core/schema/container.go b/core/schema/container.go index 596225b49b..989232981c 100644 --- a/core/schema/container.go +++ b/core/schema/container.go @@ -330,7 +330,7 @@ func (s *containerSchema) Install() { `environment variables defined in the container (e.g. "/$VAR/foo").`), ), - dagql.Func("withFile", s.withFile). + dagql.NodeFunc("withFile", DagOpContainerWrapper(s.srv, s.withFile, true)). Doc(`Return a container snapshot with a file added`). Args( dagql.Arg("path").Doc(`Path of the new file. Example: "/path/to/new-file.txt"`), @@ -343,7 +343,7 @@ func (s *containerSchema) Install() { `environment variables defined in the container (e.g. "/$VAR/foo.txt").`), ), - dagql.Func("withoutFile", s.withoutFile). + dagql.NodeFunc("withoutFile", DagOpContainerWrapper(s.srv, s.withoutFile, true)). Doc(`Retrieves this container with the file at the given path removed.`). Args( dagql.Arg("path").Doc(`Location of the file to remove (e.g., "/file.txt").`), @@ -351,7 +351,7 @@ func (s *containerSchema) Install() { `environment variables defined in the container (e.g. "/$VAR/foo.txt").`), ), - dagql.Func("withoutFiles", s.withoutFiles). + dagql.NodeFunc("withoutFiles", DagOpContainerWrapper(s.srv, s.withoutFiles, true)). Doc(`Return a new container spanshot with specified files removed`). Args( dagql.Arg("paths").Doc(`Paths of the files to remove. Example: ["foo.txt, "/root/.ssh/config"`), @@ -359,7 +359,7 @@ func (s *containerSchema) Install() { `environment variables defined in the container (e.g. "/$VAR/foo.txt").`), ), - dagql.Func("withFiles", s.withFiles). + dagql.NodeFunc("withFiles", DagOpContainerWrapper(s.srv, s.withFiles, true)). Doc(`Retrieves this container plus the contents of the given files copied to the given path.`). Args( dagql.Arg("path").Doc(`Location where copied files should be placed (e.g., "/src").`), @@ -413,7 +413,7 @@ func (s *containerSchema) Install() { `environment variables defined in the container (e.g. "/$VAR/foo").`), ), - dagql.Func("withoutDirectory", s.withoutDirectory). + dagql.NodeFunc("withoutDirectory", DagOpContainerWrapper(s.srv, s.withoutDirectory, true)). Doc(`Return a new container snapshot, with a directory removed from its filesystem`). Args( dagql.Arg("path").Doc(`Location of the directory to remove (e.g., ".github/").`), @@ -463,7 +463,7 @@ func (s *containerSchema) Install() { ), ), - dagql.NodeFunc("withSymlink", DagOpContainerWrapper(s.srv, s.withSymlink)). + dagql.NodeFunc("withSymlink", DagOpContainerWrapper(s.srv, s.withSymlink, true)). Doc(`Return a snapshot with a symlink`). Args( dagql.Arg("target").Doc(`Location of the file or directory to link to (e.g., "/existing/file").`), @@ -743,6 +743,7 @@ type containerFromArgs struct { } func (s *containerSchema) from(ctx context.Context, parent dagql.Instance[*core.Container], args containerFromArgs) (inst dagql.Instance[*core.Container], _ error) { + fmt.Printf("ACB from %v\n", args) query, err := core.CurrentQuery(ctx) if err != nil { return inst, err @@ -1609,90 +1610,121 @@ type containerWithFileArgs struct { WithFileArgs Owner string `default:""` Expand bool `default:"false"` + + FSDagOpInternalArgs } -func (s *containerSchema) withFile(ctx context.Context, parent *core.Container, args containerWithFileArgs) (*core.Container, error) { +func (s *containerSchema) withFile(ctx context.Context, parent dagql.Instance[*core.Container], args containerWithFileArgs) (inst dagql.Instance[*core.Container], err error) { file, err := args.Source.Load(ctx, s.srv) if err != nil { - return nil, err + return inst, err } - path, err := expandEnvVar(ctx, parent, args.Path, args.Expand) + path, err := expandEnvVar(ctx, parent.Self, args.Path, args.Expand) if err != nil { - return nil, err + return inst, err } - return parent.WithFile(ctx, path, file.Self, args.Permissions, args.Owner) + ctr, err := parent.Self.WithFile(ctx, s.srv, path, file.Self, args.Permissions, args.Owner) + if err != nil { + return inst, err + } + + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, ctr) } type containerWithFilesArgs struct { WithFilesArgs Owner string `default:""` Expand bool `default:"false"` + + FSDagOpInternalArgs } -func (s *containerSchema) withFiles(ctx context.Context, parent *core.Container, args containerWithFilesArgs) (*core.Container, error) { +func (s *containerSchema) withFiles(ctx context.Context, parent dagql.Instance[*core.Container], args containerWithFilesArgs) (inst dagql.Instance[*core.Container], err error) { files := []*core.File{} for _, id := range args.Sources { file, err := id.Load(ctx, s.srv) if err != nil { - return nil, err + return inst, err } files = append(files, file.Self) } - path, err := expandEnvVar(ctx, parent, args.Path, args.Expand) + path, err := expandEnvVar(ctx, parent.Self, args.Path, args.Expand) if err != nil { - return nil, err + return inst, err } - return parent.WithFiles(ctx, path, files, args.Permissions, args.Owner) + ctr, err := parent.Self.WithFiles(ctx, s.srv, path, files, args.Permissions, args.Owner) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, ctr) } type containerWithoutDirectoryArgs struct { Path string Expand bool `default:"false"` + + FSDagOpInternalArgs } -func (s *containerSchema) withoutDirectory(ctx context.Context, parent *core.Container, args containerWithoutDirectoryArgs) (*core.Container, error) { - path, err := expandEnvVar(ctx, parent, args.Path, args.Expand) +func (s *containerSchema) withoutDirectory(ctx context.Context, parent dagql.Instance[*core.Container], args containerWithoutDirectoryArgs) (inst dagql.Instance[*core.Container], err error) { + fmt.Printf("ACB withoutDirectory here2\n") + path, err := expandEnvVar(ctx, parent.Self, args.Path, args.Expand) if err != nil { - return nil, err + return inst, err } - return parent.WithoutPaths(ctx, path) + ctr, err := parent.Self.WithoutPaths(ctx, s.srv, path) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, ctr) } type containerWithoutFileArgs struct { Path string Expand bool `default:"false"` + + FSDagOpInternalArgs } -func (s *containerSchema) withoutFile(ctx context.Context, parent *core.Container, args containerWithoutFileArgs) (*core.Container, error) { - path, err := expandEnvVar(ctx, parent, args.Path, args.Expand) +func (s *containerSchema) withoutFile(ctx context.Context, parent dagql.Instance[*core.Container], args containerWithoutFileArgs) (inst dagql.Instance[*core.Container], err error) { + path, err := expandEnvVar(ctx, parent.Self, args.Path, args.Expand) if err != nil { - return nil, err + return inst, err } - return parent.WithoutPaths(ctx, path) + ctr, err := parent.Self.WithoutPaths(ctx, s.srv, path) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, ctr) } type containerWithoutFilesArgs struct { Paths []string Expand bool `default:"false"` + + FSDagOpInternalArgs } -func (s *containerSchema) withoutFiles(ctx context.Context, parent *core.Container, args containerWithoutFilesArgs) (*core.Container, error) { +func (s *containerSchema) withoutFiles(ctx context.Context, parent dagql.Instance[*core.Container], args containerWithoutFilesArgs) (inst dagql.Instance[*core.Container], err error) { paths := args.Paths - var err error for i, p := range args.Paths { - paths[i], err = expandEnvVar(ctx, parent, p, args.Expand) + paths[i], err = expandEnvVar(ctx, parent.Self, p, args.Expand) if err != nil { - return nil, err + return inst, err } } - return parent.WithoutPaths(ctx, paths...) + ctr, err := parent.Self.WithoutPaths(ctx, s.srv, paths...) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, ctr) } type containerWithNewFileArgs struct { diff --git a/core/schema/directory.go b/core/schema/directory.go index dd94a90f0e..30f5a0a78d 100644 --- a/core/schema/directory.go +++ b/core/schema/directory.go @@ -65,14 +65,14 @@ func (s *directorySchema) Install() { Args( dagql.Arg("path").Doc(`Location of the file to retrieve (e.g., "README.md").`), ), - dagql.Func("withFile", s.withFile). + dagql.NodeFunc("withFile", DagOpDirectoryWrapper(s.srv, s.withFile, keepParentDir)). Doc(`Retrieves this directory plus the contents of the given file copied to the given path.`). Args( dagql.Arg("path").Doc(`Location of the copied file (e.g., "/file.txt").`), dagql.Arg("source").Doc(`Identifier of the file to copy.`), dagql.Arg("permissions").Doc(`Permission given to the copied file (e.g., 0600).`), ), - dagql.Func("withFiles", s.withFiles). + dagql.NodeFunc("withFiles", DagOpDirectoryWrapper(s.srv, s.withFiles, keepParentDir)). Doc(`Retrieves this directory plus the contents of the given files copied to the given path.`). Args( dagql.Arg("path").Doc(`Location where copied files should be placed (e.g., "/src").`), @@ -86,12 +86,12 @@ func (s *directorySchema) Install() { dagql.Arg("contents").Doc(`Contents of the new file. Example: "Hello world!"`), dagql.Arg("permissions").Doc(`Permissions of the new file. Example: 0600`), ), - dagql.Func("withoutFile", s.withoutFile). + dagql.NodeFunc("withoutFile", DagOpDirectoryWrapper(s.srv, s.withoutFile, keepParentDir)). Doc(`Return a snapshot with a file removed`). Args( dagql.Arg("path").Doc(`Path of the file to remove (e.g., "/file.txt").`), ), - dagql.Func("withoutFiles", s.withoutFiles). + dagql.NodeFunc("withoutFiles", DagOpDirectoryWrapper(s.srv, s.withoutFiles, keepParentDir)). Doc(`Return a snapshot with files removed`). Args( dagql.Arg("paths").Doc(`Paths of the files to remove (e.g., ["/file.txt"]).`), @@ -121,7 +121,7 @@ func (s *directorySchema) Install() { dagql.Arg("path").Doc(`Location of the directory created (e.g., "/logs").`), dagql.Arg("permissions").Doc(`Permission granted to the created directory (e.g., 0777).`), ), - dagql.Func("withoutDirectory", s.withoutDirectory). + dagql.NodeFunc("withoutDirectory", DagOpDirectoryWrapper(s.srv, s.withoutDirectory, keepParentDir)). Doc(`Return a snapshot with a subdirectory removed`). Args( dagql.Arg("path").Doc(`Path of the subdirectory to remove. Example: ".github/workflows"`), @@ -181,7 +181,7 @@ func (s *directorySchema) Install() { guarantees when using this option. It should only be used when absolutely necessary and only with trusted commands.`), ), - dagql.NodeFunc("withSymlink", DagOpDirectoryWrapper(s.srv, s.withSymlink, s.withSymlinkPath)). + dagql.NodeFunc("withSymlink", DagOpDirectoryWrapper(s.srv, s.withSymlink, keepParentDir)). Doc(`Return a snapshot with a symlink`). Args( dagql.Arg("target").Doc(`Location of the file or directory to link to (e.g., "/existing/file").`), @@ -324,58 +324,93 @@ type WithFileArgs struct { Path string Source core.FileID Permissions *int + + FSDagOpInternalArgs } -func (s *directorySchema) withFile(ctx context.Context, parent *core.Directory, args WithFileArgs) (*core.Directory, error) { +func (s *directorySchema) withFile(ctx context.Context, parent dagql.Instance[*core.Directory], args WithFileArgs) (inst dagql.Instance[*core.Directory], err error) { file, err := args.Source.Load(ctx, s.srv) if err != nil { - return nil, err + return inst, err } - return parent.WithFile(ctx, args.Path, file.Self, args.Permissions, nil) + dir, err := parent.Self.WithFile(ctx, s.srv, args.Path, file.Self, args.Permissions, nil) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, dir) +} + +func keepParentDir[A any](_ context.Context, val dagql.Instance[*core.Directory], _ A) (string, error) { + return val.Self.Dir, nil } type WithFilesArgs struct { Path string Sources []core.FileID Permissions *int + + FSDagOpInternalArgs } -func (s *directorySchema) withFiles(ctx context.Context, parent *core.Directory, args WithFilesArgs) (*core.Directory, error) { +func (s *directorySchema) withFiles(ctx context.Context, parent dagql.Instance[*core.Directory], args WithFilesArgs) (inst dagql.Instance[*core.Directory], err error) { files := []*core.File{} for _, id := range args.Sources { file, err := id.Load(ctx, s.srv) if err != nil { - return nil, err + return inst, err } files = append(files, file.Self) } - return parent.WithFiles(ctx, args.Path, files, args.Permissions, nil) + dir, err := parent.Self.WithFiles(ctx, s.srv, args.Path, files, args.Permissions, nil) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, dir) } type withoutDirectoryArgs struct { Path string + + FSDagOpInternalArgs } -func (s *directorySchema) withoutDirectory(ctx context.Context, parent *core.Directory, args withoutDirectoryArgs) (*core.Directory, error) { - return parent.Without(ctx, args.Path) +func (s *directorySchema) withoutDirectory(ctx context.Context, parent dagql.Instance[*core.Directory], args withoutDirectoryArgs) (inst dagql.Instance[*core.Directory], err error) { + fmt.Printf("ACB withoutDirectory here1\n") + dir, err := parent.Self.Without(ctx, s.srv, args.Path) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, dir) } type withoutFileArgs struct { Path string + + FSDagOpInternalArgs } -func (s *directorySchema) withoutFile(ctx context.Context, parent *core.Directory, args withoutFileArgs) (*core.Directory, error) { - return parent.Without(ctx, args.Path) +func (s *directorySchema) withoutFile(ctx context.Context, parent dagql.Instance[*core.Directory], args withoutFileArgs) (inst dagql.Instance[*core.Directory], err error) { + dir, err := parent.Self.Without(ctx, s.srv, args.Path) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, dir) } type withoutFilesArgs struct { Paths []string + + FSDagOpInternalArgs } -func (s *directorySchema) withoutFiles(ctx context.Context, parent *core.Directory, args withoutFilesArgs) (*core.Directory, error) { - return parent.Without(ctx, args.Paths...) +func (s *directorySchema) withoutFiles(ctx context.Context, parent dagql.Instance[*core.Directory], args withoutFilesArgs) (inst dagql.Instance[*core.Directory], err error) { + dir, err := parent.Self.Without(ctx, s.srv, args.Paths...) + if err != nil { + return inst, err + } + return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, dir) } type diffArgs struct { @@ -594,7 +629,3 @@ func (s *directorySchema) withSymlink(ctx context.Context, parent dagql.Instance } return dagql.NewInstanceForCurrentID(ctx, s.srv, parent, dir) } - -func (s *directorySchema) withSymlinkPath(ctx context.Context, val dagql.Instance[*core.Directory], _ directoryWithSymlinkArgs) (string, error) { - return val.Self.Dir, nil -} diff --git a/core/schema/util.go b/core/schema/util.go index 629da335ff..4b536161b4 100644 --- a/core/schema/util.go +++ b/core/schema/util.go @@ -7,21 +7,16 @@ import ( "golang.org/x/mod/semver" + "github.com/dagger/dagger/core" "github.com/dagger/dagger/dagql" "github.com/dagger/dagger/dagql/introspection" - "github.com/dagger/dagger/engine/buildkit" ) type SchemaResolvers interface { Install() } -type Evaluatable interface { - dagql.Typed - Evaluate(context.Context) (*buildkit.Result, error) -} - -func Syncer[T Evaluatable]() dagql.Field[T] { +func Syncer[T core.Evaluatable]() dagql.Field[T] { return dagql.NodeFunc("sync", func(ctx context.Context, self dagql.Instance[T], _ struct{}) (dagql.ID[T], error) { _, err := self.Self.Evaluate(ctx) if err != nil { diff --git a/core/schema/wrapper.go b/core/schema/wrapper.go index 742d91e7c5..dd9697a781 100644 --- a/core/schema/wrapper.go +++ b/core/schema/wrapper.go @@ -176,9 +176,11 @@ func DagOpDirectory[T dagql.Typed, A any]( func DagOpContainerWrapper[A DagOpInternalArgsIface]( srv *dagql.Server, fn dagql.NodeFuncHandler[*core.Container, A, dagql.Instance[*core.Container]], + skipMeta bool, ) dagql.NodeFuncHandler[*core.Container, A, dagql.Instance[*core.Container]] { return func(ctx context.Context, self dagql.Instance[*core.Container], args A) (inst dagql.Instance[*core.Container], err error) { if args.InDagOp() { + fmt.Printf("ACB innie %v\n", fn) query, ok := srv.Root().(dagql.Instance[*core.Query]) if !ok { return inst, fmt.Errorf("server root was %T", srv.Root()) @@ -186,7 +188,8 @@ func DagOpContainerWrapper[A DagOpInternalArgsIface]( ctx = core.ContextWithQuery(ctx, query.Self) return fn(ctx, self, args) } - return DagOpContainer(ctx, srv, self, args, nil, fn) + fmt.Printf("ACB outie %v\n", fn) + return DagOpContainer(ctx, srv, self, args, nil, fn, skipMeta) } } @@ -197,13 +200,14 @@ func DagOpContainer[A any]( args A, data any, fn dagql.NodeFuncHandler[*core.Container, A, dagql.Instance[*core.Container]], + skipMeta bool, ) (inst dagql.Instance[*core.Container], _ error) { deps, err := extractLLBDependencies(ctx, self.Self) if err != nil { return inst, err } - ctr, err := core.NewContainerDagOp(ctx, currentIDForContainerDagOp(ctx), self.Self, deps) + ctr, err := core.NewContainerDagOp(ctx, currentIDForContainerDagOp(ctx), self.Self, deps, skipMeta) if err != nil { return inst, err } diff --git a/core/service.go b/core/service.go index 992eff15b7..096811ec55 100644 --- a/core/service.go +++ b/core/service.go @@ -267,6 +267,8 @@ func (svc *Service) startContainer( forwardStderr func(io.Reader), ) (running *RunningService, rerr error) { dig := id.Digest() + fmt.Printf("ACB startContainer start\n") + defer fmt.Printf("ACB startContainer end\n") slog := slog.With("service", dig.String(), "id", id.DisplaySelf()) diff --git a/core/util.go b/core/util.go index 6824b42a26..0705256452 100644 --- a/core/util.go +++ b/core/util.go @@ -25,6 +25,11 @@ import ( "github.com/dagger/dagger/engine/slog" ) +type Evaluatable interface { + dagql.Typed + Evaluate(context.Context) (*buildkit.Result, error) +} + type HasPBDefinitions interface { PBDefinitions(context.Context) ([]*pb.Definition, error) } diff --git a/dagql/server.go b/dagql/server.go index 99004b50db..dda2cd96f6 100644 --- a/dagql/server.go +++ b/dagql/server.go @@ -8,6 +8,7 @@ import ( "reflect" "runtime/debug" "sync" + "time" "github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql/errcode" @@ -516,6 +517,64 @@ func (s *Server) ExecOp(ctx context.Context, gqlOp *graphql.OperationContext) (m return results, nil } +type waitstatus struct { + mu *sync.Mutex + m map[string]int + done bool +} + +func NewWaitStatus() *waitstatus { + ws := &waitstatus{ + mu: &sync.Mutex{}, + m: map[string]int{}, + } + go ws.run() + return ws +} + +func (ws *waitstatus) run() { + stop := false + for !stop { + time.Sleep(time.Second * 30) + ws.mu.Lock() + for k, v := range ws.m { + fmt.Printf("ACB waiting on %s -> %d\n", k, v) + } + if ws.done { + stop = true + } + ws.mu.Unlock() + } +} + +func (ws *waitstatus) stop() { + ws.mu.Lock() + ws.done = true + ws.mu.Unlock() +} +func (ws *waitstatus) add(s string) { + ws.mu.Lock() + v, ok := ws.m[s] + if !ok { + v = 0 + } + ws.m[s] = v + 1 + ws.mu.Unlock() +} +func (ws *waitstatus) remove(s string) { + ws.mu.Lock() + v, ok := ws.m[s] + if ok { + v -= 1 + if v == 0 { + delete(ws.m, s) + } else { + ws.m[s] = v + } + } + ws.mu.Unlock() +} + // Resolve resolves the given selections on the given object. // // Each selection is resolved in parallel, and the results are returned in a @@ -523,14 +582,20 @@ func (s *Server) ExecOp(ctx context.Context, gqlOp *graphql.OperationContext) (m func (s *Server) Resolve(ctx context.Context, self Object, sels ...Selection) (map[string]any, error) { results := new(sync.Map) + ws := NewWaitStatus() + defer ws.stop() + pool := pool.New().WithErrors() for _, sel := range sels { + k := fmt.Sprintf("%s", sel) + ws.add(k) pool.Go(func() error { res, err := s.resolvePath(ctx, self, sel) if err != nil { return err } results.Store(sel.Name(), res) + ws.remove(k) return nil }) } diff --git a/engine/buildkit/op.go b/engine/buildkit/op.go index a2b6e2f459..55c8853ff3 100644 --- a/engine/buildkit/op.go +++ b/engine/buildkit/op.go @@ -122,6 +122,9 @@ func (op *CustomOpWrapper) Exec(ctx context.Context, g bksession.Group, inputs [ ctx = engine.ContextWithClientMetadata(ctx, &op.ClientMetadata) ctx = ctxWithBkSessionGroup(ctx, g) + fmt.Printf("ACB Exec start\n") + defer fmt.Printf("ACB Exec end\n") + server, err := op.server.DagqlServer(ctx) if err != nil { return nil, fmt.Errorf("could not find dagql server: %w", err)