exp/lb: Add service.cilium.io/type annotation support by brb · Pull Request #38260 · cilium/cilium · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

exp/lb: Add service.cilium.io/type annotation support #38260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/k8s/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func isValidServiceFrontendIP(netIP net.IP) bool {
// latter two, one can set the annotation with the value "LoadBalancer".
type exposeSvcType slim_corev1.ServiceType

func newSvcExposureType(svc *slim_corev1.Service) (*exposeSvcType, error) {
func NewSvcExposureType(svc *slim_corev1.Service) (*exposeSvcType, error) {
typ, isSet := svc.Annotations[annotation.ServiceTypeExposure]
if !isSet {
return nil, nil
Expand All @@ -171,8 +171,8 @@ func newSvcExposureType(svc *slim_corev1.Service) (*exposeSvcType, error) {
return &expType, nil
}

// canExpose checks whether a given service type can be provisioned.
func (e *exposeSvcType) canExpose(t slim_corev1.ServiceType) bool {
// CanExpose checks whether a given service type can be provisioned.
func (e *exposeSvcType) CanExpose(t slim_corev1.ServiceType) bool {
if e == nil {
return true
}
Expand All @@ -199,7 +199,7 @@ func ParseService(svc *slim_corev1.Service, nodePortAddrs []netip.Addr) (Service
var loadBalancerIPs []string

svcID := ParseServiceID(svc)
expType, err := newSvcExposureType(svc)
expType, err := NewSvcExposureType(svc)
if err != nil {
scopedLog.WithError(err).Warnf("Ignoring %q annotation", annotation.ServiceTypeExposure)
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func ParseService(svc *slim_corev1.Service, nodePortAddrs []netip.Addr) (Service
}

var clusterIPs []net.IP
if expType.canExpose(slim_corev1.ServiceTypeClusterIP) {
if expType.CanExpose(slim_corev1.ServiceTypeClusterIP) {
if len(svc.Spec.ClusterIPs) == 0 {
if clsIP := net.ParseIP(svc.Spec.ClusterIP); clsIP != nil {
clusterIPs = []net.IP{clsIP}
Expand Down Expand Up @@ -265,7 +265,7 @@ func ParseService(svc *slim_corev1.Service, nodePortAddrs []netip.Addr) (Service
intTrafficPolicy = loadbalancer.SVCTrafficPolicyCluster
}

if expType.canExpose(slim_corev1.ServiceTypeLoadBalancer) {
if expType.CanExpose(slim_corev1.ServiceTypeLoadBalancer) {
for _, ip := range svc.Status.LoadBalancer.Ingress {
if ip.IP != "" && (ip.IPMode == nil || *ip.IPMode == slim_corev1.LoadBalancerIPModeVIP) {
loadBalancerIPs = append(loadBalancerIPs, ip.IP)
Expand Down Expand Up @@ -348,7 +348,7 @@ func ParseService(svc *slim_corev1.Service, nodePortAddrs []netip.Addr) (Service
svcInfo.Ports[portName] = p
}

if expType.canExpose(slim_corev1.ServiceTypeNodePort) &&
if expType.CanExpose(slim_corev1.ServiceTypeNodePort) &&
(svc.Spec.Type == slim_corev1.ServiceTypeNodePort || svc.Spec.Type == slim_corev1.ServiceTypeLoadBalancer) {

if option.Config.EnableNodePort {
Expand Down
74 changes: 44 additions & 30 deletions pkg/loadbalancer/experimental/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

daemonK8s "github.com/cilium/cilium/daemon/k8s"
"github.com/cilium/cilium/pkg/annotation"
"github.com/cilium/cilium/pkg/cidr"
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
"github.com/cilium/cilium/pkg/ip"
Expand Down Expand Up @@ -201,7 +202,7 @@ func runServiceEndpointsReflector(ctx context.Context, health cell.Health, p ref
initServices(txn)

case resource.Upsert:
svc, fes := convertService(p.ExtConfig, obj)
svc, fes := convertService(p.ExtConfig, p.Log, obj)
if svc == nil {
return
}
Expand Down Expand Up @@ -373,7 +374,7 @@ func isHeadless(svc *slim_corev1.Service) bool {
return headless
}

func convertService(cfg ExternalConfig, svc *slim_corev1.Service) (s *Service, fes []FrontendParams) {
func convertService(cfg ExternalConfig, log *slog.Logger, svc *slim_corev1.Service) (s *Service, fes []FrontendParams) {
name := loadbalancer.ServiceName{Namespace: svc.Namespace, Name: svc.Name}
s = &Service{
Name: name,
Expand All @@ -384,6 +385,15 @@ func convertService(cfg ExternalConfig, svc *slim_corev1.Service) (s *Service, f
HealthCheckNodePort: uint16(svc.Spec.HealthCheckNodePort),
}

expType, err := k8s.NewSvcExposureType(svc)
if err != nil {
log.Warn("Ignoring annotation",
logfields.Error, err,
logfields.Annotations, annotation.ServiceTypeExposure,
logfields.Service, svc.GetName(),
)
}

if len(svc.Spec.Ports) > 0 {
s.PortNames = map[string]uint16{}
for _, port := range svc.Spec.Ports {
Expand Down Expand Up @@ -436,38 +446,40 @@ func convertService(cfg ExternalConfig, svc *slim_corev1.Service) (s *Service, f
}

// ClusterIP
var clusterIPs []string
if len(svc.Spec.ClusterIPs) > 0 {
clusterIPs = slices.Sorted(slices.Values(svc.Spec.ClusterIPs))
} else {
clusterIPs = []string{svc.Spec.ClusterIP}
}

for _, ip := range clusterIPs {
addr, err := cmtypes.ParseAddrCluster(ip)
if err != nil {
continue
if expType.CanExpose(slim_corev1.ServiceTypeClusterIP) {
var clusterIPs []string
if len(svc.Spec.ClusterIPs) > 0 {
clusterIPs = slices.Sorted(slices.Values(svc.Spec.ClusterIPs))
} else {
clusterIPs = []string{svc.Spec.ClusterIP}
}

if (!cfg.EnableIPv6 && addr.Is6()) || (!cfg.EnableIPv4 && addr.Is4()) {
continue
}
for _, ip := range clusterIPs {
addr, err := cmtypes.ParseAddrCluster(ip)
if err != nil {
continue
}

for _, port := range svc.Spec.Ports {
p := loadbalancer.NewL4Addr(loadbalancer.L4Type(port.Protocol), uint16(port.Port))
if p == nil {
if (!cfg.EnableIPv6 && addr.Is6()) || (!cfg.EnableIPv4 && addr.Is4()) {
continue
}
fe := FrontendParams{
Type: loadbalancer.SVCTypeClusterIP,
PortName: loadbalancer.FEPortName(port.Name),
ServiceName: name,
ServicePort: uint16(port.Port),

for _, port := range svc.Spec.Ports {
p := loadbalancer.NewL4Addr(loadbalancer.L4Type(port.Protocol), uint16(port.Port))
if p == nil {
continue
}
fe := FrontendParams{
Type: loadbalancer.SVCTypeClusterIP,
PortName: loadbalancer.FEPortName(port.Name),
ServiceName: name,
ServicePort: uint16(port.Port),
}
fe.Address.AddrCluster = addr
fe.Address.Scope = loadbalancer.ScopeExternal
fe.Address.L4Addr = *p
fes = append(fes, fe)
}
fe.Address.AddrCluster = addr
fe.Address.Scope = loadbalancer.ScopeExternal
fe.Address.L4Addr = *p
fes = append(fes, fe)
}
}

Expand All @@ -476,7 +488,9 @@ func convertService(cfg ExternalConfig, svc *slim_corev1.Service) (s *Service, f

if cfg.KubeProxyReplacement {
// NodePort
if svc.Spec.Type == slim_corev1.ServiceTypeNodePort || svc.Spec.Type == slim_corev1.ServiceTypeLoadBalancer {
if (svc.Spec.Type == slim_corev1.ServiceTypeNodePort || svc.Spec.Type == slim_corev1.ServiceTypeLoadBalancer) &&
expType.CanExpose(slim_corev1.ServiceTypeNodePort) {

for _, scope := range scopes {
for _, family := range getIPFamilies(svc) {
if (!cfg.EnableIPv6 && family == slim_corev1.IPv6Protocol) ||
Expand Down Expand Up @@ -517,7 +531,7 @@ func convertService(cfg ExternalConfig, svc *slim_corev1.Service) (s *Service, f
}

// LoadBalancer
if svc.Spec.Type == slim_corev1.ServiceTypeLoadBalancer {
if svc.Spec.Type == slim_corev1.ServiceTypeLoadBalancer && expType.CanExpose(slim_corev1.ServiceTypeLoadBalancer) {
for _, ip := range svc.Status.LoadBalancer.Ingress {
if ip.IP == "" {
continue
Expand Down
3 changes: 2 additions & 1 deletion pkg/loadbalancer/experimental/reflector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package experimental

import (
"log/slog"
"testing"

"github.com/cilium/cilium/pkg/k8s"
Expand Down Expand Up @@ -35,7 +36,7 @@ func BenchmarkConvertService(b *testing.B) {

b.ResetTimer()
for range b.N {
convertService(benchmarkExternalConfig, svc)
convertService(benchmarkExternalConfig, slog.New(slog.DiscardHandler), svc)
}
b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "services/sec")
}
Expand Down
Loading
Loading
0