8000 More granular locking for scrapeLoop. by brian-brazil · Pull Request #8104 · prometheus/prometheus · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

More granular locking for scrapeLoop. #8104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 26, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,13 @@ type scrapePool struct {
appendable storage.Appendable
logger log.Logger

mtx sync.Mutex
// targetMtx protects activeTargets and droppedTargets from concurrent reads
// and writes. Only one of Sync/stop/reload may be called at once due to
// manager.mtxScrape so we only need to protect from concurrent reads from
// the ActiveTargets and DroppedTargets methods. This allows those two
// methods to always complete without having to wait on scrape loops to gracefully stop.
targetMtx sync.Mutex

config *config.ScrapeConfig
client *http.Client
// Targets and loops must always be synchronized to have the same
Expand Down Expand Up @@ -273,8 +279,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
}

func (sp *scrapePool) ActiveTargets() []*Target {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.targetMtx.Lock()
defer sp.targetMtx.Unlock()

var tActive []*Target
for _, t := range sp.activeTargets {
Expand All @@ -284,8 +290,8 @@ func (sp *scrapePool) ActiveTargets() []*Target {
}

func (sp *scrapePool) DroppedTargets() []*Target {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.targetMtx.Lock()
defer sp.targetMtx.Unlock()
return sp.droppedTargets
}

Expand All @@ -294,8 +300,7 @@ func (sp *scrapePool) stop() {
sp.cancel()
var wg sync.WaitGroup

sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.targetMtx.Lock()

for fp, l := range sp.loops {
wg.Add(1)
Expand All @@ -308,6 +313,9 @@ func (sp *scrapePool) stop() {
delete(sp.loops, fp)
delete(sp.activeTargets, fp)
}

sp.targetMtx.Unlock()

wg.Wait()
sp.client.CloseIdleConnections()

Expand All @@ -326,9 +334,6 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
targetScrapePoolReloads.Inc()
start := time.Now()

sp.mtx.Lock()
defer sp.mtx.Unlock()

client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false)
if err != nil {
targetScrapePoolReloadsFailed.Inc()
Expand All @@ -352,6 +357,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
mrc = sp.config.MetricRelabelConfigs
)

sp.targetMtx.Lock()

forcedErr := sp.refreshTargetLimitErr()
for fp, oldLoop := range sp.loops {
var cache *scrapeCache
Expand Down Expand Up @@ -387,6 +394,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.loops[fp] = newLoop
}

sp.targetMtx.Unlock()

wg.Wait()
oldClient.CloseIdleConnections()
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
Expand All @@ -398,11 +407,9 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.mtx.Lock()
defer sp.mtx.Unlock()

start := time.Now()

sp.targetMtx.Lock()
var all []*Target
sp.droppedTargets = []*Target{}
for _, tg := range tgs {
Expand All @@ -419,6 +426,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
}
}
}
sp.targetMtx.Unlock()
sp.sync(all)

targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
Expand All @@ -431,7 +439,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
// This function expects that you have acquired the sp.mtx lock.
var (
uniqueLoops = make(map[uint64]loop)
interval = time.Duration(sp.config.ScrapeInterval)
Expand All @@ -442,6 +449,7 @@ func (sp *scrapePool) sync(targets []*Target) {
mrc = sp.config.MetricRelabelConfigs
)

sp.targetMtx.Lock()
for _, t := range targets {
hash := t.hash()

Expand Down Expand Up @@ -487,6 +495,8 @@ func (sp *scrapePool) sync(targets []*Target) {
}
}

sp.targetMtx.Unlock()

targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
forcedErr := sp.refreshTargetLimitErr()
for _, l := range sp.loops {
Expand All @@ -507,7 +517,6 @@ func (sp *scrapePool) sync(targets []*Target) {
// refreshTargetLimitErr returns an error that can be passed to the scrape loops
// if the number of targets exceeds the configured limit.
func (sp *scrapePool) refreshTargetLimitErr() error {
// This function expects that you have acquired the sp.mtx lock.
if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit {
return nil
}
Expand Down
0