release-22.1: sql: add bulkio.column_backfill.update_chunk_size_threshold_bytes #83816

Merged: 1 commit, Jul 5, 2022
30 changes: 24 additions & 6 deletions pkg/sql/backfill.go
@@ -92,6 +92,16 @@ var columnBackfillBatchSize = settings.RegisterIntSetting(
settings.NonNegativeInt, /* validateFn */
)

// columnBackfillUpdateChunkSizeThresholdBytes is the byte size threshold beyond which
// an update batch is run at once when adding or removing columns.
var columnBackfillUpdateChunkSizeThresholdBytes = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.column_backfill.update_chunk_size_threshold_bytes",
"the batch size in bytes above which an update is immediately run when adding/removing columns",
10<<20, /* 10 MiB */
settings.NonNegativeInt, /* validateFn */
)
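
For intuition about the new setting, here is a small self-contained simulation (assumed row and chunk sizes, not CockroachDB code) of how the threshold splits one scanned chunk into multiple update batches:

```go
package main

import "fmt"

func main() {
	const (
		chunkRows      = 200      // rows scanned per backfill chunk (assumed)
		rowBytes       = 1 << 20  // assume ~1 MiB of mutations per row (fat BYTES/JSONB)
		thresholdBytes = 10 << 20 // the setting's default: 10 MiB
	)
	batchBytes, batches := 0, 0
	for row := 0; row < chunkRows; row++ {
		batchBytes += rowBytes // stage this row's column updates
		// Mirror the backfiller's check: flush once the staged bytes
		// reach the threshold (a zero threshold disables the check).
		if thresholdBytes > 0 && batchBytes >= thresholdBytes {
			batches++
			batchBytes = 0
		}
	}
	if batchBytes > 0 {
		batches++ // any remainder is run at the end of the chunk
	}
	fmt.Printf("%d rows => %d update batches\n", chunkRows, batches)
	// Prints: 200 rows => 20 update batches
}
```

Without the threshold, all ~200 MiB of staged mutations would go into a single batch; with the 10 MiB default, no batch carries much more than 10 MiB.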

var _ sort.Interface = columnsByID{}
var _ sort.Interface = indexesByID{}

@@ -1187,6 +1197,7 @@ func (sc *SchemaChanger) distColumnBackfill(
ctx context.Context,
version descpb.DescriptorVersion,
backfillChunkSize int64,
backfillUpdateChunkSizeThresholdBytes uint64,
filter backfill.MutationFilter,
) error {
duration := checkpointInterval
@@ -1281,7 +1292,7 @@

planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
DistributionTypeSystemTenantOnly)
spec, err := initColumnBackfillerSpec(*tableDesc.TableDesc(), duration, chunkSize, readAsOf)
spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
if err != nil {
return err
}
@@ -2149,8 +2160,12 @@ func (sc *SchemaChanger) truncateAndBackfillColumns(
log.Infof(ctx, "clearing and backfilling columns")

if err := sc.distColumnBackfill(
ctx, version, columnBackfillBatchSize.Get(&sc.settings.SV),
backfill.ColumnMutationFilter); err != nil {
ctx,
version,
columnBackfillBatchSize.Get(&sc.settings.SV),
uint64(columnBackfillUpdateChunkSizeThresholdBytes.Get(&sc.settings.SV)),
backfill.ColumnMutationFilter,
); err != nil {
return err
}
log.Info(ctx, "finished clearing and backfilling columns")
@@ -2579,9 +2594,12 @@ func columnBackfillInTxn(
sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
for sp.Key != nil {
var err error
sp.Key, err = backfiller.RunColumnBackfillChunk(ctx,
txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
false /*alsoCommit*/, traceKV)
scanBatchSize := rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV))
updateChunkSizeThresholdBytes := rowinfra.BytesLimit(columnBackfillUpdateChunkSizeThresholdBytes.Get(&evalCtx.Settings.SV))
const alsoCommit = false
sp.Key, err = backfiller.RunColumnBackfillChunk(
ctx, txn, tableDesc, sp, scanBatchSize, updateChunkSizeThresholdBytes, alsoCommit, traceKV,
)
if err != nil {
return err
}
15 changes: 15 additions & 0 deletions pkg/sql/backfill/backfill.go
@@ -267,6 +267,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
tableDesc catalog.TableDescriptor,
sp roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
alsoCommit bool,
traceKV bool,
) (roachpb.Key, error) {
@@ -375,6 +376,20 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
); err != nil {
return roachpb.Key{}, err
}

// Exit early to flush if the batch byte size exceeds a predefined
// threshold. This can happen when table rows are more on the "fat" side,
// typically with large BYTES or JSONB columns.
//
// This helps prevent exceedingly large raft commands, which can, for
// instance, leave schema changes unable to either proceed or roll back.
//
// The threshold is ignored when zero.
if updateChunkSizeThresholdBytes > 0 && b.ApproximateMutationBytes() >= int(updateChunkSizeThresholdBytes) {
break
}
}
// Write the new row values.
writeBatch := txn.Run
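
One consequence of the hunk above: the size check runs after a row's updates have been staged, so a batch can overshoot the threshold by at most one row's worth of mutations before it is run. A minimal sketch of the predicate as a standalone helper (hypothetical, not part of the PR):

```go
// shouldFlush reports whether the staged update batch should be run
// immediately. A zero threshold disables early flushing, matching the
// "ignored when zero" semantics documented above.
func shouldFlush(thresholdBytes, approxMutationBytes int) bool {
	return thresholdBytes > 0 && approxMutationBytes >= thresholdBytes
}
```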
18 changes: 12 additions & 6 deletions pkg/sql/distsql_plan_backfill.go
@@ -16,6 +16,7 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
@@ -26,14 +27,19 @@
)

func initColumnBackfillerSpec(
desc descpb.TableDescriptor, duration time.Duration, chunkSize int64, readAsOf hlc.Timestamp,
tbl catalog.TableDescriptor,
duration time.Duration,
chunkSize int64,
updateChunkSizeThresholdBytes uint64,
readAsOf hlc.Timestamp,
) (execinfrapb.BackfillerSpec, error) {
return execinfrapb.BackfillerSpec{
Table: desc,
Duration: duration,
ChunkSize: chunkSize,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Column,
Table: *tbl.TableDesc(),
Duration: duration,
ChunkSize: chunkSize,
UpdateChunkSizeThresholdBytes: updateChunkSizeThresholdBytes,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Column,
}, nil
}

6 changes: 5 additions & 1 deletion pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -62,6 +62,10 @@ message BackfillerSpec {
// of entries backfilled per chunk.
optional int64 chunk_size = 5 [(gogoproto.nullable) = false];

// The column backfiller will run an update batch immediately
// once its estimated byte size reaches UpdateChunkSizeThresholdBytes, if nonzero.
optional uint64 update_chunk_size_threshold_bytes = 14 [(gogoproto.nullable) = false];

// WriteAsOf is the time that the backfill entries should be written.
// Note: Older nodes may also use this as the read time instead of readAsOf.
optional util.hlc.Timestamp writeAsOf = 7 [(gogoproto.nullable) = false];
@@ -86,7 +90,7 @@
// check MVCCAddSSTable before setting this option.
optional bool write_at_batch_timestamp = 12 [(gogoproto.nullable) = false];

// NEXTID: 14.
// NEXTID: 15.
}

// JobProgress identifies the job to report progress on. This reporting
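
The new field takes the next free tag (14), which is why the NEXTID marker advances to 15. For reference, a sketch of the gogoproto-generated Go field (struct abbreviated, tag string approximate):

```go
// BackfillerSpec (abbreviated): because of (gogoproto.nullable) = false,
// the field is generated as a plain uint64 rather than a *uint64.
type BackfillerSpec struct {
	// ...other fields elided...

	// If nonzero, the column backfiller runs an update batch as soon as
	// its estimated byte size reaches this value.
	UpdateChunkSizeThresholdBytes uint64 `protobuf:"varint,14,opt,name=update_chunk_size_threshold_bytes" json:"update_chunk_size_threshold_bytes"`
}
```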
70 changes: 65 additions & 5 deletions pkg/sql/logictest/testdata/logic_test/alter_table
@@ -2184,6 +2184,66 @@ SELECT * FROM multipleinstmt ORDER BY id ASC;
2 b b false NULL true NULL
3 c c false NULL true NULL

subtest column_backfiller_update_batching

let $use_decl_sc
SHOW use_declarative_schema_changer

statement ok
SET use_declarative_schema_changer = 'off';

statement ok
BEGIN;
CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
SET tracing = on,kv;
ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
SET tracing = off;

# Check that the column backfiller batches all its Puts into one batch.
query I
SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
----
1

query I
SELECT count(*) FROM tb WHERE v = 'abc';
----
10

statement ok
ROLLBACK;

# Bring the threshold way down to force column backfiller batches to have no more than one Put each.
statement ok
SET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes = 1;

statement ok
BEGIN;
CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
SET tracing = on,kv;
ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
SET tracing = off;

query I
SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
----
10

query I
SELECT count(*) FROM tb WHERE v = 'abc';
----
10

statement ok
ROLLBACK;

# Undo subtest side effects.
statement ok
RESET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes;

statement ok
SET use_declarative_schema_changer = $use_decl_sc;

subtest storage_params

statement ok
@@ -2294,8 +2354,8 @@ FROM (
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
205 test_serial_b_seq PUBLIC 204
204 test_serial PUBLIC NULL
207 test_serial_b_seq PUBLIC 206
206 test_serial PUBLIC NULL

statement ok
DROP TABLE test_serial;
@@ -2329,8 +2389,8 @@ FROM (
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
207 test_serial_b_seq PUBLIC 206
206 test_serial PUBLIC NULL
209 test_serial_b_seq PUBLIC 208
208 test_serial PUBLIC NULL

statement ok
ALTER TABLE test_serial DROP COLUMN b;
@@ -2345,7 +2405,7 @@ FROM (
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
----
table_id name state refobjid
206 test_serial PUBLIC NULL
208 test_serial PUBLIC NULL

statement ok
DROP TABLE test_serial;
5 changes: 4 additions & 1 deletion pkg/sql/rowexec/backfiller.go
@@ -47,6 +47,7 @@ type chunkBackfiller interface {
ctx context.Context,
span roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
readAsOf hlc.Timestamp,
) (roachpb.Key, error)

@@ -135,6 +136,8 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) {
// fill more than this amount and cause a flush, then it likely also fills
// a non-trivial part of the next buffer.
const opportunisticCheckpointThreshold = 0.8
chunkSize := rowinfra.RowLimit(b.spec.ChunkSize)
updateChunkSizeThresholdBytes := rowinfra.BytesLimit(b.spec.UpdateChunkSizeThresholdBytes)
start := timeutil.Now()
totalChunks := 0
totalSpans := 0
@@ -148,7 +151,7 @@
for todo.Key != nil {
log.VEventf(ctx, 3, "%s backfiller starting chunk %d: %s", b.name, chunks, todo)
var err error
todo.Key, err = b.chunks.runChunk(ctx, todo, rowinfra.RowLimit(b.spec.ChunkSize), b.spec.ReadAsOf)
todo.Key, err = b.chunks.runChunk(ctx, todo, chunkSize, updateChunkSizeThresholdBytes, b.spec.ReadAsOf)
if err != nil {
return nil, err
}
12 changes: 8 additions & 4 deletions pkg/sql/rowexec/columnbackfiller.go
@@ -102,7 +102,11 @@ func (cb *columnBackfiller) CurrentBufferFill() float32 {

// runChunk implements the chunkBackfiller interface.
func (cb *columnBackfiller) runChunk(
ctx context.Context, sp roachpb.Span, chunkSize rowinfra.RowLimit, _ hlc.Timestamp,
ctx context.Context,
sp roachpb.Span,
chunkSize rowinfra.RowLimit,
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
_ hlc.Timestamp,
) (roachpb.Key, error) {
var key roachpb.Key
var commitWaitFn func(context.Context) error
@@ -123,16 +127,16 @@
// waiting for consistency when backfilling a column on GLOBAL tables.
commitWaitFn = txn.DeferCommitWait(ctx)

// TODO(knz): do KV tracing in DistSQL processors.
var err error
key, err = cb.RunColumnBackfillChunk(
ctx,
txn,
cb.desc,
sp,
chunkSize,
true, /*alsoCommit*/
false, /*traceKV*/
updateChunkSizeThresholdBytes,
true, /*alsoCommit*/
cb.flowCtx.TraceKV,
)
return err
})