fix syncutil by kooksee · Pull Request #14 · pubgo/funk · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix syncutil #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 32 additions & 38 deletions syncutil/waitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"runtime"
"sync"
"sync/atomic"
_ "unsafe"
"time"

"github.com/pubgo/funk/async"
"github.com/pubgo/funk/fastrand"
"github.com/pubgo/funk/log"
"github.com/pubgo/funk/stack"
"github.com/pubgo/funk/try"
"github.com/rs/zerolog"
)

// state is declared without a body; the go:linkname directive below binds it
// to sync.(*WaitGroup).state. It takes a *sync.WaitGroup and returns a
// *uint64 and a *uint32.
// NOTE(review): this depends on an unexported method of the sync package —
// confirm it exists in the Go version being targeted.
//
//go:linkname state sync.(*WaitGroup).state
func state(*sync.WaitGroup) (*uint64, *uint32)

// defaultConcurrent is twice the value reported by runtime.NumCPU(), stored
// as a uint32 and computed once at package initialization.
var defaultConcurrent = uint32(runtime.NumCPU() * 2)

func NewWaitGroup(maxConcurrent ...uint32) *WaitGroup {
Expand All @@ -24,50 +23,45 @@ func NewWaitGroup(maxConcurrent ...uint32) *WaitGroup {
}

type WaitGroup struct {
_ noCopy
wg sync.WaitGroup
count atomic.Uint32
maxConcurrent uint32
err error
}

// Count atomically loads the 64-bit word returned by state for the wg field
// and returns its upper 32 bits as a uint32.
// NOTE(review): presumably the upper half holds the WaitGroup's outstanding
// task counter — confirm against the sync package internals of the Go
// version in use.
func (t *WaitGroup) Count() uint32 {
count, _ := state(&t.wg)
return uint32(atomic.LoadUint64(count) >> 32)
}

func (t *WaitGroup) check() {
// 阻塞, 等待任务处理完毕
// 采样率(1), 打印log
for t.Count() >= t.maxConcurrent {
if fastrand.Sampling(1) {
logs.Warn().
Uint32("current", t.Count()).
Uint32("maximum", t.maxConcurrent).
Msg("WaitGroup current concurrent number exceeds the maximum concurrent number of the system")
// Count returns the current value of the count field, read with an atomic load.
// NOTE(review): no write to count is visible in this view — confirm where it
// is incremented and decremented.
func (t *WaitGroup) Count() uint32 { return t.count.Load() }
func (t *WaitGroup) checkAndWait() {
for {
count := t.Count()
if count <= t.maxConcurrent {
break
}

runtime.Gosched()
if count < defaultConcurrent {
runtime.Gosched()
} else {
time.Sleep(time.Microsecond * 10)
}
}
}

func (t *WaitGroup) Go(fn func()) {
t.wg.Add(1)
t.check()
async.GoSafe(
func() error { fn(); return nil },
func(err error) {
t.err = err
},
)
t.checkAndWait()
go func() {
defer t.wg.Done()
err := try.Try(func() error {
fn()
return nil
})
if err != nil {
log.Err(err).
Func(func(e *zerolog.Event) {
e.Str("fn_stack", stack.CallerWithFunc(fn).String())
}).Msg("recovery func panic")
}
}()
}

func (t *WaitGroup) Wait() error {
func (t *WaitGroup) Wait() {
t.wg.Wait()
return t.err
}

// noCopy is an empty marker type whose Lock and Unlock methods do nothing.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
// Unlock is the no-op counterpart to Lock.
func (*noCopy) Unlock() {}
Empty file removed syncutil/waitgroup.s
Empty file.
20 changes: 20 additions & 0 deletions syncutil/waitgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package syncutil

import (
"testing"
"time"
)

// TestName starts ten short sleeping tasks on a WaitGroup created with a
// limit of ten, waits for all of them, and logs the total elapsed time.
// It makes no assertions; the logged duration is informational only.
func TestName(t *testing.T) {
	const (
		tasks = 10
		pause = 10 * time.Millisecond
	)

	start := time.Now()
	// Report the wall-clock time once everything below has finished.
	defer func() { t.Log(time.Since(start)) }()

	group := NewWaitGroup(tasks)
	for n := 0; n < tasks; n++ {
		group.Go(func() { time.Sleep(pause) })
	}
	group.Wait()
}
Loading
0