Running into issues getting a client to start #817
Hello all, I was wanting to get River set up, so I created a worker/args pair for expiring user sessions, like so:

```go
package workers

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
)

type ExpireSessionsArgs struct{}

func (ExpireSessionsArgs) Kind() string { return "expire_sessions" }

func (a ExpireSessionsArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		Queue: river.QueueDefault,
	}
}

type ExpireSessionsWorker struct {
	river.WorkerDefaults[ExpireSessionsArgs]
	pool *pgxpool.Pool
}

func NewExpireSessionsWorker(pool *pgxpool.Pool) *ExpireSessionsWorker {
	return &ExpireSessionsWorker{
		pool: pool,
	}
}

func (w *ExpireSessionsWorker) Work(ctx context.Context, job *river.Job[ExpireSessionsArgs]) error {
	// TODO: Delete user sessions that are too old
	return nil
}
```

I created a function to create a new `*river.Workers` object like so:

```go
package workers

import (
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
)

func NewWorkerBundle(pool *pgxpool.Pool) *river.Workers {
	w := river.NewWorkers()
	river.AddWorker(w, NewExpireSessionsWorker(pool))
	return w
}
```

Finally, I'm starting the River client within a `cobra.Command`, like so:
When I run it (in Docker Compose) I get the following output:
This seems to indicate to me that the workers bundle does not have any workers registered since it looks like it comes from here, but I'm not sure what I'm doing wrong compared to the docs. I even tried copying the SortWorker example verbatim and calling `AddWorker` with it, but I get the same error. I'm sure I'm doing something wrong here, but I was hoping someone could help me spot it. Thanks!
Replies: 1 comment 1 reply
@mylanconnolly Hm, there's nothing obviously wrong I can see in your code above. I suspect you may be referencing a package in `workers.NewWorkerBundle` that's different than what you were expecting or something of that nature.

Here's your code with very few modifications all baked into one `main.go` file (I added a `main` function, a default value for `DATABASE_URL`, and `Stop` for the River client, but that's about it):

```go
package main

import (
	"cmp"
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type ExpireSessionsArgs struct{}

func (ExpireSessionsArgs) Kind() string { return "expire_sessions" }

func (a ExpireSessionsArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		Queue: river.QueueDefault,
	}
}

type ExpireSessionsWorker struct {
	river.WorkerDefaults[ExpireSessionsArgs]
	pool *pgxpool.Pool
}

func NewExpireSessionsWorker(pool *pgxpool.Pool) *ExpireSessionsWorker {
	return &ExpireSessionsWorker{
		pool: pool,
	}
}

func (w *ExpireSessionsWorker) Work(ctx context.Context, job *river.Job[ExpireSessionsArgs]) error {
	// TODO: Delete user sessions that are too old
	return nil
}

func NewWorkerBundle(pool *pgxpool.Pool) *river.Workers {
	w := river.NewWorkers()
	river.AddWorker(w, NewExpireSessionsWorker(pool))
	return w
}

func main() {
	if err := run(context.Background()); err != nil {
		fmt.Fprintf(os.Stderr, "error: %s\n", err.Error())
		os.Exit(1)
	}
}

func run(ctx context.Context) error {
	databaseURL := cmp.Or(os.Getenv("DATABASE_URL"), "postgres://localhost:5432/river_dev")

	pool, err := pgxpool.New(context.Background(), databaseURL)
	if err != nil {
		return err
	}
	defer pool.Close()

	riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
		Logger:  slog.Default(),
		Workers: NewWorkerBundle(pool),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
	})
	if err != nil {
		return err
	}

	if err := riverClient.Start(ctx); err != nil {
		return err
	}
	defer riverClient.Stop(ctx)

	const sleepDuration = 5 * time.Second
	fmt.Printf("River started; waiting %s, then terminating\n", sleepDuration)
	time.Sleep(sleepDuration)

	return nil
}
```

Seems to run fine:
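One way to test the package-mismatch theory is to print where a function value actually comes from at runtime. The sketch below uses only the standard library; `funcOrigin` is a hypothetical helper (not part of River), and `newWorkerBundle` is a stand-in for whatever `workers.NewWorkerBundle` resolves to in the real project.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

// funcOrigin returns the fully qualified name of fn (package path plus
// function name), or "" if fn is not a function value.
// Hypothetical debugging helper, not part of River.
func funcOrigin(fn any) string {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func {
		return ""
	}
	f := runtime.FuncForPC(v.Pointer())
	if f == nil {
		return ""
	}
	return f.Name()
}

// newWorkerBundle is a placeholder standing in for workers.NewWorkerBundle.
func newWorkerBundle() {}

func main() {
	// In the real project, pass workers.NewWorkerBundle here and compare the
	// printed package path with the one you expect to be importing.
	fmt.Println(funcOrigin(newWorkerBundle))
}
```

For a function from an imported package, the printed name includes the full import path, so a stale or duplicated `workers` package should show up as an unexpected path.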