Open
Description
Is your feature request related to a problem? Please describe
In kafka
component we have a feature which provides info when a consumer is reached current offset (it closes a channel)
simple.WithDurationOffset(since time.Duration)
simple.WithNotificationOnceReachingLatestOffset(ch chan<- struct{})
This is usually used in a readiness check to wait until consumer is ready and then a service is ready to process incoming requests.
Readiness check is a simple function interface
// ReadyCheckFunc defines a function type for implementing a readiness check.
type ReadyCheckFunc func() ReadyStatus
Currently, every time when one wants to use this feature they have to implement boilerplate readiness check.
Describe the solution
- Add new package
patron/probe
independent from HTTP. It can be used then in gRPC probes. - Extract probes related code into this new package in
patron/probe/probe.go
- Add deterministic readiness check implementation in this new package in
patron/probe/deterministic.go
.
Additional context
New package with 2 files
patron/probe
-- probe.go
-- deterministic.go
Move probe related code into probe/probe.go
file from component/http/v2/check.go
.
Leave code related to http routes there.
package probe
// AliveStatus type representing the liveness of the service.
type AliveStatus int
// ReadyStatus type.
type ReadyStatus int
const (
// Alive represents a state defining an Alive state.
Alive AliveStatus = 1
// Unhealthy represents an unhealthy alive state.
Unhealthy AliveStatus = 2
// Ready represents a state defining a Ready state.
Ready ReadyStatus = 1
// NotReady represents a state defining a NotReady state.
NotReady ReadyStatus = 2
// AlivePath of the component.
AlivePath = "/alive"
// ReadyPath of the component.
ReadyPath = "/ready"
)
// ReadyCheckFunc defines a function type for implementing a readiness check.
type ReadyCheckFunc func() ReadyStatus
// LivenessCheckFunc defines a function type for implementing a liveness check.
type LivenessCheckFunc func() AliveStatus
Add another file probe/deterministic.go
with the following deterministic ready check implementation.
package probe
import (
"sync"
"time"
"github.com/beatlabs/patron/log"
)
// DeterministicReadyCheck waits until all the defined channels are closed
type DeterministicReadyCheck struct {
sync.Mutex
checks map[string]*readyCheck
status v2.ReadyStatus
}
// NewDeterministicReadyCheck creates new deterministic check
func NewDeterministicReadyCheck() *DeterministicReadyCheck {
return &DeterministicReadyCheck{
checks: make(map[string]*readyCheck),
status: v2.NotReady,
}
}
// AddCheck add new channel to watch when it's closed
func (r *DeterministicReadyCheck) AddCheck(name string, ch chan struct{}) *DeterministicReadyCheck {
r.checks[name] = &readyCheck{name: name, ch: ch, startedAt: time.Now()}
return r
}
// Status ask when all the checks are ready
func (r *DeterministicReadyCheck) Status() v2.ReadyStatus {
r.Lock()
defer r.Unlock()
if r.status == v2.NotReady {
for _, chk := range r.checks {
if !chk.isReady() {
return r.status
}
}
r.status = v2.Ready
}
return r.status
}
type readyCheck struct {
name string
ch <-chan struct{}
startedAt time.Time
closed bool
}
func (r *readyCheck) isReady() bool {
if r.closed {
return true
}
// if channel was closed, set the flag and log how long it took
select {
case _, ok := <-r.ch:
if !ok {
r.closed = true
log.Infof("readiness %s: %v", r.name, time.Since(r.startedAt))
return true
}
default:
}
return false
}
Metadata
Metadata
Assignees
Labels
No labels