fix(api): include repo type for run result unicity by richardlt · Pull Request #6184 · ovh/cds · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(api): include repo type for run result unicity #6184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (e *artifactoryReleasePlugin) Run(ctx context.Context, opts *integrationplu
return fail("unable to list run results: %v", err)
}

fmt.Printf("Found %d run results\n", len(runResult))

log.SetLogger(log.NewLogger(log.INFO, os.Stdout))
if distributionURL == "" {
fmt.Printf("Using %s to release\n", artifactoryURL)
Expand Down Expand Up @@ -137,8 +139,15 @@ func (e *artifactoryReleasePlugin) Run(ctx context.Context, opts *integrationplu
break
}
}
name, err := r.ComputeName()
if err != nil {
return fail("unable to read result %s: %v", r.ID, err)
}
if skip {
fmt.Printf("Result %q skipped\n", name)
continue
} else {
fmt.Printf("Result %q to promote\n", name)
}
switch rData.RepoType {
case "docker":
Expand Down
38 changes: 17 additions & 21 deletions engine/api/workflow/workflow_run_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,42 +354,38 @@ func insertResult(tx gorpmapper.SqlExecutorWithTx, runResult *sdk.WorkflowRunRes
return nil
}

func getAll(ctx context.Context, db gorp.SqlExecutor, query gorpmapping.Query) ([]sdk.WorkflowRunResult, error) {
func getAll(ctx context.Context, db gorp.SqlExecutor, query gorpmapping.Query) (sdk.WorkflowRunResults, error) {
var dbResults []dbRunResult
if err := gorpmapping.GetAll(ctx, db, query, &dbResults); err != nil {
return nil, err
}
results := make([]sdk.WorkflowRunResult, 0, len(dbResults))
results := make(sdk.WorkflowRunResults, 0, len(dbResults))
for _, r := range dbResults {
results = append(results, sdk.WorkflowRunResult(r))
}
return results, nil
}

func LoadRunResultsByRunID(ctx context.Context, db gorp.SqlExecutor, runID int64) ([]sdk.WorkflowRunResult, error) {
dbQuery := `
WITH allResults AS (
SELECT data->>'name' AS name, sub_num, id
FROM workflow_run_result
WHERE workflow_run_id = $1
),
deduplication AS (
SELECT distinct on (name) *
FROM allResults
ORDER BY name, sub_num DESC
)
SELECT * FROM workflow_run_result WHERE id IN (SELECT id FROM deduplication);`
query := gorpmapping.NewQuery(dbQuery).Args(runID)
// LoadRunResultsByRunID returns every result stored for the given workflow
// run, without any deduplication, ordered by descending sub number.
func LoadRunResultsByRunID(ctx context.Context, db gorp.SqlExecutor, runID int64) (sdk.WorkflowRunResults, error) {
	const stmt = "SELECT * FROM workflow_run_result WHERE workflow_run_id = $1 ORDER BY sub_num DESC"
	return getAll(ctx, db, gorpmapping.NewQuery(stmt).Args(runID))
}

func LoadRunResultsByNodeRunID(ctx context.Context, db gorp.SqlExecutor, nodeRunID int64) ([]sdk.WorkflowRunResult, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result where workflow_node_run_id = $1").Args(nodeRunID)
return getAll(ctx, db, query)
// LoadRunResultsByRunIDUnique loads the results of the given workflow run,
// ordered by descending sub number, and reduces them with
// sdk.WorkflowRunResults.Unique before returning them.
func LoadRunResultsByRunIDUnique(ctx context.Context, db gorp.SqlExecutor, runID int64) (sdk.WorkflowRunResults, error) {
	const stmt = "SELECT * FROM workflow_run_result WHERE workflow_run_id = $1 ORDER BY sub_num DESC"
	all, err := getAll(ctx, db, gorpmapping.NewQuery(stmt).Args(runID))
	if err != nil {
		return nil, err
	}
	return all.Unique()
}

func LoadRunResultsByRunIDAndType(ctx context.Context, db gorp.SqlExecutor, runID int64, t sdk.WorkflowRunResultType) ([]sdk.WorkflowRunResult, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result where workflow_run_id = $1 AND type = $2").Args(runID, t)
// LoadRunResultsByNodeRunID returns the results attached to the given
// workflow node run. No particular ordering is requested from the database.
func LoadRunResultsByNodeRunID(ctx context.Context, db gorp.SqlExecutor, nodeRunID int64) (sdk.WorkflowRunResults, error) {
	const stmt = "SELECT * FROM workflow_run_result WHERE workflow_node_run_id = $1"
	return getAll(ctx, db, gorpmapping.NewQuery(stmt).Args(nodeRunID))
}

// LoadRunResultsByRunIDAndType returns the results of the given workflow run
// whose type equals t. No particular ordering is requested from the database.
func LoadRunResultsByRunIDAndType(ctx context.Context, db gorp.SqlExecutor, runID int64, t sdk.WorkflowRunResultType) (sdk.WorkflowRunResults, error) {
	const stmt = "SELECT * FROM workflow_run_result WHERE workflow_run_id = $1 AND type = $2"
	return getAll(ctx, db, gorpmapping.NewQuery(stmt).Args(runID, t))
}
20 changes: 15 additions & 5 deletions engine/api/workflow/workflow_run_results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ func TestCanUploadArtifactAlreadyExist(t *testing.T) {
Type: sdk.WorkflowRunResultTypeArtifact,
}
artiData := sdk.WorkflowRunResultArtifact{
Name: "myartifact",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myartifact",
},
CDNRefHash: "123",
MD5: "123",
Size: 1,
Expand Down Expand Up @@ -171,7 +173,9 @@ func TestCanUploadArtifactAlreadyExistInMoreRecentSubNum(t *testing.T) {
Type: sdk.WorkflowRunResultTypeArtifact,
}
artiData := sdk.WorkflowRunResultArtifact{
Name: "myartifact",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myartifact",
},
CDNRefHash: "123",
MD5: "123",
Size: 1,
Expand Down Expand Up @@ -230,7 +234,9 @@ func TestCanUploadArtifactAlreadyExistInAPreviousSubNum(t *testing.T) {
Type: sdk.WorkflowRunResultTypeArtifact,
}
artiData := sdk.WorkflowRunResultArtifact{
Name: "myartifact",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myartifact",
},
CDNRefHash: "123",
MD5: "123",
Size: 1,
Expand Down Expand Up @@ -275,7 +281,9 @@ func TestCanUploadStaticFile(t *testing.T) {
Type: sdk.WorkflowRunResultTypeStaticFile,
}
artiData := sdk.WorkflowRunResultStaticFile{
Name: "my title static file",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "my title static file",
},
RemoteURL: "https://foo/bar",
}
bts, err := json.Marshal(artiData)
Expand Down Expand Up @@ -309,7 +317,9 @@ func TestCanUploadStaticFileInvalid(t *testing.T) {
Type: sdk.WorkflowRunResultTypeStaticFile,
}
artiData := sdk.WorkflowRunResultStaticFile{
Name: "my title static file",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "my title static file",
},
RemoteURL: "",
}
bts, err := json.Marshal(artiData)
Expand Down
4 changes: 3 additions & 1 deletion engine/api/workflow_purge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,10 @@ func Test_Purge_DeleteArtifactsFromRepositoryManager(t *testing.T) {
require.NoError(t, err)

data := sdk.WorkflowRunResultArtifactManager{
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "foo",
},
Path: "path/to/foo",
Name: "foo",
RepoName: "repository",
}
rawData, _ := json.Marshal(data)
Expand Down
4 changes: 3 additions & 1 deletion engine/api/workflow_queue_test.go
F438
Original file line number Diff line number Diff line change
Expand Up @@ -1556,10 +1556,12 @@ func Test_workflowRunResultsAdd(t *testing.T) {
}

artiData := sdk.WorkflowRunResultArtifact{
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myartifact",
},
Size: 1,
MD5: "AA",
CDNRefHash: "AA",
Name: "myartifact",
Perm: 0777,
}
bts, err := json.Marshal(artiData)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ func (api *API) getWorkflowRunResultsHandler() service.Handler {
return sdk.WrapError(err, "unable to load workflow run for workflow %s and number %d", name, number)
}

results, err := workflow.LoadRunResultsByRunID(ctx, api.mustDB(), wr.ID)
results, err := workflow.LoadRunResultsByRunIDUnique(ctx, api.mustDB(), wr.ID)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion engine/api/workflow_run_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func Test_getWorkflowRunAndNodeRunResults(t *testing.T) {
require.NoError(t, err)

artiData := sdk.WorkflowRunResultArtifact{
Name: "myarti",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myarti",
},
CDNRefHash: "123",
MD5: "123",
Size: 1,
Expand Down
8 changes: 6 additions & 2 deletions engine/cdn/cdn_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,19 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
switch runResultApiRef.RunResultType {
case sdk.WorkflowRunResultTypeArtifact:
result = sdk.WorkflowRunResultArtifact{
Name: apiRef.ToFilename(),
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: apiRef.ToFilename(),
},
Size: it.Size,
MD5: it.MD5,
CDNRefHash: it.APIRefHash,
Perm: runResultApiRef.Perm,
}
case sdk.WorkflowRunResultTypeCoverage:
result = sdk.WorkflowRunResultCoverage{
Name: apiRef.ToFilename(),
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: apiRef.ToFilename(),
},
Size: it.Size,
MD5: it.MD5,
CDNRefHash: it.APIRefHash,
Expand Down
8 changes: 6 additions & 2 deletions engine/worker/cmd_run_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func addStaticFileRunResultCmd() func(cmd *cobra.Command, args []string) {
}

payload := sdk.WorkflowRunResultStaticFile{
Name: name,
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: name,
},
RemoteURL: remoteURL.String(),
}
data, _ := json.Marshal(payload)
Expand Down Expand Up @@ -109,7 +111,9 @@ func addArtifactManagerRunResultCmd() func(cmd *cobra.Command, args []string) {
}

payload := sdk.WorkflowRunResultArtifactManager{
Name: fileName,
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: fileName,
},
Perm: 0,
Path: filePath,
RepoName: repositoryName,
Expand Down
4 changes: 3 additions & 1 deletion engine/worker/internal/action/builtin_artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ func addWorkflowRunResult(ctx context.Context, wk workerruntime.Runtime, filePat
}

data := sdk.WorkflowRunResultArtifactManager{
Name: uploadResult.Outputs[sdk.ArtifactUploadPluginOutputPathFileName],
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: uploadResult.Outputs[sdk.ArtifactUploadPluginOutputPathFileName],
},
Perm: uint32(perm),
RepoName: uploadResult.Outputs[sdk.ArtifactUploadPluginOutputPathRepoName],
Path: uploadResult.Outputs[sdk.ArtifactUploadPluginOutputPathFilePath],
Expand Down
8 changes: 6 additions & 2 deletions engine/worker/internal/handler_run_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func Test_addRunResultStaticFileHandler(t *testing.T) {
).Times(1)

v := sdk.WorkflowRunResultStaticFile{
Name: "foo",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "foo",
},
RemoteURL: "http://locat.local/static/foo.html",
}
buf, err := json.Marshal(v)
Expand Down Expand Up @@ -107,7 +109,9 @@ func Test_addRunResultArtifactManagerHandler(t *testing.T) {
).Times(1)

v := sdk.WorkflowRunResultArtifactManager{
Name: "foo",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "foo",
},
RepoName: "my-repo",
}
buf, err := json.Marshal(v)
Expand Down
73 changes: 68 additions & 5 deletions sdk/workflow_run_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sdk

