8000 Add new `HookWorkEnd` interface that runs after workers finish by brandur · Pull Request #863 · riverqueue/river · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
8000

Add new HookWorkEnd interface that runs after workers finish #863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `river/riverlog` containing middleware that injects a context logger to workers that collates log output and persists it with job metadata. This is paired with a River UI enhancement that shows logs in the UI. [PR #844](https://github.com/riverqueue/river/pull/844).
- Added `JobInsertMiddlewareFunc` and `WorkerMiddlewareFunc` to easily implement middleware with a function instead of a struct. [PR #844](https://github.com/riverqueue/river/pull/844).
- Added `Config.Schema` which lets a non-default schema be injected explicitly into a River client that'll be used for all database operations. This may be particularly useful for proxies like PgBouncer that may not respect a schema configured in `search_path`. [PR #848](https://github.com/riverqueue/river/pull/848).
- Added `rivertype.HookWorkEnd` hook interface that runs after a job has been worked. [PR #863](https://github.com/riverqueue/river/pull/863).

### Changed

Expand Down
143 changes: 121 additions & 22 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,33 +741,110 @@ func Test_Client(t *testing.T) {
require.True(t, workBeginHookCalled)
})

t.Run("WithGlobalWorkEndHook", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

workEndHookCalled := false

bundle.config.Hooks = []rivertype.Hook{
HookWorkEndFunc(func(ctx context.Context, err error) error {
workEndHookCalled = true
return err
}),
}

AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[callbackArgs]) error {
return nil
}))

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, callbackArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

require.True(t, workEndHookCalled)
})

t.Run("WithInsertBeginHookOnJobArgs", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[jobArgsWithCustomHook]) error {
type JobArgs struct {
JobArgsReflectKind[JobArgs]
hookEmbed[metadataHookInsertBegin]
}

AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

insertRes, err := client.Insert(ctx, jobArgsWithCustomHook{}, nil)
insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

var metadataMap map[string]any
err = json.Unmarshal(insertRes.Job.Metadata, &metadataMap)
require.NoError(t, err)
require.Equal(t, "called", metadataMap["insert_begin_hook"])
require.Equal(t, metadataHookCalled, metadataMap[metadataHookInsertBeginKey])
})

t.Run("WithWorkBeginHookOnJobArgs", func(t *testing.T) { //nolint:dupl
t.Parallel()

_, bundle := setup(t)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
hookEmbed[metadataHookWorkBegin]
}

AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadataMap map[string]any
err = json.Unmarshal(event.Job.Metadata, &metadataMap)
require.NoError(t, err)
require.Equal(t, metadataHookCalled, metadataMap[metadataHookWorkBeginKey])
})

