fix: Reload rule table when store contents change by charithe · Pull Request #2452 · cerbos/cerbos · GitHub

fix: Reload rule table when store contents change #2452

Merged · 1 commit · Jan 17, 2025
2 changes: 1 addition & 1 deletion hack/dev/plan.hurl
@@ -54,7 +54,7 @@ jsonpath "$.action" == "view"
jsonpath "$.resourceKind" == "album:object"
jsonpath "$.policyVersion" == "nonexistent"
jsonpath "$.filter.kind" == "KIND_ALWAYS_DENIED"
jsonpath "$.meta.filterDebug" == "NO_MATCH"
jsonpath "$.meta.filterDebug" == "(false)"
jsonpath "$.cerbosCallId" exists


28 changes: 16 additions & 12 deletions internal/compile/manager.go
@@ -30,12 +30,13 @@ const (
)

type Manager struct {
log *zap.SugaredLogger
store storage.SourceStore
schemaMgr schema.Manager
updateQueue chan storage.Event
cache *cache.Cache[namer.ModuleID, *runtimev1.RunnablePolicySet]
sf singleflight.Group
store storage.SourceStore
schemaMgr schema.Manager
sf singleflight.Group
log *zap.SugaredLogger
updateQueue chan storage.Event
cache *cache.Cache[namer.ModuleID, *runtimev1.RunnablePolicySet]
*storage.SubscriptionManager
cacheDuration time.Duration
}

@@ -54,12 +55,13 @@ func NewManagerFromDefaultConf(ctx context.Context, store storage.SourceStore, s

func NewManagerFromConf(ctx context.Context, conf *Conf, store storage.SourceStore, schemaMgr schema.Manager) *Manager {
c := &Manager{
log: zap.S().Named("compiler"),
store: store,
schemaMgr: schemaMgr,
updateQueue: make(chan storage.Event, updateQueueSize),
cache: cache.New[namer.ModuleID, *runtimev1.RunnablePolicySet]("compile", conf.CacheSize),
cacheDuration: conf.CacheDuration,
log: zap.S().Named("compiler"),
store: store,
schemaMgr: schemaMgr,
updateQueue: make(chan storage.Event, updateQueueSize),
cache: cache.New[namer.ModuleID, *runtimev1.RunnablePolicySet]("compile", conf.CacheSize),
cacheDuration: conf.CacheDuration,
SubscriptionManager: storage.NewSubscriptionManager(ctx),
}

go c.processUpdateQueue(ctx)
@@ -90,6 +92,7 @@ func (c *Manager) processUpdateQueue(ctx context.Context) {
case storage.EventReload:
c.log.Info("Purging compile cache")
c.cache.Purge()
c.NotifySubscribers(evt)
case storage.EventAddOrUpdatePolicy, storage.EventDeleteOrDisablePolicy:
if err := c.recompile(evt); err != nil {
c.log.Warnw("Error while processing storage event", "event", evt, "error", err)
@@ -151,6 +154,7 @@ func (c *Manager) recompile(evt storage.Event) error {
}
}

c.NotifySubscribers(evt)
return nil
}

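The manager.go change above gives the compile manager an embedded storage.SubscriptionManager and makes it call NotifySubscribers after purging its cache on a reload or after recompiling a changed policy, so downstream consumers such as the rule table hear about store changes. The standalone sketch below illustrates that fan-out pattern; all names in it (Event, Subscriber, SubscriptionManager, Manager.handleEvent) are hypothetical stand-ins, not the cerbos internal API.

```go
// Illustrative sketch only: hypothetical types mirroring the fan-out pattern,
// not the cerbos internal API.
package main

import "fmt"

// Event stands in for storage.Event: an event kind plus the affected policy ID.
type Event struct {
	Kind     string
	PolicyID string
}

// Subscriber is anything that wants to be told about store changes,
// e.g. the rule table.
type Subscriber interface {
	OnStorageEvent(evts ...Event)
}

// SubscriptionManager fans events out to every registered subscriber.
type SubscriptionManager struct {
	subscribers []Subscriber
}

func (sm *SubscriptionManager) Subscribe(s Subscriber) {
	sm.subscribers = append(sm.subscribers, s)
}

func (sm *SubscriptionManager) NotifySubscribers(evts ...Event) {
	for _, s := range sm.subscribers {
		s.OnStorageEvent(evts...)
	}
}

// Manager plays the role of the compile manager: it updates its own cache
// first, then forwards the event so downstream caches stay consistent.
type Manager struct {
	*SubscriptionManager
}

func (m *Manager) handleEvent(evt Event) {
	switch evt.Kind {
	case "reload":
		// purge the compile cache here, then tell subscribers
		m.NotifySubscribers(evt)
	case "add_or_update", "delete":
		// recompile the affected policy here, then tell subscribers
		m.NotifySubscribers(evt)
	}
}

type logSubscriber struct{}

func (logSubscriber) OnStorageEvent(evts ...Event) { fmt.Println("received:", evts) }

func main() {
	mgr := &Manager{SubscriptionManager: &SubscriptionManager{}}
	mgr.Subscribe(logSubscriber{})
	mgr.handleEvent(Event{Kind: "reload"})
	mgr.handleEvent(Event{Kind: "add_or_update", PolicyID: "resource.album.vdefault"})
}
```
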
114 changes: 61 additions & 53 deletions internal/engine/ruletable/rule_table.go
@@ -9,6 +9,10 @@ import (
"sync"
"time"

"github.com/google/cel-go/cel"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"

effectv1 "github.com/cerbos/cerbos/api/genpb/cerbos/effect/v1"
policyv1 "github.com/cerbos/cerbos/api/genpb/cerbos/policy/v1"
runtimev1 "github.com/cerbos/cerbos/api/genpb/cerbos/runtime/v1"
@@ -17,9 +21,6 @@ import (
"github.com/cerbos/cerbos/internal/namer"
"github.com/cerbos/cerbos/internal/storage"
"github.com/cerbos/cerbos/internal/util"
"github.com/google/cel-go/cel"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
)

const (
@@ -32,9 +33,9 @@ type RuleTable struct {
primaryIdx map[string]map[string]*util.GlobMap[*util.GlobMap[[]*Row]]
scopedResourceIdx map[string]map[string]*util.GlobMap[struct{}]
log *zap.SugaredLogger
schemas map[string]*policyv1.Schemas
meta map[string]*runtimev1.RuleTableMetadata
policyDerivedRoles map[string]map[string]*WrappedRunnableDerivedRole
schemas map[namer.ModuleID]*policyv1.Schemas
meta map[namer.ModuleID]*runtimev1.RuleTableMetadata
policyDerivedRoles map[namer.ModuleID]map[string]*WrappedRunnableDerivedRole
scopeMap map[string]struct{}
scopeScopePermissions map[string]policyv1.ScopePermissions
parentRoles map[string][]string
@@ -53,6 +54,7 @@ type Row struct {
Params *rowParams
DerivedRoleParams *rowParams
EvaluationKey string
OriginModuleID namer.ModuleID
}

func (r *Row) Matches(scope, action string, roles []string) bool {
@@ -97,9 +99,9 @@ func NewRuleTable() *RuleTable {
primaryIdx: make(map[string]map[string]*util.GlobMap[*util.GlobMap[[]*Row]]),
scopedResourceIdx: make(map[string]map[string]*util.GlobMap[struct{}]),
log: zap.S().Named("ruletable"),
schemas: make(map[string]*policyv1.Schemas),
meta: make(map[string]*runtimev1.RuleTableMetadata),
policyDerivedRoles: make(map[string]map[string]*WrappedRunnableDerivedRole),
schemas: make(map[namer.ModuleID]*policyv1.Schemas),
meta: make(map[namer.ModuleID]*runtimev1.RuleTableMetadata),
policyDerivedRoles: make(map[namer.ModuleID]map[string]*WrappedRunnableDerivedRole),
scopeMap: make(map[string]struct{}),
scopeScopePermissions: make(map[string]policyv1.ScopePermissions),
parentRoles: make(map[string][]string),
@@ -136,8 +138,6 @@ func (rt *RuleTable) addPolicy(rps *runtimev1.RunnablePolicySet) error {
func (rt *RuleTable) addResourcePolicy(rrps *runtimev1.RunnableResourcePolicySet) error {
sanitizedResource := namer.SanitizedResource(rrps.Meta.Resource)

rt.schemas[rrps.Meta.Fqn] = rrps.Schemas

policies := rrps.GetPolicies()
if len(policies) == 0 {
return nil
@@ -146,7 +146,9 @@ func (rt *RuleTable) addResourcePolicy(rrps *runtimev1.RunnableResourcePolicySet
// we only process the first of resource policy sets as it's assumed parent scopes are handled in separate calls
p := rrps.GetPolicies()[0]

rt.meta[rrps.Meta.Fqn] = &runtimev1.RuleTableMetadata{
moduleID := namer.GenModuleIDFromFQN(rrps.Meta.Fqn)
rt.schemas[moduleID] = rrps.Schemas
rt.meta[moduleID] = &runtimev1.RuleTableMetadata{
Fqn: rrps.Meta.Fqn,
Name: &runtimev1.RuleTableMetadata_Resource{Resource: sanitizedResource},
Version: rrps.Meta.Version,
@@ -161,7 +163,7 @@ func (rt *RuleTable) addResourcePolicy(rrps *runtimev1.RunnableResourcePolicySet
Constants: (&structpb.Struct{Fields: dr.Constants}).AsMap(),
}
}
rt.policyDerivedRoles[rrps.Meta.Fqn] = wrapped
rt.policyDerivedRoles[moduleID] = wrapped

progs, err := getCelProgramsFromExpressions(p.OrderedVariables)
if err != nil {
@@ -190,7 +192,7 @@ func (rt *RuleTable) addResourcePolicy(rrps *runtimev1.RunnableResourcePolicySet
}
}

ruleFqn := namer.RuleFQN(rt.meta[rrps.Meta.Fqn], p.Scope, rule.Name)
ruleFqn := namer.RuleFQN(rt.meta[moduleID], p.Scope, rule.Name)
evaluationKey := fmt.Sprintf("%s#%s", policyParameters.Key, ruleFqn)
for a := range rule.Actions {
for r := range rule.Roles {
@@ -208,8 +210,9 @@ func (rt *RuleTable) addResourcePolicy(rrps *runtimev1.RunnableResourcePolicySet
EmitOutput: emitOutput,
Name: rule.Name,
},
Params: policyParameters,
EvaluationKey: evaluationKey,
Params: policyParameters,
EvaluationKey: evaluationKey,
OriginModuleID: moduleID,
})
}

@@ -248,6 +251,7 @@ func (rt *RuleTable) addResourcePolicy(rrps *runtimev1.RunnableResourcePolicySet
Params: policyParameters,
DerivedRoleParams: derivedRoleParams,
EvaluationKey: evaluationKey,
OriginModuleID: moduleID,
})
}
}
@@ -281,7 +285,8 @@ func (rt *RuleTable) addRolePolicy(p *runtimev1.RunnableRolePolicySet) {
rt.scopeScopePermissions[p.Scope] = p.ScopePermissions

version := "default"
rt.meta[p.Meta.Fqn] = &runtimev1.RuleTableMetadata{
moduleID := namer.GenModuleIDFromFQN(p.Meta.Fqn)
rt.meta[moduleID] = &runtimev1.RuleTableMetadata{
Fqn: p.Meta.Fqn,
Name: &runtimev1.RuleTableMetadata_Role{Role: p.Role},
Version: version,
@@ -304,7 +309,8 @@ func (rt *RuleTable) addRolePolicy(p *runtimev1.RunnableRolePolicySet) {
ScopePermissions: p.ScopePermissions,
Version: version,
},
EvaluationKey: evaluationKey,
EvaluationKey: evaluationKey,
OriginModuleID: moduleID,
})
}
}
@@ -361,51 +367,48 @@ func (rt *RuleTable) insertRule(r *Row) {
rt.rules = append(rt.rules, r)
}

func (rt *RuleTable) deletePolicy(rps *runtimev1.RunnablePolicySet) {
func (rt *RuleTable) deletePolicy(moduleID namer.ModuleID) {
// TODO(saml) rebuilding/reassigning the whole row slice on each delete is hugely inefficient.
// Perhaps we could mark as `deleted` and periodically purge the deleted rows.
// However, it's unlikely this bespoke table implementation will be around long enough to worry about this.

rt.mu.Lock()
defer rt.mu.Unlock()

deletedFqn := rps.Fqn
meta := rt.meta[moduleID]
rt.log.Debugf("Deleting policy %s", meta.GetFqn())

versionSet := make(map[string]struct{})
scopeSet := make(map[string]struct{})

newRules := []*Row{}
newRules := make([]*Row, 0, len(rt.rules))
for _, r := range rt.rules {
if r.OriginFqn != deletedFqn {
if r.OriginModuleID != moduleID {
newRules = append(newRules, r)
versionSet[r.Version] = struct{}{}
scopeSet[r.Scope] = struct{}{}
} else {
rt.log.Debugf("Dropping rule %s", r.GetOriginFqn())
}
}
rt.rules = newRules

delete(rt.schemas, deletedFqn)
delete(rt.meta, deletedFqn)

var version, scope string
switch rps.PolicySet.(type) {
case *runtimev1.RunnablePolicySet_ResourcePolicy:
rp := rps.GetResourcePolicy()
version = rp.Meta.Version
scope = rp.Policies[0].Scope
case *runtimev1.RunnablePolicySet_RolePolicy:
rlp := rps.GetRolePolicy()
version = "default" // TODO(saml)
scope = rlp.Scope
delete(rt.schemas, moduleID)
delete(rt.meta, moduleID)
delete(rt.policyDerivedRoles, moduleID)

delete(rt.parentRoles, rlp.Role)
delete(rt.parentRoleAncestorsCache, rlp.Role)
if role := meta.GetRole(); role != "" {
delete(rt.parentRoles, role)
delete(rt.parentRoleAncestorsCache, role)
}

version := meta.GetVersion()
if _, ok := versionSet[version]; !ok {
delete(rt.primaryIdx, version)
delete(rt.scopedResourceIdx, version)
}

scope := namer.ScopeFromFQN(meta.GetFqn())
if _, ok := scopeSet[scope]; !ok {
delete(rt.scopeMap, scope)
delete(rt.scopeScopePermissions, scope)
@@ -416,22 +419,24 @@ func (rt *RuleTable) purge() {
rt.mu.Lock()
defer rt.mu.Unlock()

rt.rules = []*Row{}
rt.parentRoles = make(map[string][]string)
rt.schemas = make(map[string]*policyv1.Schemas)

rt.scopeMap = make(map[string]struct{})
rt.primaryIdx = make(map[string]map[string]*util.GlobMap[*util.GlobMap[[]*Row]])
rt.scopedResourceIdx = make(map[string]map[string]*util.GlobMap[struct{}])
rt.parentRoleAncestorsCache = make(map[string][]string)
clear(rt.meta)
clear(rt.parentRoleAncestorsCache)
clear(rt.parentRoles)
clear(rt.policyDerivedRoles)
clear(rt.primaryIdx)
clear(rt.rules)
clear(rt.schemas)
clear(rt.scopeMap)
clear(rt.scopeScopePermissions)
clear(rt.scopedResourceIdx)
}

func (rt *RuleTable) Len() int {
return len(rt.rules)
}

func (rt *RuleTable) GetDerivedRoles(fqn string) map[string]*WrappedRunnableDerivedRole {
return rt.policyDerivedRoles[fqn]
return rt.policyDerivedRoles[namer.GenModuleIDFromFQN(fqn)]
}

func (rt *RuleTable) GetAllScopes(scope, resource, version string) ([]string, string, string) {
@@ -571,7 +576,7 @@ func (rt *RuleTable) GetSchema(fqn string) *policyv1.Schemas {
rt.mu.RLock()
defer rt.mu.RUnlock()

if s, ok := rt.schemas[fqn]; ok {
if s, ok := rt.schemas[namer.GenModuleIDFromFQN(fqn)]; ok {
return s
}

@@ -582,7 +587,7 @@ func (rt *RuleTable) GetMeta(fqn string) *runtimev1.RuleTableMetadata {
rt.mu.RLock()
defer rt.mu.RUnlock()

if s, ok := rt.meta[fqn]; ok {
if s, ok := rt.meta[namer.GenModuleIDFromFQN(fqn)]; ok {
return s
}

@@ -630,14 +635,17 @@ func (rt *RuleTable) processPolicyEvent(ev storage.Event) error {
ctx, cancelFunc := context.WithTimeout(context.Background(), storeFetchTimeout)
defer cancelFunc()

rps, err := rt.policyLoader.GetFirstMatch(ctx, []namer.ModuleID{ev.PolicyID})
if err != nil {
return err
rt.deletePolicy(ev.PolicyID)
if ev.OldPolicyID != nil {
rt.deletePolicy(*ev.OldPolicyID)
}

rt.deletePolicy(rps)

if ev.Kind == storage.EventAddOrUpdatePolicy {
rps, err := rt.policyLoader.GetFirstMatch(ctx, []namer.ModuleID{ev.PolicyID})
if err != nil {
return err
}

if err := rt.addPolicy(rps); err != nil {
return err
}
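The deletePolicy rewrite above drops only the rows owned by the deleted policy's module ID, and removes per-version and per-scope index entries only when no surviving row still references that version or scope. A simplified, self-contained sketch of that cleanup pattern follows; the types and field names are hypothetical stand-ins for the real rule-table indexes.

```go
// Simplified sketch of the deletePolicy cleanup pattern; types and fields are
// hypothetical stand-ins for the real rule-table indexes.
package main

import "fmt"

type Row struct {
	OriginModuleID string
	Version        string
	Scope          string
}

type table struct {
	rules      []*Row
	versionIdx map[string]bool // stand-in for primaryIdx / scopedResourceIdx
	scopeIdx   map[string]bool // stand-in for scopeMap / scopeScopePermissions
}

func (t *table) deletePolicy(moduleID, version, scope string) {
	versionSet := make(map[string]struct{})
	scopeSet := make(map[string]struct{})

	// Keep rows owned by other policies, remembering which versions and
	// scopes are still in use.
	kept := make([]*Row, 0, len(t.rules))
	for _, r := range t.rules {
		if r.OriginModuleID == moduleID {
			continue // drop rows owned by the deleted policy
		}
		kept = append(kept, r)
		versionSet[r.Version] = struct{}{}
		scopeSet[r.Scope] = struct{}{}
	}
	t.rules = kept

	// Remove index entries only if no surviving row still references them.
	if _, ok := versionSet[version]; !ok {
		delete(t.versionIdx, version)
	}
	if _, ok := scopeSet[scope]; !ok {
		delete(t.scopeIdx, scope)
	}
}

func main() {
	t := &table{
		rules: []*Row{
			{OriginModuleID: "m1", Version: "default", Scope: "acme"},
			{OriginModuleID: "m2", Version: "default", Scope: ""},
		},
		versionIdx: map[string]bool{"default": true},
		scopeIdx:   map[string]bool{"acme": true, "": true},
	}
	t.deletePolicy("m1", "default", "acme")
	fmt.Println(len(t.rules), t.versionIdx, t.scopeIdx) // 1 map[default:true] map[:true]
}
```
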
9 changes: 4 additions & 5 deletions internal/server/server.go
@@ -160,8 +160,7 @@ func Start(ctx context.Context) error {

rt := ruletable.NewRuleTable().WithPolicyLoader(policyLoader)

// For now, we're only enabling the ruletable engine for non-mutable stores populate rule table
// for non-mutable stores
// Populate rule table for non-mutable stores.
if _, ok := store.(storage.MutableStore); !ok {
rps, err := policyLoader.GetAll(ctx)
if err != nil {
@@ -171,10 +170,10 @@
if err := rt.LoadPolicies(rps); err != nil {
return fmt.Errorf("failed to load policies into rule table: %w", err)
}
}

if ss, ok := store.(storage.Subscribable); ok {
ss.Subscribe(rt)
}
if ss, ok := policyLoader.(storage.Subscribable); ok {
ss.Subscribe(rt)
Comment on lines +175 to +176
@Sambigeara (Contributor) commented on Jan 20, 2025:

#2451 (comment)

I'm not sure I understand why this is the case. For mutable stores, the original stores remain as the source of truth, and a "partial rule table" is generated at query time (and discarded afterwards).

In the mutable-store case, partial rule tables are (currently) generated from the mutable store per-query and discarded afterwards. I chose to do this as an initial step as it seemed odd to maintain two sources of mutable truth.

This may or may not be related to the recent E2E test failure (it's currently nil-pointering on a nonexistent policy set passed to the addPolicy method).

}

// create engine
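Relating to the review thread above: with this change, a subscribed rule table reacts to the storage events forwarded by the compile manager by purging everything on a full reload, or by dropping the affected policy's rows (including rows keyed by the old policy ID, if any) and re-adding the policy from the store on an add or update. The sketch below shows that subscriber-side behaviour in a self-contained form; the names (Event, RuleTable.OnStorageEvent, loadFromStore) are illustrative assumptions, not the actual cerbos types.

```go
// Minimal sketch of the subscriber side, with hypothetical names rather than
// the cerbos API: purge on reload, otherwise drop the affected policy's rows
// (and the old policy ID, if it was renamed) and re-load it on add/update.
package main

import "fmt"

type Event struct {
	Kind        string
	PolicyID    string
	OldPolicyID *string // set when a policy changed identity (e.g. renamed scope)
}

type RuleTable struct {
	rows map[string][]string // policy ID -> rules; stand-in for the real row index
}

func (rt *RuleTable) OnStorageEvent(evts ...Event) {
	for _, evt := range evts {
		switch evt.Kind {
		case "reload":
			// The whole store changed: throw away every cached row.
			rt.rows = make(map[string][]string)
		case "add_or_update", "delete":
			delete(rt.rows, evt.PolicyID)
			if evt.OldPolicyID != nil {
				delete(rt.rows, *evt.OldPolicyID)
			}
			if evt.Kind == "add_or_update" {
				// Re-add the freshly compiled policy from the store.
				rt.rows[evt.PolicyID] = loadFromStore(evt.PolicyID)
			}
		}
	}
}

// loadFromStore stands in for fetching the compiled policy set and turning it
// into rule-table rows.
func loadFromStore(policyID string) []string {
	return []string{"rules for " + policyID}
}

func main() {
	rt := &RuleTable{rows: map[string][]string{"a": {"r1"}, "b": {"r2"}}}
	rt.OnStorageEvent(Event{Kind: "add_or_update", PolicyID: "a"})
	rt.OnStorageEvent(Event{Kind: "delete", PolicyID: "b"})
	fmt.Println(rt.rows) // map[a:[rules for a]]
}
```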