Limit recorded output to 32MB in size by bgentry · Pull Request #782 · riverqueue/river · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Limit recorded output to 32MB in size #782

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files. Retry
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- The `river.RecordOutput` function now returns an error if the output is too large. The output is limited to 32MB in size. [PR #782](https://github.com/riverqueue/river/pull/782).

## [0.18.0] - 2025-02-20

⚠️ Version 0.18.0 has breaking changes for the `rivertest.Worker` type that was just introduced. While attempting to round out some edge cases with its design, we realized some of them simply couldn't be solved adequately without changing the overall design such that all tested jobs are inserted into the database. Given the short duration since it was released (over a weekend) it's unlikely many users have adopted it and it seemed best to rip off the bandaid to fix it before it gets widely used.
Expand Down
12 changes: 11 additions & 1 deletion recorded_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/riverqueue/river/internal/jobexecutor"
"github.com/riverqueue/river/rivertype"
)

const maxOutputSize = 32 * 1024 * 1024 // 32MB

// RecordOutput records output JSON from a job. The "output" can be any
// JSON-encodable value and will be stored in the database on the job row after
// the current execution attempt completes. Output may be useful for debugging,
Expand All @@ -26,7 +29,8 @@ import (
// ([github.com/riverqueue/river/rivertype.MetadataKeyOutput]).
// This function must be called within a Worker's Work function. It returns an
// error if called anywhere else. As with any stored value, care should be taken
// to ensure that the payload size is not too large.
// to ensure that the payload size is not too large. Output is limited to 32MB
// in size for safety, but should be kept much smaller than this.
//
// Only one output can be stored per job. If this function is called more than
// once, the output will be overwritten with the latest value. The output also
Expand All @@ -49,6 +53,12 @@ func RecordOutput(ctx context.Context, output any) error {
return err
}

// Postgres JSONB is limited to 255MB, but it would be a bad idea to get
// anywhere close to that limit here.
if len(metadataUpdatesBytes) > maxOutputSize {
return fmt.Errorf("output is too large: %d bytes (max 32 MB)", len(metadataUpdatesBytes))
}

metadataUpdates[rivertype.MetadataKeyOutput] = json.RawMessage(metadataUpdatesBytes)
return nil
}
51 changes: 41 additions & 10 deletions recorded_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package river
import (
"context"
"encoding/json"
"strings"
"testing"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -44,15 +45,15 @@ func Test_RecordedOutput(t *testing.T) {

client, _ := setup(t)

subChan := subscribe(t, client)
startClient(ctx, t, client)

validOutput := myOutput{Message: "it worked"}
expectedOutput := `{"output":{"message":"it worked"}}`
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return RecordOutput(ctx, validOutput)
}))

subChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

Expand All @@ -70,16 +71,15 @@ func Test_RecordedOutput(t *testing.T) {

client, _ := setup(t)

// Subscribe to job failure events
subChan := subscribe(t, client)
startClient(ctx, t, client)

// Use an invalid output value (a channel, which cannot be marshaled to JSON)
var invalidOutput chan int
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return RecordOutput(ctx, invalidOutput)
}))

subChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

Expand All @@ -102,14 +102,14 @@ func Test_RecordedOutput(t *testing.T) {

client, _ := setup(t)

subChan := subscribe(t, client)
startClient(ctx, t, client)

newOutput := myOutput{Message: "new output"}
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return RecordOutput(ctx, newOutput)
}))

subChan := subscribe(t, client)
startClient(ctx, t, client)

// Insert a job with pre-existing metadata (including an output key)
initialMeta := `{"existing":"value","output":"old"}`
insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{Metadata: []byte(initialMeta)})
Expand All @@ -125,4 +125,35 @@ func Test_RecordedOutput(t *testing.T) {
require.NoError(t, err)
require.JSONEq(t, expectedMeta, string(jobFromDB.Metadata))
})

// Verifies that RecordOutput rejects payloads above the 32MB limit and
// that nothing is written to the job's metadata when it does.
t.Run("OutputTooLarge", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
// Record an output of 32MB + 1 byte:
err := RecordOutput(ctx, strings.Repeat("x", 32*1024*1024+1))
require.ErrorContains(t, err, "output is too large")
// Return the error so the job attempt is marked as failed and the
// error surfaces on the job row / failure event below.
return err
}))

subChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

// Wait for the job-failed event and confirm the size error propagated.
event := riversharedtest.WaitOrTimeout(t, subChan)
require.Equal(t, EventKindJobFailed, event.Kind)
require.NotEmpty(t, event.Job.Errors)
require.Contains(t, event.Job.Errors[0].Error, "output is too large")

// Confirm the oversized output was never persisted: the job's metadata
// must not contain an "output" key.
jobFromDB, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)
var meta map[string]any
require.NoError(t, json.Unmarshal(jobFromDB.Metadata, &meta))
_, ok := meta["output"]
require.False(t, ok, "output key should not be set in metadata")
})
}
7 changes: 4 additions & 3 deletions rivertest/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
// Worker makes it easier to test river workers. Once built, the worker can be
// used to insert and work any number of jobs:
//
// worker := rivertest.NewWorker(t, driver, config, worker)
// result, err := worker.Work(ctx, t, tx, args, nil)
// testWorker := rivertest.NewWorker(t, driver, config, worker)
// result, err := testWorker.Work(ctx, t, tx, args, nil)
// if err != nil {
// t.Fatalf("failed to work job: %s", err)
// }
Expand All @@ -35,7 +35,8 @@ import (
//
// An existing job (inserted using external logic) can also be worked:
//
// job := worker.insertJob(ctx, t, tx, args, nil)
// job := client.InsertTx(ctx, tx, args, nil)
// // ...
// result, err := worker.WorkJob(ctx, t, tx, job)
// if err != nil {
// t.Fatalf("failed to work job: %s", err)
Expand Down
Loading
0