Use `nproc` for clarity and add a unit test to verify that the message causing the crash is dropped by mapogolions · Pull Request #181 · anthdm/hollywood · GitHub

Use nproc for clarity and add a unit test to verify that the message causing the crash is dropped #181

Open. Wants to merge 1 commit into base: master
19 changes: 9 additions & 10 deletions actor/process.go
@@ -61,33 +61,32 @@ func (p *process) Invoke(msgs []Envelope) {
 		nmsg = len(msgs)
 		// numbers of msgs that are processed.
 		nproc = 0
-		// FIXME: We could use nrpoc here, but for some reason placing nproc++ on the
-		// bottom of the function it freezes some tests. Hence, I created a new counter
-		// for bookkeeping.
-		processed = 0
 	)
 	defer func() {
 		// If we recovered, we buffer up all the messages that we could not process
 		// so we can retry them on the next restart.
 		if v := recover(); v != nil {
+			// Processed 'nproc' messages successfully, then encountered a crash on the next message.
+			// After restart, processing begins with the message following the one that caused the crash.
+			// 'nrecv' represents the total number of successfully received messages, including the one that caused the crash.
+			nrecv := nproc + 1
Review comment (Contributor):

I think this line of code is problematic. If the actor receives a message that will cause a crash, we should not skip that message.
Example: Actor receives a new message -> store data in DB.
If database connection fails, panic -> recover() -> retry

We do not want to skip that message.

Reply (Contributor Author):

The current implementation on the master branch already does this: it skips a message that failed to process.

 			p.context.message = Stopped{}
 			p.context.receiver.Receive(p.context)

-			p.mbuffer = make([]Envelope, nmsg-nproc)
-			for i := 0; i < nmsg-nproc; i++ {
-				p.mbuffer[i] = msgs[i+nproc]
+			p.mbuffer = make([]Envelope, nmsg-nrecv)
+			for i := 0; i < nmsg-nrecv; i++ {
+				p.mbuffer[i] = msgs[i+nrecv]
 			}
 			p.tryRestart(v)
 		}
 	}()
 	for i := 0; i < len(msgs); i++ {
-		nproc++
 		msg := msgs[i]
 		if pill, ok := msg.Msg.(poisonPill); ok {
 			// If we need to gracefuly stop, we process all the messages
 			// from the inbox, otherwise we ignore and cleanup.
 			if pill.graceful {
-				msgsToProcess := msgs[processed:]
+				msgsToProcess := msgs[nproc:]
 				for _, m := range msgsToProcess {
 					p.invokeMsg(m)
 				}
@@ -96,7 +95,7 @@ func (p *process) Invoke(msgs []Envelope) {
 			return
 		}
 		p.invokeMsg(msg)
-		processed++
+		nproc++
 	}
 }

27 changes: 24 additions & 3 deletions actor/process_test.go
@@ -9,14 +9,35 @@ import (
 	"github.com/stretchr/testify/require"
 )

+type triggerPanic struct {
+	data int
+}
+
+func Test_ProcessingStartsFromNextMessageAfterRestart(t *testing.T) {
+	e, err := NewEngine(NewEngineConfig())
+	require.NoError(t, err)
+	done := make(chan struct{})
+	pid := e.SpawnFunc(func(c *Context) {
+		fmt.Printf("Got message type %T\n", c.Message())
+		if _, ok := c.Message().(triggerPanic); ok {
+			// message causing the failure is not processed again.
+			panicWrapper()
+		}
+		if s, ok := c.Message().(string); ok && s == "foo" {
+			close(done)
+		}
+	}, "kind", WithMaxRestarts(1))
+
+	e.Send(pid, triggerPanic{})
+	e.Send(pid, "foo")
+	<-done
+}
+
 // Test_CleanTrace tests that the stack trace is cleaned up correctly and that the function
 // which triggers the panic is at the top of the stack trace.
 func Test_CleanTrace(t *testing.T) {
 	e, err := NewEngine(NewEngineConfig())
 	require.NoError(t, err)
-	type triggerPanic struct {
-		data int
-	}
 	stopCh := make(chan struct{})
 	pid := e.SpawnFunc(func(c *Context) {
 		fmt.Printf("Got message type %T\n", c.Message())