From 1186d811214fd50c4607fcf95e2a3b7d18eab442 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Mon, 7 Feb 2022 14:47:21 -0800 Subject: [PATCH] Add config and handling for remote cluster cert --- common/config/config.go | 2 + common/rpc.go | 4 +- .../rpc/encryption/localStoreTlsProvider.go | 102 +++++++++++++++--- .../testDynamicTLSConfigProvider.go | 5 + common/rpc/encryption/tlsFactory.go | 1 + common/rpc/rpc.go | 60 ++++++++--- common/rpc/test/rpc_common_test.go | 29 +++-- common/rpc/test/rpc_localstore_tls_test.go | 53 ++++++--- temporal/server_impl.go | 2 +- 9 files changed, 198 insertions(+), 60 deletions(-) diff --git a/common/config/config.go b/common/config/config.go index 3be6b3a3c35..7780e803b4e 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -116,6 +116,8 @@ type ( Frontend GroupTLS `yaml:"frontend"` // SystemWorker controls TLS setting for System Workers connecting to Frontend. SystemWorker WorkerTLS `yaml:"systemWorker"` + // RemoteClusters controls TLS settings for talking to remote clusters, keyed by the host part of the remote cluster's RPC address. 
+ RemoteClusters map[string]GroupTLS `yaml:"remoteClusters"` // ExpirationChecks defines settings for periodic checks for expiration of certificates ExpirationChecks CertExpirationValidation `yaml:"expirationChecks"` // Interval between refreshes of certificates loaded from files diff --git a/common/rpc.go b/common/rpc.go index 33a733a65f7..20a374a2711 100644 --- a/common/rpc.go +++ b/common/rpc.go @@ -38,7 +38,7 @@ type ( GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error) GetGRPCListener() net.Listener GetRingpopChannel() *tchannel.Channel - CreateFrontendGRPCConnection(hostName string) *grpc.ClientConn - CreateInternodeGRPCConnection(hostName string) *grpc.ClientConn + CreateFrontendGRPCConnection(rpcAddress string) *grpc.ClientConn + CreateInternodeGRPCConnection(rpcAddress string) *grpc.ClientConn } ) diff --git a/common/rpc/encryption/localStoreTlsProvider.go b/common/rpc/encryption/localStoreTlsProvider.go index 74273656894..65a782fb5cd 100644 --- a/common/rpc/encryption/localStoreTlsProvider.go +++ b/common/rpc/encryption/localStoreTlsProvider.go @@ -51,17 +51,18 @@ type localStoreTlsProvider struct { settings *config.RootTLS - internodeCertProvider CertProvider - internodeClientCertProvider CertProvider - frontendCertProvider CertProvider - workerCertProvider CertProvider - - frontendPerHostCertProviderMap *localStorePerHostCertProviderMap - - cachedInternodeServerConfig *tls.Config - cachedInternodeClientConfig *tls.Config - cachedFrontendServerConfig *tls.Config - cachedFrontendClientConfig *tls.Config + internodeCertProvider CertProvider + internodeClientCertProvider CertProvider + frontendCertProvider CertProvider + workerCertProvider CertProvider + remoteClusterClientCertProvider map[string]CertProvider + frontendPerHostCertProviderMap *localStorePerHostCertProviderMap + + cachedInternodeServerConfig *tls.Config + cachedInternodeClientConfig *tls.Config + cachedFrontendServerConfig *tls.Config + cachedFrontendClientConfig *tls.Config + 
cachedRemoteClusterClientConfig map[string]*tls.Config ticker *time.Ticker logger log.Logger @@ -84,6 +85,11 @@ func NewLocalStoreTlsProvider(tlsConfig *config.RootTLS, scope metrics.Scope, lo workerProvider = internodeWorkerProvider } + remoteClusterClientCertProvider := make(map[string]CertProvider) + for hostname, groupTLS := range tlsConfig.RemoteClusters { + groupTLS := groupTLS // per-iteration copy: before Go 1.22 the range variable is shared, so &groupTLS would alias across providers + remoteClusterClientCertProvider[hostname] = certProviderFactory(&groupTLS, nil, nil, tlsConfig.RefreshInterval, logger) + } provider := &localStoreTlsProvider{ internodeCertProvider: internodeProvider, internodeClientCertProvider: internodeProvider, @@ -91,10 +97,12 @@ func NewLocalStoreTlsProvider(tlsConfig *config.RootTLS, scope metrics.Scope, lo workerCertProvider: workerProvider, frontendPerHostCertProviderMap: newLocalStorePerHostCertProviderMap( tlsConfig.Frontend.PerHostOverrides, certProviderFactory, tlsConfig.RefreshInterval, logger), - RWMutex: sync.RWMutex{}, - settings: tlsConfig, - scope: scope, - logger: logger, + remoteClusterClientCertProvider: remoteClusterClientCertProvider, + RWMutex: sync.RWMutex{}, + settings: tlsConfig, + scope: scope, + logger: logger, + cachedRemoteClusterClientConfig: make(map[string]*tls.Config), } provider.initialize() return provider, nil @@ -155,6 +163,26 @@ func (s *localStoreTlsProvider) GetFrontendClientConfig() (*tls.Config, error) { ) } +func (s *localStoreTlsProvider) GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) { + groupTLS, ok := s.settings.RemoteClusters[hostname] + if !ok { + return nil, nil + } + + return s.getOrCreateRemoteClusterClientConfig( + hostname, + func() (*tls.Config, error) { + return newClientTLSConfig( + s.remoteClusterClientCertProvider[hostname], + groupTLS.Client.ServerName, + groupTLS.Server.RequireClientAuth, + false, + !groupTLS.Client.DisableHostVerification) + }, + groupTLS.IsClientEnabled(), + ) +} + func (s *localStoreTlsProvider) GetFrontendServerConfig() (*tls.Config, error) { return 
s.getOrCreateConfig( &s.cachedFrontendServerConfig, @@ -239,6 +267,41 @@ func (s *localStoreTlsProvider) getOrCreateConfig( return *cachedConfig, nil } +func (s *localStoreTlsProvider) getOrCreateRemoteClusterClientConfig( + hostname string, + configConstructor tlsConfigConstructor, + isEnabled bool, +) (*tls.Config, error) { + if !isEnabled { + return nil, nil + } + + // Check if exists under a read lock first + s.RLock() + if clientConfig, ok := s.cachedRemoteClusterClientConfig[hostname]; ok { + defer s.RUnlock() + return clientConfig, nil + } + // Not found, promote to write lock to initialize + s.RUnlock() + s.Lock() + defer s.Unlock() + // Check if someone got here first while waiting for write lock + if clientConfig, ok := s.cachedRemoteClusterClientConfig[hostname]; ok { + return clientConfig, nil + } + + // Load configuration + localConfig, err := configConstructor() + + if err != nil { + return nil, err + } + + s.cachedRemoteClusterClientConfig[hostname] = localConfig + return localConfig, nil +} + func newServerTLSConfig( certProvider CertProvider, perHostCertProviderMap PerHostCertProviderMap, @@ -321,8 +384,13 @@ func getServerTLSConfigFromCertProvider( logger), nil } -func newClientTLSConfig(clientProvider CertProvider, serverName string, isAuthRequired bool, - isWorker bool, enableHostVerification bool) (*tls.Config, error) { +func newClientTLSConfig( + clientProvider CertProvider, + serverName string, + isAuthRequired bool, + isWorker bool, + enableHostVerification bool, +) (*tls.Config, error) { // Optional ServerCA for client if not already trusted by host serverCa, err := clientProvider.FetchServerRootCAsForClient(isWorker) if err != nil { diff --git a/common/rpc/encryption/testDynamicTLSConfigProvider.go b/common/rpc/encryption/testDynamicTLSConfigProvider.go index 69e9662773d..279ad413433 100644 --- a/common/rpc/encryption/testDynamicTLSConfigProvider.go +++ b/common/rpc/encryption/testDynamicTLSConfigProvider.go @@ -72,6 +72,11 @@ func (t 
*TestDynamicTLSConfigProvider) GetExpiringCerts(timeWindow time.Duration panic("implement me") } +func (t *TestDynamicTLSConfigProvider) GetRemoteClusterClientConfig(hostName string) (*tls.Config, error) { + //TODO implement me + panic("implement me") +} + var _ TLSConfigProvider = (*TestDynamicTLSConfigProvider)(nil) func NewTestDynamicTLSConfigProvider( diff --git a/common/rpc/encryption/tlsFactory.go b/common/rpc/encryption/tlsFactory.go index 3b97c8dc7c0..5e7c1bf9a6d 100644 --- a/common/rpc/encryption/tlsFactory.go +++ b/common/rpc/encryption/tlsFactory.go @@ -44,6 +44,7 @@ type ( GetInternodeClientConfig() (*tls.Config, error) GetFrontendServerConfig() (*tls.Config, error) GetFrontendClientConfig() (*tls.Config, error) + GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) GetExpiringCerts(timeWindow time.Duration) (expiring CertExpirationMap, expired CertExpirationMap, err error) } diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 297144275b2..6938501fe5c 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -31,23 +31,24 @@ import ( "sync" "github.com/uber/tchannel-go" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/rpc/encryption" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) // RPCFactory is an implementation of service.RPCFactory interface type RPCFactory struct { - config *config.RPC - serviceName string - logger log.Logger - dc *dynamicconfig.Collection + config *config.RPC + serviceName string + logger log.Logger + dc *dynamicconfig.Collection + clusterMetadata *cluster.Config sync.Mutex grpcListener net.Listener @@ -57,13 +58,21 @@ type RPCFactory struct { // NewFactory builds a new RPCFactory // conforming to the 
underlying configuration -func NewFactory(cfg *config.RPC, sName string, logger log.Logger, tlsProvider encryption.TLSConfigProvider, dc *dynamicconfig.Collection) *RPCFactory { +func NewFactory( + cfg *config.RPC, + sName string, + logger log.Logger, + tlsProvider encryption.TLSConfigProvider, + dc *dynamicconfig.Collection, + clusterMetadata *cluster.Config, +) *RPCFactory { return &RPCFactory{ - config: cfg, - serviceName: sName, - logger: logger, - dc: dc, - tlsFactory: tlsProvider, + config: cfg, + serviceName: sName, + logger: logger, + dc: dc, + tlsFactory: tlsProvider, + clusterMetadata: clusterMetadata, } } @@ -92,6 +101,14 @@ func (d *RPCFactory) GetFrontendClientTlsConfig() (*tls.Config, error) { return nil, nil } +func (d *RPCFactory) GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) { + if d.tlsFactory != nil { + return d.tlsFactory.GetRemoteClusterClientConfig(hostname) + } + + return nil, nil +} + func (d *RPCFactory) GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error) { var opts []grpc.ServerOption @@ -237,18 +254,29 @@ func getListenIP(cfg *config.RPC, logger log.Logger) net.IP { } // CreateFrontendGRPCConnection creates connection for gRPC calls -func (d *RPCFactory) CreateFrontendGRPCConnection(hostName string) *grpc.ClientConn { +func (d *RPCFactory) CreateFrontendGRPCConnection(rpcAddress string) *grpc.ClientConn { var tlsClientConfig *tls.Config var err error if d.tlsFactory != nil { - tlsClientConfig, err = d.tlsFactory.GetFrontendClientConfig() + currCluster := d.clusterMetadata.ClusterInformation[d.clusterMetadata.CurrentClusterName] + + if currCluster.RPCAddress == rpcAddress { + tlsClientConfig, err = d.tlsFactory.GetFrontendClientConfig() + } else { + hostname, _, err2 := net.SplitHostPort(rpcAddress) + if err2 != nil { + d.logger.Fatal("Invalid rpcAddress for remote cluster", tag.Error(err2)) + } + tlsClientConfig, err = d.tlsFactory.GetRemoteClusterClientConfig(hostname) + } + if err != nil { 
d.logger.Fatal("Failed to create tls config for gRPC connection", tag.Error(err)) return nil } } - return d.dial(hostName, tlsClientConfig) + return d.dial(rpcAddress, tlsClientConfig) } // CreateInternodeGRPCConnection creates connection for gRPC calls diff --git a/common/rpc/test/rpc_common_test.go b/common/rpc/test/rpc_common_test.go index accc5c21f4c..2ecdfd698d0 100644 --- a/common/rpc/test/rpc_common_test.go +++ b/common/rpc/test/rpc_common_test.go @@ -28,19 +28,19 @@ import ( "context" "crypto/tls" "math/rand" + "net" "strings" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/peer" - "github.com/stretchr/testify/suite" - "google.golang.org/grpc" - "google.golang.org/grpc/examples/helloworld/helloworld" - + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/log" "go.temporal.io/server/common/rpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/examples/helloworld/helloworld" + "google.golang.org/grpc/peer" ) // HelloServer is used to implement helloworld.GreeterServer. 
@@ -53,6 +53,7 @@ type ServerUsageType int32 const ( Frontend ServerUsageType = iota Internode + RemoteCluster ) const ( @@ -82,6 +83,10 @@ var ( BroadcastAddress: localhostIPv4, }, } + clusterMetadata = &cluster.Config{ + CurrentClusterName: "test", + ClusterInformation: map[string]cluster.ClusterInformation{"test": {RPCAddress: localhostIPv4 + ":1234"}}, + } ) func startHelloWorldServer(s suite.Suite, factory *TestFactory) (*grpc.Server, string) { @@ -166,15 +171,21 @@ func dialHelloAndGetTLSInfo( logger := log.NewNoopLogger() var cfg *tls.Config var err error - if serverType == Internode { + switch serverType { + case Internode: cfg, err = clientFactory.GetInternodeClientTlsConfig() - } else { + case Frontend: cfg, err = clientFactory.GetFrontendClientTlsConfig() + case RemoteCluster: + host, _, splitErr := net.SplitHostPort(hostport) + s.NoError(splitErr) + cfg, err = clientFactory.GetRemoteClusterClientConfig(host) } - s.NoError(err) + clientConn, err := rpc.Dial(hostport, cfg, logger) s.NoError(err) + client := helloworld.NewGreeterClient(clientConn) request := &helloworld.HelloRequest{Name: convert.Uint64ToString(rand.Uint64())} diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index bf37bb80877..6827b2b4230 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -76,6 +76,7 @@ type localStoreRPCSuite struct { internodeDynamicTLSFactory *TestFactory internodeMutualTLSRPCRefreshFactory *TestFactory frontendMutualTLSRPCRefreshFactory *TestFactory + remoteClusterMutualTLSRPCFactory *TestFactory frontendConfigRootCAForceTLSFactory *TestFactory internodeCertDir string @@ -138,7 +139,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, nil, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + 
insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -331,24 +332,32 @@ func (s *localStoreRPCSuite) setupFrontend() { }, } + localStoreMutualTLSRemoteCluster := &config.Global{ + Membership: s.membershipConfig, + TLS: config.RootTLS{ + Frontend: s.frontendConfigPerHostOverrides, + RemoteClusters: map[string]config.GroupTLS{localhostIPv4: s.frontendConfigPerHostOverrides}, + }, + } + provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, nil, s.logger, nil) s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, nil, s.logger, nil) 
s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -362,7 +371,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.frontendRollingCerts, s.dynamicCACertPool, s.wrongCACertPool) - dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection()) + dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) @@ -370,9 +379,15 @@ func (s *localStoreRPCSuite) setupFrontend() { provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreRootCAForceTLS.TLS, nil, s.logger, nil) s.NoError(err) - frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendServerTLSFactory) s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) + + provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSRemoteCluster.TLS, nil, s.logger, nil) + s.NoError(err) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + s.NotNil(remoteClusterMutualTLSRPCFactory) + s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } func (s *localStoreRPCSuite) setupInternode() { 
@@ -406,22 +421,22 @@ func (s *localStoreRPCSuite) setupInternode() { provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) @@ -454,16 +469,16 @@ func (s *localStoreRPCSuite) setupInternodeRingpop() { provider, err := 
encryption.NewTLSConfigProviderFromConfig(ringpopMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - ringpopMutualTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc) + ringpopMutualTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopMutualTLSFactoryA) - ringpopMutualTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc) + ringpopMutualTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopMutualTLSFactoryB) provider, err = encryption.NewTLSConfigProviderFromConfig(ringpopServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - ringpopServerTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc) + ringpopServerTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopServerTLSFactoryA) - ringpopServerTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc) + ringpopServerTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopServerTLSFactoryB) s.ringpopMutualTLSRPCFactoryA = i(ringpopMutualTLSFactoryA) @@ -580,6 +595,10 @@ func i(r *rpc.RPCFactory) *TestFactory { return &TestFactory{serverUsage: Internode, RPCFactory: r} } +func r(r *rpc.RPCFactory) *TestFactory { + return &TestFactory{serverUsage: RemoteCluster, RPCFactory: r} +} + func convertFileToBase64(file string) string { fileBytes, err := os.ReadFile(file) if err != nil { @@ -605,6 +624,10 @@ func (s *localStoreRPCSuite) TestMutualTLSFrontendToFrontend() { runHelloWorldTest(s.Suite, localhostIPv4, s.frontendMutualTLSRPCFactory, s.frontendMutualTLSRPCFactory, true) } +func (s *localStoreRPCSuite) TestMutualTLSFrontendToRemoteCluster() { + runHelloWorldTest(s.Suite, localhostIPv4, s.remoteClusterMutualTLSRPCFactory, s.remoteClusterMutualTLSRPCFactory, true) +} + func (s *localStoreRPCSuite) TestMutualTLSButClientInsecure() { 
runHelloWorldTest(s.Suite, localhostIPv4, s.internodeMutualTLSRPCFactory, s.insecureRPCFactory, false) } diff --git a/temporal/server_impl.go b/temporal/server_impl.go index 0ceb7dcd8dc..eb6ed658201 100644 --- a/temporal/server_impl.go +++ b/temporal/server_impl.go @@ -183,7 +183,7 @@ func newBootstrapParams( } svcCfg := cfg.Services[svcName] - rpcFactory := rpc.NewFactory(&svcCfg.RPC, svcName, logger, tlsConfigProvider, dc) + rpcFactory := rpc.NewFactory(&svcCfg.RPC, svcName, logger, tlsConfigProvider, dc, clusterMetadata) params.RPCFactory = rpcFactory // Ringpop uses a different port to register handlers, this map is needed to resolve