t.Run("WithWorkBeginHookOnJobArgs", func(t *testing.T) {
t.Run("WithWorkEndHookOnJobArgs", func(t *testing.T) { //nolint:dupl
t.Parallel()

_, bundle := setup(t)

AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[jobArgsWithCustomHook]) error {
type JobArgs struct {
JobArgsReflectKind[JobArgs]
hookEmbed[metadataHookWorkEnd]
}

AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

Expand All @@ -777,7 +854,7 @@ func Test_Client(t *testing.T) {
subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, jobArgsWithCustomHook{}, nil)
insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
Expand All @@ -787,7 +864,7 @@ func Test_Client(t *testing.T) {
var metadataMap map[string]any
err = json.Unmarshal(event.Job.Metadata, &metadataMap)
require.NoError(t, err)
require.Equal(t, "called", metadataMap["work_begin_hook"])
require.Equal(t, metadataHookCalled, metadataMap[metadataHookWorkEndKey])
})

t.Run("WithGlobalWorkerMiddleware", func(t *testing.T) {
Expand Down Expand Up @@ -1167,30 +1244,31 @@ func Test_Client(t *testing.T) {
})
}

type jobArgsWithCustomHook struct{}

func (jobArgsWithCustomHook) Kind() string { return "with_custom_hook" }
// hookEmbed can be embedded on a JobArgs to add a hook to it in such a way that
// it can be encapsulated within a test case.
type hookEmbed[T rivertype.Hook] struct{}

func (jobArgsWithCustomHook) Hooks() []rivertype.Hook {
return []rivertype.Hook{
&testHookInsertAndWorkBegin{},
}
func (f hookEmbed[T]) Hooks() []rivertype.Hook {
var hook T
return []rivertype.Hook{hook}
}

var (
_ rivertype.HookInsertBegin = &testHookInsertAndWorkBegin{}
_ rivertype.HookWorkBegin = &testHookInsertAndWorkBegin{}
const (
metadataHookCalled = "called"
metadataHookInsertBeginKey = "insert_begin"
metadataHookWorkBeginKey = "work_begin"
metadataHookWorkEndKey = "work_end"
)

type testHookInsertAndWorkBegin struct{ HookDefaults }
type metadataHookInsertBegin struct{ rivertype.Hook }

func (t *testHookInsertAndWorkBegin) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error {
func (metadataHookInsertBegin) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error {
var metadataMap map[string]any
if err := json.Unmarshal(params.Metadata, &metadataMap); err != nil {
return err
}

metadataMap["insert_begin_hook"] = "called"
metadataMap[metadataHookInsertBeginKey] = metadataHookCalled

var err error
params.Metadata, err = json.Marshal(metadataMap)
Expand All @@ -1201,17 +1279,38 @@ func (t *testHookInsertAndWorkBegin) InsertBegin(ctx context.Context, params *ri
return nil
}

func (t *testHookInsertAndWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
type metadataHookWorkBegin struct{ rivertype.Hook }

func (metadataHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
if !hasMetadataUpdates {
panic("expected to be called from within job executor")
}

metadataUpdates["work_begin_hook"] = "called"
metadataUpdates[metadataHookWorkBeginKey] = metadataHookCalled

return nil
}

type metadataHookWorkEnd struct{ rivertype.Hook }

func (metadataHookWorkEnd) WorkEnd(ctx context.Context, err error) error {
metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
if !hasMetadataUpdates {
panic("expected to be called from within job executor")
}

metadataUpdates[metadataHookWorkEndKey] = metadataHookCalled

return err
}

var (
_ rivertype.HookInsertBegin = metadataHookInsertBegin{}
_ rivertype.HookWorkBegin = metadataHookWorkBegin{}
_ rivertype.HookWorkEnd = metadataHookWorkEnd{}
)

type workerWithMiddleware[T JobArgs] struct {
WorkerDefaults[T]
workFunc func(context.Context, *Job[T]) error
Expand Down
10 changes: 10 additions & 0 deletions hook_defaults_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ func (f HookWorkBeginFunc) WorkBegin(ctx context.Context, job *rivertype.JobRow)
}

func (f HookWorkBeginFunc) IsHook() bool { return true }

// HookWorkEndFunc is a convenience helper for implementing
// rivertype.HookworkEnd using a simple function instead of a struct.
type HookWorkEndFunc func(ctx context.Context, err error) error

func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error {
return f(ctx, err)
}

func (f HookWorkEndFunc) IsHook() bool { return true }
7 changes: 7 additions & 0 deletions internal/hooklookup/hook_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type HookKind string
const (
HookKindInsertBegin HookKind = "insert_begin"
HookKindWorkBegin HookKind = "work_begin"
HookKindWorkEnd HookKind = "work_end"
)

//
Expand Down Expand Up @@ -88,6 +89,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook {
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
}
}
case HookKindWorkEnd:
for _, hook := range c.hooks {
if typedHook, ok := hook.(rivertype.HookWorkEnd); ok {
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
}
}
}

return c.hooksByKind[kind]
Expand Down
30 changes: 29 additions & 1 deletion internal/hooklookup/hook_lookup_test.go
10000
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestHookLookup(t *testing.T) {
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
&testHookWorkBegin{},
&testHookWorkEnd{},
}).(*hookLookup), &testBundle{}
}

Expand All @@ -38,8 +39,11 @@ func TestHookLookup(t *testing.T) {
&testHookInsertAndWorkBegin{},
&testHookWorkBegin{},
}, hookLookup.ByHookKind(HookKindWorkBegin))
require.Equal(t, []rivertype.Hook{
&testHookWorkEnd{},
}, hookLookup.ByHookKind(HookKindWorkEnd))

require.Len(t, hookLookup.hooksByKind, 2)
require.Len(t, hookLookup.hooksByKind, 3)

// Repeat lookups to make sure we get the same result.
require.Equal(t, []rivertype.Hook{
Expand All @@ -50,6 +54,9 @@ func TestHookLookup(t *testing.T) {
&testHookInsertAndWorkBegin{},
&testHookWorkBegin{},
}, hookLookup.ByHookKind(HookKindWorkBegin))
require.Equal(t, []rivertype.Hook{
&testHookWorkEnd{},
}, hookLookup.ByHookKind(HookKindWorkEnd))
})

t.Run("Stress", func(t *testing.T) {
Expand Down Expand Up @@ -118,6 +125,7 @@ func TestJobHookLookup(t *testing.T) {

require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindInsertBegin))
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkBegin))
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkEnd))
require.Equal(t, []rivertype.Hook{
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
Expand All @@ -126,12 +134,16 @@ func TestJobHookLookup(t *testing.T) {
&testHookInsertAndWorkBegin{},
&testHookWorkBegin{},
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkBegin))
require.Equal(t, []rivertype.Hook{
&testHookWorkEnd{},
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkEnd))

require.Len(t, jobHookLookup.hookLookupByKind, 2)

// Repeat lookups to make sure we get the same result.
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindInsertBegin))
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkBegin))
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkEnd))
require.Equal(t, []rivertype.Hook{
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
Expand All @@ -140,6 +152,9 @@ func TestJobHookLookup(t *testing.T) {
&testHookInsertAndWorkBegin{},
&testHookWorkBegin{},
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkBegin))
require.Equal(t, []rivertype.Hook{
&testHookWorkEnd{},
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkEnd))
})

t.Run("Stress", func(t *testing.T) {
Expand Down Expand Up @@ -195,6 +210,7 @@ func (jobArgsWithCustomHooks) Hooks() []rivertype.Hook {
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
&testHookWorkBegin{},
&testHookWorkEnd{},
}
}

Expand Down Expand Up @@ -242,3 +258,15 @@ type testHookWorkBegin struct{ rivertype.Hook }
func (t *testHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
return nil
}

//
// testHookWorkEnd
//

var _ rivertype.HookWorkEnd = &testHookWorkEnd{}

type testHookWorkEnd struct{ rivertype.Hook }

func (t *testHookWorkEnd) WorkEnd(ctx context.Context, err error) error {
return nil
}
13 changes: 12 additions & 1 deletion internal/jobexecutor/job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,18 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout)
defer cancel()

return e.WorkUnit.Work(ctx)
err := e.WorkUnit.Work(ctx)

{
for _, hook := range append(
e.HookLookupGlobal.ByHookKind(hooklookup.HookKindWorkEnd),
e.WorkUnit.HookLookup(e.HookLookupByJob).ByHookKind(hooklookup.HookKindWorkEnd)...,
) {
err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err) //nolint:forcetypeassert
}
}

return err
})

executeFunc := execution.MiddlewareChain(
Expand Down
Loading
Loading
0