import (
"encoding/json"
"fmt"
"sort"
"time"
)

Expand All @@ -15,6 +17,28 @@ const (
// WorkflowRunResultType is the string identifier of a kind of workflow run
// result; it is the value switched on when decoding a result's raw data.
type WorkflowRunResultType string
// WorkflowRunResultDataKey is a string type for run result data keys.
// NOTE(review): its usages are not visible in this excerpt - confirm intent.
type WorkflowRunResultDataKey string

// WorkflowRunResults is a list of workflow run results.
type WorkflowRunResults []WorkflowRunResult

// Unique returns the latest version of each result.
//
// Results are grouped by the key returned by ComputeUniqueKey and, within a
// group, the entry with the highest SubNum is kept (the first one encountered
// wins when SubNum values are equal). The returned slice is sorted by
// ascending creation date; entries sharing the same creation date are ordered
// by the first appearance of their key in w, so the output is deterministic
// for a given input. The first error returned by ComputeUniqueKey aborts the
// computation and is returned as is.
func (w WorkflowRunResults) Unique() (WorkflowRunResults, error) {
	// Index in w of the entry currently retained for each unique key.
	kept := make(map[string]int, len(w))
	// Keys in order of first appearance: collecting the output from this
	// slice rather than from the map keeps the result independent from Go's
	// randomized map iteration order.
	keys := make([]string, 0, len(w))
	for i := range w {
		key, err := w[i].ComputeUniqueKey()
		if err != nil {
			return nil, err
		}
		j, seen := kept[key]
		if !seen {
			keys = append(keys, key)
			kept[key] = i
			continue
		}
		if w[j].SubNum < w[i].SubNum {
			kept[key] = i
		}
	}
	filtered := make(WorkflowRunResults, 0, len(keys))
	for _, key := range keys {
		filtered = append(filtered, w[kept[key]])
	}
	// A stable sort preserves the deterministic collection order for results
	// that have identical creation dates.
	sort.SliceStable(filtered, func(i, j int) bool { return filtered[i].Created.Before(filtered[j].Created) })
	return filtered, nil
}

type WorkflowRunResult struct {
ID string `json:"id" db:"id"`
Created time.Time `json:"created" db:"created"`
Expand All @@ -26,12 +50,47 @@ type WorkflowRunResult struct {
DataRaw json.RawMessage `json:"data" db:"data"`
}

// ComputeUniqueKey builds the identifier used to recognise several versions
// of the same result. The key always starts with "<workflow run id>-<type>"
// followed by the name decoded from DataRaw; for artifact manager results the
// repository type is appended as well, so an identical name stored in
// repositories of different types yields distinct keys. An error is returned
// when DataRaw cannot be decoded.
func (r WorkflowRunResult) ComputeUniqueKey() (string, error) {
	prefix := fmt.Sprintf("%d-%s", r.WorkflowRunID, r.Type)

	if r.Type == WorkflowRunResultTypeArtifactManager {
		var am WorkflowRunResultArtifactManager
		if err := json.Unmarshal(r.DataRaw, &am); err != nil {
			return "", WithStack(err)
		}
		return prefix + "-" + am.Name + "-" + am.RepoType, nil
	}

	// Every other result type only contributes its name to the key.
	var common WorkflowRunResultArtifactCommon
	if err := json.Unmarshal(r.DataRaw, &common); err != nil {
		return "", WithStack(err)
	}
	return prefix + "-" + common.Name, nil
}

// ComputeName returns a printable label for the result, built from the name
// decoded from DataRaw: "<name> (<type>)" in the general case and
// "<name> (<type>: <repo type>)" for artifact manager results. An error is
// returned when DataRaw cannot be decoded.
func (r WorkflowRunResult) ComputeName() (string, error) {
	if r.Type != WorkflowRunResultTypeArtifactManager {
		var common WorkflowRunResultArtifactCommon
		if err := json.Unmarshal(r.DataRaw, &common); err != nil {
			return "", WithStack(err)
		}
		return fmt.Sprintf("%s (%s)", common.Name, r.Type), nil
	}

	// Artifact manager results also expose the repository type in the label.
	var am WorkflowRunResultArtifactManager
	if err := json.Unmarshal(r.DataRaw, &am); err != nil {
		return "", WithStack(err)
	}
	return fmt.Sprintf("%s (%s: %s)", am.Name, r.Type, am.RepoType), nil
}

// GetArtifact decodes the raw data of the result into an artifact payload.
// When decoding fails, the error is passed through WithStack and returned
// together with whatever value the decoder left in the payload.
func (r *WorkflowRunResult) GetArtifact() (WorkflowRunResultArtifact, error) {
	var artifact WorkflowRunResultArtifact
	err := JSONUnmarshal(r.DataRaw, &artifact)
	if err != nil {
		return artifact, WithStack(err)
	}
	return artifact, nil
}

Expand Down Expand Up @@ -70,8 +129,12 @@ type WorkflowRunResultCheck struct {
ResultType WorkflowRunResultType `json:"result_type"`
}

// WorkflowRunResultArtifactCommon holds the field shared by the run result
// data payloads. It is embedded in the typed payload structs and is also
// decoded on its own when only the name of a result is needed.
type WorkflowRunResultArtifactCommon struct {
// Name is the name of the result, read from the "name" JSON property.
Name string `json:"name"`
}

type WorkflowRunResultArtifactManager struct {
Name string `json:"name"`
WorkflowRunResultArtifactCommon
Size int64 `json:"size"`
MD5 string `json:"md5"`
Path string `json:"path"`
Expand All @@ -98,7 +161,7 @@ func (a *WorkflowRunResultArtifactManager) IsValid() error {
}

type WorkflowRunResultStaticFile struct {
Name string `json:"name"`
WorkflowRunResultArtifactCommon
RemoteURL string `json:"remote_url"`
}

Expand All @@ -113,7 +176,7 @@ func (a *WorkflowRunResultStaticFile) IsValid() error {
}

type WorkflowRunResultArtifact struct {
Name string `json:"name"`
WorkflowRunResultArtifactCommon
Size int64 `json:"size"`
MD5 string `json:"md5"`
CDNRefHash string `json:"cdn_hash"`
Expand All @@ -138,7 +201,7 @@ func (a *WorkflowRunResultArtifact) IsValid() error {
}

type WorkflowRunResultCoverage struct {
Name string `json:"name"`
WorkflowRunResultArtifactCommon
Size int64 `json:"size"`
MD5 string `json:"md5"`
CDNRefHash string `json:"cdn_hash"`
Expand Down
Loading
0