From 76da6593642189f2b1f28b14b527523582fbdf43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 4 Jun 2025 22:09:38 +0000 Subject: [PATCH 01/69] s3: rootCAs/tlsHost (#3001) consolidate on S3Config inside peer, add rootCA/tlsHost for minio setups --- .github/workflows/flow.yml | 12 +- flow/connectors/clickhouse/clickhouse.go | 27 ++-- flow/connectors/s3/s3.go | 11 +- flow/connectors/snowflake/qrep.go | 5 +- flow/connectors/utils/aws.go | 71 +++++++-- flow/e2e/clickhouse/clickhouse.go | 19 +-- flow/e2e/s3/cdc_s3_test.go | 18 ++- flow/e2e/s3/qrep_flow_s3_test.go | 5 + flow/e2e/s3/s3_helper.go | 69 ++++++--- nexus/analyzer/src/lib.rs | 24 +-- protos/peers.proto | 3 + ui/app/mirrors/create/handlers.ts | 3 +- ui/app/peers/create/[peerType]/helpers/ch.ts | 146 +++++++++++++----- .../peers/create/[peerType]/helpers/common.ts | 1 + ui/app/peers/create/[peerType]/helpers/my.ts | 5 +- ui/app/peers/create/[peerType]/helpers/pg.ts | 5 +- ui/app/peers/create/[peerType]/helpers/s3.ts | 26 ++++ ui/app/peers/create/[peerType]/helpers/sf.ts | 5 +- ui/app/peers/create/[peerType]/page.tsx | 5 +- ui/components/PeerForms/BigqueryConfig.tsx | 6 +- ui/components/PeerForms/ClickhouseConfig.tsx | 48 +----- .../PeerForms/ElasticsearchConfigForm.tsx | 2 +- .../PeerForms/Eventhubs/EventhubConfig.tsx | 2 +- .../Eventhubs/EventhubGroupConfig.tsx | 2 +- ui/components/PeerForms/KafkaConfig.tsx | 4 +- ui/components/PeerForms/MySqlForm.tsx | 74 +-------- ui/components/PeerForms/PostgresForm.tsx | 71 +-------- ui/components/PeerForms/PubSubConfig.tsx | 2 +- ui/components/PeerForms/S3Form.tsx | 5 +- ui/components/PeerForms/SnowflakeForm.tsx | 31 +--- ui/components/PeerForms/common.ts | 65 ++++++++ 31 files changed, 421 insertions(+), 351 deletions(-) create mode 100644 ui/components/PeerForms/common.ts diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 2c2e41607d..b74bbad64c 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -77,8 +77,6 @@ jobs: run: | # No need to update man pages on package install sudo apt-get remove --purge man-db - - sudo apt-get update sudo apt-get install libgeos-dev - run: go mod download @@ -126,6 +124,15 @@ jobs: if: matrix.db-version.mysql == 'maria' run: docker run -d --rm --name mariadb -p 3306:3306 -e MARIADB_ROOT_PASSWORD=cipass mariadb:lts --log-bin=maria + - name: MinIO TLS + run: > + mkdir -p certs && + openssl genrsa -out certs/cert.key 2048 && + openssl req -new -key certs/cert.key -out certs/cert.csr -subj /CN=minio.local && + openssl x509 -req -days 3650 -in certs/cert.csr -signkey certs/cert.key -out certs/cert.crt && + chown -R 1001 certs && + docker run -d --rm --name miniotls -p 9998:9998 -v "$PWD/certs:/certs" -e MINIO_SCHEME=https bitnami/minio:latest + - name: create hstore extension, increase logical replication limits, and setup catalog database run: > docker exec "${{ job.services.catalog.id }}" psql -U postgres -c "CREATE EXTENSION hstore;" @@ -184,6 +191,7 @@ jobs: AWS_ACCESS_KEY_ID: minio AWS_SECRET_ACCESS_KEY: miniosecret AWS_REGION: us-east-1 + AWS_ENDPOINT_URL_S3_TLS: https://localhost:9998 PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID: minio PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY: miniosecret PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION: us-east-1 diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 7ab49fef0a..b67d321b15 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -52,19 +52,28 @@ func NewClickHouseConnector( return nil, err } - credentialsProvider, err := utils.GetAWSCredentialsProvider(ctx, "clickhouse", utils.PeerAWSCredentials{ - Credentials: aws.Credentials{ - AccessKeyID: config.AccessKeyId, - SecretAccessKey: config.SecretAccessKey, - }, - EndpointUrl: config.Endpoint, - Region: config.Region, - }) + var awsConfig utils.PeerAWSCredentials + var awsBucketPath string + if config.S3 != nil { + awsConfig = utils.NewPeerAWSCredentials(config.S3) + awsBucketPath = config.S3.Url + } else { + awsConfig = utils.PeerAWSCredentials{ + Credentials: aws.Credentials{ + AccessKeyID: config.AccessKeyId, + SecretAccessKey: config.SecretAccessKey, + }, + EndpointUrl: config.Endpoint, + Region: config.Region, + } + awsBucketPath = config.S3Path + } + + credentialsProvider, err := utils.GetAWSCredentialsProvider(ctx, "clickhouse", awsConfig) if err != nil { return nil, err } - awsBucketPath := config.S3Path if awsBucketPath == "" { deploymentUID := internal.PeerDBDeploymentUID() flowName, _ := ctx.Value(shared.FlowNameKey).(string) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 9a3aa9865d..b362b17b12 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -5,7 +5,6 @@ import ( "fmt" "strconv" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "go.temporal.io/sdk/log" @@ -30,15 +29,7 @@ func NewS3Connector( ) (*S3Connector, error) { logger := internal.LoggerFromCtx(ctx) - provider, err := utils.GetAWSCredentialsProvider(ctx, "s3", utils.PeerAWSCredentials{ - Credentials: aws.Credentials{ - AccessKeyID: config.GetAccessKeyId(), - SecretAccessKey: config.GetSecretAccessKey(), - }, - RoleArn: config.RoleArn, - EndpointUrl: config.Endpoint, - Region: config.GetRegion(), - }) + provider, err := utils.GetAWSCredentialsProvider(ctx, "s3", utils.NewPeerAWSCredentials(config)) if err != nil { return nil, err } diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 84f4027d01..1d35fdf9e0 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -131,7 +131,6 @@ func (c *SnowflakeConnector) createExternalStage(ctx context.Context, stageName cleanURL := fmt.Sprintf("s3://%s/%s/%s", s3o.Bucket, s3o.Prefix, config.FlowJobName) - s3Int := c.config.S3Integration provider, err := utils.GetAWSCredentialsProvider(ctx, "snowflake", utils.PeerAWSCredentials{}) if err != nil { return "", err @@ -141,7 +140,7 @@ func (c *SnowflakeConnector) createExternalStage(ctx context.Context, stageName if err != nil { return "", err } - if s3Int == "" { + if c.config.S3Integration == "" { credsStr := fmt.Sprintf("CREDENTIALS=(AWS_KEY_ID='%s' AWS_SECRET_KEY='%s' AWS_TOKEN='%s')", creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) stageStatement := ` @@ -156,7 +155,7 @@ func (c *SnowflakeConnector) createExternalStage(ctx context.Context, stageName URL = '%s' STORAGE_INTEGRATION = %s FILE_FORMAT = (TYPE = AVRO);` - return fmt.Sprintf(stageStatement, stageName, cleanURL, s3Int), nil + return fmt.Sprintf(stageStatement, stageName, cleanURL, c.config.S3Integration), nil } } diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go index f98697115b..55c161779e 100644 --- a/flow/connectors/utils/aws.go +++ b/flow/connectors/utils/aws.go @@ -2,6 +2,7 @@ package utils import ( "context" + "crypto/tls" "fmt" "net/http" "net/url" @@ -21,7 +22,9 @@ import ( "github.com/aws/smithy-go/ptr" "github.com/google/uuid" + "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/internal" + "github.com/PeerDB-io/peerdb/flow/shared" ) const ( @@ -45,15 +48,26 @@ type PeerAWSCredentials struct { ChainedRoleArn *string EndpointUrl *string Region string + RootCAs *string + TlsHost string } -type S3PeerCredentials struct { - AccessKeyID string `json:"accessKeyId"` - SecretAccessKey string `json:"secretAccessKey"` - AwsRoleArn string `json:"awsRoleArn"` - SessionToken string `json:"sessionToken"` - Region string `json:"region"` - Endpoint string `json:"endpoint"` +func NewPeerAWSCredentials(s3 *protos.S3Config) PeerAWSCredentials { + if s3 == nil { + return PeerAWSCredentials{} + } + return PeerAWSCredentials{ + Credentials: aws.Credentials{ + AccessKeyID: s3.GetAccessKeyId(), + SecretAccessKey: s3.GetSecretAccessKey(), + }, + RoleArn: s3.RoleArn, + ChainedRoleArn: nil, + EndpointUrl: s3.Endpoint, + Region: s3.GetRegion(), + RootCAs: s3.RootCa, + TlsHost: s3.TlsHost, + } } type ClickHouseS3Credentials struct { @@ -71,6 +85,7 @@ type AWSCredentialsProvider interface { GetUnderlyingProvider() aws.CredentialsProvider GetRegion() string GetEndpointURL() string + GetTlsConfig() (*string, string) } type ConfigBasedAWSCredentialsProvider struct { @@ -98,6 +113,10 @@ func (r *ConfigBasedAWSCredentialsProvider) GetEndpointURL() string { return endpoint } +func (r *ConfigBasedAWSCredentialsProvider) GetTlsConfig() (*string, string) { + return nil, "" +} + // Retrieve should be called as late as possible in order to have credentials with latest expiry func (r *ConfigBasedAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCredentials, error) { retrieved, err := r.config.Credentials.Retrieve(ctx) @@ -113,12 +132,16 @@ func (r *ConfigBasedAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCr type StaticAWSCredentialsProvider struct { credentials AWSCredentials region string + rootCAs *string + tlsHost string } -func NewStaticAWSCredentialsProvider(credentials AWSCredentials, region string) *StaticAWSCredentialsProvider { +func NewStaticAWSCredentialsProvider(credentials AWSCredentials, region string, rootCAs *string, tlsHost string) *StaticAWSCredentialsProvider { return &StaticAWSCredentialsProvider{ credentials: credentials, region: region, + rootCAs: rootCAs, + tlsHost: tlsHost, } } @@ -142,6 +165,10 @@ func (s *StaticAWSCredentialsProvider) GetEndpointURL() string { return "" } +func (s *StaticAWSCredentialsProvider) GetTlsConfig() (*string, string) { + return s.rootCAs, s.tlsHost +} + type AssumeRoleBasedAWSCredentialsProvider struct { Provider aws.CredentialsProvider // New Credentials config aws.Config // Initial Config @@ -194,6 +221,10 @@ func (a *AssumeRoleBasedAWSCredentialsProvider) GetEndpointURL() string { return endpoint } +func (a *AssumeRoleBasedAWSCredentialsProvider) GetTlsConfig() (*string, string) { + return nil, "" +} + func getPeerDBAWSEnv(connectorName string, awsKey string) string { return os.Getenv(fmt.Sprintf("PEERDB_%s_AWS_CREDENTIALS_%s", strings.ToUpper(connectorName), awsKey)) } @@ -203,6 +234,8 @@ func LoadPeerDBAWSEnvConfigProvider(connectorName string) *StaticAWSCredentialsP secretAccessKey := getPeerDBAWSEnv(connectorName, "AWS_SECRET_ACCESS_KEY") region := getPeerDBAWSEnv(connectorName, "AWS_REGION") endpointUrl := getPeerDBAWSEnv(connectorName, "AWS_ENDPOINT_URL_S3") + rootCa := getPeerDBAWSEnv(connectorName, "ROOT_CA") + tlsHost := getPeerDBAWSEnv(connectorName, "TLS_HOST") var endpointUrlPtr *string if endpointUrl != "" { endpointUrlPtr = &endpointUrl @@ -212,13 +245,18 @@ func LoadPeerDBAWSEnvConfigProvider(connectorName string) *StaticAWSCredentialsP return nil } + var rootCAs *string + if rootCa != "" { + rootCAs = &rootCa + } + return NewStaticAWSCredentialsProvider(AWSCredentials{ AWS: aws.Credentials{ AccessKeyID: accessKeyId, SecretAccessKey: secretAccessKey, }, EndpointUrl: endpointUrlPtr, - }, region) + }, region, rootCAs, tlsHost) } func GetAWSCredentialsProvider(ctx context.Context, connectorName string, peerCredentials PeerAWSCredentials) (AWSCredentialsProvider, error) { @@ -230,7 +268,7 @@ func GetAWSCredentialsProvider(ctx context.Context, connectorName string, peerCr staticProvider := NewStaticAWSCredentialsProvider(AWSCredentials{ AWS: peerCredentials.Credentials, EndpointUrl: peerCredentials.EndpointUrl, - }, peerCredentials.Region) + }, peerCredentials.Region, peerCredentials.RootCAs, peerCredentials.TlsHost) if peerCredentials.RoleArn == nil || *peerCredentials.RoleArn == "" { logger.Info("Received AWS credentials from peer for connector: " + connectorName) return staticProvider, nil @@ -364,6 +402,19 @@ func CreateS3Client(ctx context.Context, credsProvider AWSCredentialsProvider) ( region: options.Region, }, } + } else { + rootCAs, tlsHost := credsProvider.GetTlsConfig() + if rootCAs != nil || tlsHost != "" { + // start with a clone of DefaultTransport so we keep http2, idle-conns, etc. + tlsConfig, err := shared.CreateTlsConfig(tls.VersionTLS13, rootCAs, tlsHost, tlsHost, tlsHost == "") + if err != nil { + return nil, err + } + + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.TLSClientConfig = tlsConfig + options.HTTPClient = &http.Client{Transport: tr} + } } } diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index 0cdabfa4fd..6b7934a7f3 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -64,25 +64,16 @@ func (s ClickHouseSuite) Peer() *protos.Peer { } func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer { - region := "" - if s.s3Helper.S3Config.Region != nil { - region = *s.s3Helper.S3Config.Region - } - ret := &protos.Peer{ Name: e2e.AddSuffix(s, dbname), Type: protos.DBType_CLICKHOUSE, Config: &protos.Peer_ClickhouseConfig{ ClickhouseConfig: &protos.ClickhouseConfig{ - Host: "localhost", - Port: 9000, - Database: dbname, - S3Path: s.s3Helper.BucketName, - AccessKeyId: *s.s3Helper.S3Config.AccessKeyId, - SecretAccessKey: *s.s3Helper.S3Config.SecretAccessKey, - Region: region, - DisableTls: true, - Endpoint: s.s3Helper.S3Config.Endpoint, + Host: "localhost", + Port: 9000, + Database: dbname, + DisableTls: true, + S3: s.s3Helper.S3Config, }, }, } diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 9f3d3042da..5567e7ca57 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -60,20 +60,22 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { ctx, cancel := context.WithTimeout(s.t.Context(), 25*time.Second) defer cancel() files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - s.t.Logf("Files in Test_Complete_Simple_Flow_S3 %s: %d", flowJobName, len(files)) e2e.EnvNoError(s.t, env, err) + s.t.Logf("Files in Test_Complete_Simple_Flow_S3 %s: %d", flowJobName, len(files)) return len(files) == 4 }) - // s3 normalize is nop, so check peerdb_stats directly that batch finalized pool, err := internal.GetCatalogConnectionPoolFromEnv(s.t.Context()) require.NoError(s.t, err) - var count int64 - require.NoError(s.t, pool.QueryRow(s.t.Context(), - "select count(*) from peerdb_stats.cdc_batches where flow_name = $1 and end_time is not null", - flowJobName, - ).Scan(&count)) - require.Equal(s.t, int64(4), count) + e2e.EnvWaitFor(s.t, env, time.Minute, "waiting for cdc batch completion", func() bool { + // s3 normalize is nop, so check peerdb_stats directly that batch finalized + var count int64 + require.NoError(s.t, pool.QueryRow(s.t.Context(), + "select count(*) from peerdb_stats.cdc_batches where flow_name = $1 and end_time is not null", + flowJobName, + ).Scan(&count)) + return count == 4 + }) env.Cancel(s.t.Context()) e2e.RequireEnvCanceled(s.t, env) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 48143321ad..e77c138ba3 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -116,6 +116,11 @@ func SetupSuiteMinIO(t *testing.T) PeerFlowE2ETestSuiteS3 { return setupSuite(t, Minio) } +func SetupSuiteMinIO_TLS(t *testing.T) PeerFlowE2ETestSuiteS3 { + t.Helper() + return setupSuite(t, MinioTls) +} + func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { if s.s3Helper == nil { s.t.Skip("Skipping S3 test") diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 4524d5eaf0..1a76301fd0 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -2,12 +2,12 @@ package e2e_s3 import ( "context" + "encoding/base64" "encoding/json" "fmt" "os" "time" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" @@ -26,17 +26,29 @@ type S3TestHelper struct { type S3Environment int +type S3PeerCredentials struct { + AccessKeyID string `json:"accessKeyId"` + SecretAccessKey string `json:"secretAccessKey"` + AwsRoleArn string `json:"awsRoleArn"` + SessionToken string `json:"sessionToken"` + Region string `json:"region"` + Endpoint string `json:"endpoint"` +} + const ( Aws S3Environment = iota Gcs Minio + MinioTls ) func NewS3TestHelper(ctx context.Context, s3environment S3Environment) (*S3TestHelper, error) { - var config utils.S3PeerCredentials + var config S3PeerCredentials var endpoint string var credsPath string var bucketName string + var rootCA *string + var tlsHost string switch s3environment { case Aws: credsPath = os.Getenv("TEST_S3_CREDS") @@ -51,6 +63,18 @@ func NewS3TestHelper(ctx context.Context, s3environment S3Environment) (*S3TestH config.AccessKeyID = os.Getenv("AWS_ACCESS_KEY_ID") config.SecretAccessKey = os.Getenv("AWS_SECRET_ACCESS_KEY") config.Region = os.Getenv("AWS_REGION") + case MinioTls: + bucketName = "peerdb" + endpoint = os.Getenv("AWS_ENDPOINT_URL_S3_TLS") + config.AccessKeyID = os.Getenv("AWS_ACCESS_KEY_ID") + config.SecretAccessKey = os.Getenv("AWS_SECRET_ACCESS_KEY") + config.Region = os.Getenv("AWS_REGION") + bytes, err := e2eshared.ReadFileToBytes("./certs/cert.crt") + if err != nil { + return nil, err + } + rootCA = shared.Ptr(base64.StdEncoding.EncodeToString(bytes)) + tlsHost = "minio.local" default: panic(fmt.Sprintf("invalid s3environment %d", s3environment)) } @@ -66,35 +90,32 @@ func NewS3TestHelper(ctx context.Context, s3environment S3Environment) (*S3TestH } } - var endpointUrlPtr *string - if endpoint != "" { - endpointUrlPtr = &endpoint + prefix := fmt.Sprintf("peerdb_test/%d_%s", time.Now().Unix(), shared.RandomString(6)) + + s3config := &protos.S3Config{ + Url: fmt.Sprintf("s3://%s/%s", bucketName, prefix), + AccessKeyId: &config.AccessKeyID, + SecretAccessKey: &config.SecretAccessKey, + Region: &config.Region, + Endpoint: shared.Ptr(endpoint), + RootCa: rootCA, + TlsHost: tlsHost, + } + + provider, err := utils.GetAWSCredentialsProvider(ctx, "ci", utils.NewPeerAWSCredentials(s3config)) + if err != nil { + return nil, err } - provider := utils.NewStaticAWSCredentialsProvider(utils.AWSCredentials{ - AWS: aws.Credentials{ - AccessKeyID: config.AccessKeyID, - SecretAccessKey: config.SecretAccessKey, - SessionToken: config.SessionToken, - }, - EndpointUrl: endpointUrlPtr, - }, config.Region) client, err := utils.CreateS3Client(ctx, provider) if err != nil { return nil, err } - prefix := fmt.Sprintf("peerdb_test/%d_%s", time.Now().Unix(), shared.RandomString(6)) return &S3TestHelper{ - client, - &protos.S3Config{ - Url: fmt.Sprintf("s3://%s/%s", bucketName, prefix), - AccessKeyId: &config.AccessKeyID, - SecretAccessKey: &config.SecretAccessKey, - Region: &config.Region, - Endpoint: endpointUrlPtr, - }, - bucketName, - prefix, + client: client, + S3Config: s3config, + BucketName: bucketName, + prefix: prefix, }, nil } diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 19601d3f5a..5f47ec6508 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -63,7 +63,7 @@ impl StatementAnalyzer for PeerExistanceAnalyzer<'_> { }; // Necessary as visit_relations fails to deeply visit some structures. - visit_statements(statement, |stmt| { + let _ = visit_statements(statement, |stmt| { match stmt { Statement::Drop { names, .. } => { for name in names { @@ -73,7 +73,7 @@ impl StatementAnalyzer for PeerExistanceAnalyzer<'_> { Statement::Declare { stmts } => { for stmt in stmts { if let Some(ref query) = stmt.for_query { - visit_relations(query, |relation| { + let _ = visit_relations(query, |relation| { analyze_name(&relation.0[0].value); ControlFlow::<()>::Continue(()) }); @@ -85,7 +85,7 @@ impl StatementAnalyzer for PeerExistanceAnalyzer<'_> { ControlFlow::<()>::Continue(()) }); - visit_relations(statement, |relation| { + let _ = visit_relations(statement, |relation| { analyze_name(&relation.0[0].value); ControlFlow::<()>::Continue(()) }); @@ -278,7 +278,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer { let cdc_staging_path = match raw_options.remove("cdc_staging_path") { Some(Expr::Value(ast::Value::SingleQuotedString(s))) => Some(s.clone()), - _ => Some("".to_string()), + _ => None, }; let max_batch_size: Option = match raw_options.remove("max_batch_size") @@ -603,11 +603,6 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu Config::BigqueryConfig(bq_config) } DbType::Snowflake => { - let s3_int = opts - .get("s3_integration") - .map(|s| s.to_string()) - .unwrap_or_default(); - let snowflake_config = SnowflakeConfig { account_id: opts .get("account_id") @@ -637,7 +632,10 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu .context("unable to parse query_timeout")?, password: opts.get("password").map(|s| s.to_string()), metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()), - s3_integration: s3_int, + s3_integration: opts + .get("s3_integration") + .map(|s| s.to_string()) + .unwrap_or_default(), }; Config::SnowflakeConfig(snowflake_config) } @@ -728,6 +726,11 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu region: opts.get("region").map(|s| s.to_string()), role_arn: opts.get("role_arn").map(|s| s.to_string()), endpoint: opts.get("endpoint").map(|s| s.to_string()), + root_ca: opts.get("root_ca").map(|s| s.to_string()), + tls_host: opts + .get("tls_host") + .map(|s| s.to_string()) + .unwrap_or_default(), }; Config::S3Config(s3_config) } @@ -800,6 +803,7 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu .get("tls_host") .map(|s| s.to_string()) .unwrap_or_default(), + s3: None, }; Config::ClickhouseConfig(clickhouse_config) } diff --git a/protos/peers.proto b/protos/peers.proto index c54ce35ff6..6ef70ec20b 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -141,6 +141,8 @@ message S3Config { optional string role_arn = 4; optional string region = 5; optional string endpoint = 6; + optional string root_ca = 7 [(peerdb_redacted) = true]; + string tls_host = 8; } message ClickhouseConfig{ @@ -159,6 +161,7 @@ message ClickhouseConfig{ optional string private_key = 13 [(peerdb_redacted) = true]; optional string root_ca = 14 [(peerdb_redacted) = true]; string tls_host = 15; + optional S3Config s3 = 16; } message SqlServerConfig { diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index a1120b0997..e7696d8608 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -258,8 +258,7 @@ export async function handleCreateCDC( } as CreateCDCFlowRequest), }); if (!res.ok) { - // I don't know why but if the order is reversed the error message is not - // shown + // don't know why but if order is reversed the error message is not shown setLoading(false); notifyErr((await res.json()).message || 'Unable to create mirror.'); return; diff --git a/ui/app/peers/create/[peerType]/helpers/ch.ts b/ui/app/peers/create/[peerType]/helpers/ch.ts index dede1c2d83..9d2d390bde 100644 --- a/ui/app/peers/create/[peerType]/helpers/ch.ts +++ b/ui/app/peers/create/[peerType]/helpers/ch.ts @@ -1,5 +1,6 @@ import { ClickhouseConfig } from '@/grpc_generated/peers'; import { PeerSetting } from './common'; +import { blankS3Setting } from './s3'; export const clickhouseSetting: PeerSetting[] = [ { @@ -44,96 +45,161 @@ export const clickhouseSetting: PeerSetting[] = [ tips: 'If you are using a non-TLS connection for ClickHouse server, check this box.', optional: true, }, + { + label: 'Certificate', + stateHandler: (value, setter) => { + if (!value) { + // remove key from state if empty + setter((curr) => { + const newCurr = { ...curr } as ClickhouseConfig; + delete newCurr.certificate; + return newCurr; + }); + } else setter((curr) => ({ ...curr, certificate: value as string })); + }, + type: 'file', + optional: true, + tips: 'This is only needed if the user is authenticated via certificate.', + }, + { + label: 'Private Key', + stateHandler: (value, setter) => { + if (!value) { + // remove key from state if empty + setter((curr) => { + const newCurr = { ...curr } as ClickhouseConfig; + delete newCurr.privateKey; + return newCurr; + }); + } else setter((curr) => ({ ...curr, privateKey: value as string })); + }, + type: 'file', + optional: true, + tips: 'This is only needed if the user is authenticated via certificate.', + }, { label: 'S3 Path', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, s3Path: value as string })), + setter((curr) => ({ + ...curr, + s3: { + ...blankS3Setting, + ...(curr as ClickhouseConfig).s3, + url: value as string, + }, + })), tips: `This is an S3 bucket/object URL field. This bucket will be used as our intermediate stage for CDC`, placeholder: 's3://', + s3: true, }, { label: 'Access Key ID', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, accessKeyId: value as string })), + setter((curr) => ({ + ...curr, + s3: { + ...blankS3Setting, + ...(curr as ClickhouseConfig).s3, + accessKeyId: value as string, + }, + })), tips: 'The AWS access key ID associated with your account.', helpfulLink: 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', + s3: true, }, { label: 'Secret Access Key', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, secretAccessKey: value as string })), + setter((curr) => ({ + ...curr, + s3: { + ...blankS3Setting, + ...(curr as ClickhouseConfig).s3, + secretAccessKey: value as string, + }, + })), tips: 'The AWS secret access key associated with the above bucket.', helpfulLink: 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', + s3: true, }, { label: 'Region', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, region: value as string })), + setter((curr) => ({ + ...curr, + s3: { + ...blankS3Setting, + ...(curr as ClickhouseConfig).s3, + region: value as string, + }, + })), tips: 'The region where your bucket is located. For example, us-east-1.', + s3: true, }, { label: 'Endpoint', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, endpoint: value as string })), + setter((curr) => ({ + ...curr, + s3: { + ...blankS3Setting, + ...(curr as ClickhouseConfig).s3, + endpoint: value as string, + }, + })), helpfulLink: 'https://docs.aws.amazon.com/general/latest/gr/s3.html#s3_region', tips: 'An endpoint is the URL of the entry point for an AWS web service.', optional: true, + s3: true, }, { - label: 'Certificate', - stateHandler: (value, setter) => { - if (!value) { - // remove key from state if empty - setter((curr) => { - delete (curr as ClickhouseConfig)['certificate']; - return curr; - }); - } else setter((curr) => ({ ...curr, certificate: value as string })); - }, - type: 'file', - optional: true, - tips: 'This is only needed if the user is authenticated via certificate.', - }, - { - label: 'Private Key', - stateHandler: (value, setter) => { - if (!value) { - // remove key from state if empty - setter((curr) => { - delete (curr as ClickhouseConfig)['privateKey']; - return curr; - }); - } else setter((curr) => ({ ...curr, privateKey: value as string })); - }, - type: 'file', - optional: true, - tips: 'This is only needed if the user is authenticated via certificate.', - }, - { - label: 'Root Certificate', + label: 'S3 Root Certificate', stateHandler: (value, setter) => { if (!value) { // remove key from state if empty setter((curr) => { - delete (curr as ClickhouseConfig)['rootCa']; + const s3 = (curr as ClickhouseConfig).s3; + if (s3) { + const new3 = { ...s3 }; + delete new3.rootCa; + return { ...curr, s3: new3 }; + } return curr; }); - } else setter((curr) => ({ ...curr, rootCa: value as string })); + } else { + setter((curr) => ({ + ...curr, + s3: { + ...blankS3Setting, + ...(curr as ClickhouseConfig).s3, + rootCa: value as string, + }, + })); + } }, type: 'file', optional: true, tips: 'If not provided, host CA roots will be used.', + s3: true, }, { - label: 'TLS Hostname', + label: 'S3 TLS Hostname', field: 'tlsHost', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, tlsHost: value as string })), + setter((curr) => ({ + ...curr, + s3: { + ...blankS3Setting, + ...(curr as ClickhouseConfig).s3, + tlsHost: value as string, + }, + })), tips: 'Overrides expected hostname during tls cert verification.', optional: true, + s3: true, }, ]; diff --git a/ui/app/peers/create/[peerType]/helpers/common.ts b/ui/app/peers/create/[peerType]/helpers/common.ts index 2a5b4ff2b7..f53f6a5a82 100644 --- a/ui/app/peers/create/[peerType]/helpers/common.ts +++ b/ui/app/peers/create/[peerType]/helpers/common.ts @@ -21,6 +21,7 @@ export interface PeerSetting { default?: string | number; placeholder?: string; options?: { value: string; label: string }[]; + s3?: true | undefined; } export function getBlankSetting(dbType: string): PeerConfig { diff --git a/ui/app/peers/create/[peerType]/helpers/my.ts b/ui/app/peers/create/[peerType]/helpers/my.ts index 145ec90d13..e5483bba0f 100644 --- a/ui/app/peers/create/[peerType]/helpers/my.ts +++ b/ui/app/peers/create/[peerType]/helpers/my.ts @@ -104,8 +104,9 @@ export const mysqlSetting: PeerSetting[] = [ if (!value) { // remove key from state if empty setter((curr) => { - delete (curr as MySqlConfig)['rootCa']; - return curr; + const newCurr = { ...curr } as MySqlConfig; + delete newCurr.rootCa; + return newCurr; }); } else setter((curr) => ({ ...curr, rootCa: value as string })); }, diff --git a/ui/app/peers/create/[peerType]/helpers/pg.ts b/ui/app/peers/create/[peerType]/helpers/pg.ts index ad1da01c7b..dfe4c75be2 100644 --- a/ui/app/peers/create/[peerType]/helpers/pg.ts +++ b/ui/app/peers/create/[peerType]/helpers/pg.ts @@ -66,8 +66,9 @@ export const postgresSetting: PeerSetting[] = [ if (!value) { // remove key from state if empty setter((curr) => { - delete (curr as PostgresConfig)['rootCa']; - return curr; + const newCurr = { ...curr } as PostgresConfig; + delete newCurr.rootCa; + return newCurr; }); } else setter((curr) => ({ ...curr, rootCa: value as string })); }, diff --git a/ui/app/peers/create/[peerType]/helpers/s3.ts b/ui/app/peers/create/[peerType]/helpers/s3.ts index 8e98b91649..54b1f7fd3c 100644 --- a/ui/app/peers/create/[peerType]/helpers/s3.ts +++ b/ui/app/peers/create/[peerType]/helpers/s3.ts @@ -50,6 +50,30 @@ export const s3Setting: PeerSetting[] = [ 'https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns', optional: true, }, + { + label: 'Root Certificate', + stateHandler: (value, setter) => { + if (!value) { + // remove key from state if empty + setter((curr) => { + const newCurr = { ...curr } as S3Config; + delete newCurr.rootCa; + return newCurr; + }); + } else setter((curr) => ({ ...curr, rootCa: value as string })); + }, + type: 'file', + optional: true, + tips: 'If not provided, host CA roots will be used.', + }, + { + label: 'TLS Host', + field: 'tlsHost', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, tlsHost: value as string })), + tips: 'Overrides expected hostname during tls cert verification.', + optional: true, + }, ]; export const blankS3Setting: S3Config = { @@ -59,4 +83,6 @@ export const blankS3Setting: S3Config = { roleArn: undefined, region: undefined, endpoint: '', + rootCa: undefined, + tlsHost: '', }; diff --git a/ui/app/peers/create/[peerType]/helpers/sf.ts b/ui/app/peers/create/[peerType]/helpers/sf.ts index 7ab474d1e8..7866c055bb 100644 --- a/ui/app/peers/create/[peerType]/helpers/sf.ts +++ b/ui/app/peers/create/[peerType]/helpers/sf.ts @@ -54,8 +54,9 @@ export const snowflakeSetting: PeerSetting[] = [ if (!value) { // remove password key from state if empty setter((curr) => { - delete (curr as SnowflakeConfig)['password']; - return curr; + const newCurr = { ...curr } as SnowflakeConfig; + delete newCurr.password; + return newCurr; }); } else setter((curr) => ({ ...curr, password: value as string })); }, diff --git a/ui/app/peers/create/[peerType]/page.tsx b/ui/app/peers/create/[peerType]/page.tsx index a25fb70b6c..b62ad770b5 100644 --- a/ui/app/peers/create/[peerType]/page.tsx +++ b/ui/app/peers/create/[peerType]/page.tsx @@ -74,7 +74,7 @@ export default function CreateConfig({ return peerType; }; - const configComponentMap = (peerType: string) => { + const configComponentMap = () => { switch (getDBType()) { case 'POSTGRES': return ( @@ -82,7 +82,6 @@ export default function CreateConfig({ settings={postgresSetting} setter={setConfig} config={config as PostgresConfig} - type={peerType} /> ); case 'MYSQL': @@ -206,7 +205,7 @@ export default function CreateConfig({ Configuration -
{configComponentMap(peerType)}
+
{configComponentMap()}
- + ); } diff --git a/ui/components/PeerForms/KafkaConfig.tsx b/ui/components/PeerForms/KafkaConfig.tsx index 7fb40d6aee..4cb22af8a9 100644 --- a/ui/components/PeerForms/KafkaConfig.tsx +++ b/ui/components/PeerForms/KafkaConfig.tsx @@ -26,7 +26,7 @@ export default function KafkaForm({ setter }: KafkaProps) { {!setting.optional && (