Verify that Parallel CSV Reader skips lines mid-threads by pdet · Pull Request #7637 · duckdb/duckdb · GitHub

Verify that Parallel CSV Reader skips lines mid-threads #7637

Merged (3 commits) on May 29, 2023
Changes from all commits

3 changes: 3 additions & 0 deletions data/csv/bug_7578.csv
@@ -0,0 +1,3 @@
transaction_id team_id direction amount account_id transaction_date recorded_date tags.transaction_id tags.team_id tags
01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 1 3.91 39 2022-11-22 01:00:00 2023-04-06 16:07:26 01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 {"_journalize_rule": "four score and seven years ago our fathers brought forth upon this continent a new nation conceived in liberty and dedicated to the proposition that all men are created equal. now we are engaged in a great civil war", "transaction_match_id": "1234567", "transaction id": "1234456", "job id": "", "transacting party": "customer", "user id": "1278729", "state": "NY", "event type": "Sale", "product name": "acme", "event description": "", "related transaction id": "11234813", "transaction created at": "2022-11-11T01:12:31", "created on": "2022-11-11T01:12:31", "last modified on": "2022-11-14T09:03:57", "base amount": "48.0000", "tax": "4.120000", "stripe payment id": "ch_1234567889", "stripe payment amount": "54.1100", "stripe payment date": "2022-11-14T09:03:55", "payment fail date": "", "write-off id": "", "write-off date": "", "write-off amount": "", "amount due": "0.0000"}
01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 -1 3.91 41 2022-11-22 01:00:00 2023-04-06 16:07:26 01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 {"_journalize_rule": "four score and seven years ago our fathers brought forth upon this continent a new nation conceived in liberty and dedicated to the proposition that all men are created equal. now we are engaged in a great civil war", "transaction_match_id": "1234567", "transaction id": "1234456", "job id": "", "transacting party": "customer", "user id": "1278729", "state": "NY", "event type": "Sale", "product name": "acme", "event description": "", "related transaction id": "11234813", "transaction created at": "2022-11-11T01:12:31", "created on": "2022-11-11T01:12:31", "last modified on": "2022-11-14T09:03:57", "base amount": "48.0000", "tax": "4.120000", "stripe payment id": "ch_1234567889", "stripe payment amount": "54.1100", "stripe payment date": "2022-11-14T09:03:55", "payment fail date": "", "write-off id": "", "write-off date": "", "write-off amount": "", "amount due": "0.0000"}
27 changes: 11 additions & 16 deletions src/function/table/read_csv.cpp
@@ -378,8 +378,8 @@ struct ParallelCSVGlobalState : public GlobalTableFunctionState {
//! Current File Number
idx_t file_number = 0;
idx_t max_tuple_end = 0;
//! the vector stores positions where threads ended the last line they read in the CSV File, and the set stores
//! positions where they started reading the first line.
//! The vector stores positions where threads ended the last line they read in the CSV File, and the set stores
//! Positions where they started reading the first line.
vector<vector<idx_t>> tuple_end;
vector<set<idx_t>> tuple_start;
//! Tuple end to batch
@@ -559,15 +559,13 @@ bool ParallelCSVGlobalState::Next(ClientContext &context, const ReadCSVData &bin
}
void ParallelCSVGlobalState::UpdateVerification(VerificationPositions positions, idx_t file_number_p, idx_t batch_idx) {
lock_guard<mutex> parallel_lock(main_mutex);
if (positions.beginning_of_first_line < positions.end_of_last_line) {
if (positions.end_of_last_line > max_tuple_end) {
max_tuple_end = positions.end_of_last_line;
}
tuple_end_to_batch[file_number_p][positions.end_of_last_line] = batch_idx;
batch_to_tuple_end[file_number_p][batch_idx] = tuple_end[file_number_p].size();
tuple_start[file_number_p].insert(positions.beginning_of_first_line);
tuple_end[file_number_p].push_back(positions.end_of_last_line);
if (positions.end_of_last_line > max_tuple_end) {
max_tuple_end = positions.end_of_last_line;
}
tuple_end_to_batch[file_number_p][positions.end_of_last_line] = batch_idx;
batch_to_tuple_end[file_number_p][batch_idx] = tuple_end[file_number_p].size();
tuple_start[file_number_p].insert(positions.beginning_of_first_line);
tuple_end[file_number_p].push_back(positions.end_of_last_line);
}

void ParallelCSVGlobalState::UpdateLinesRead(CSVBufferRead &buffer_read, idx_t file_idx) {
@@ -690,17 +688,14 @@ static void ParallelReadCSVFunction(ClientContext &context, TableFunctionInput &
}
if (csv_local_state.csv_reader->finished) {
auto verification_updates = csv_local_state.csv_reader->GetVerificationPositions();
if (verification_updates.beginning_of_first_line != verification_updates.end_of_last_line) {
csv_global_state.UpdateVerification(verification_updates,
csv_local_state.csv_reader->buffer->buffer->GetFileNumber(),
csv_local_state.csv_reader->buffer->local_batch_index);
}
csv_global_state.UpdateVerification(verification_updates,
csv_local_state.csv_reader->buffer->buffer->GetFileNumber(),
csv_local_state.csv_reader->buffer->local_batch_index);
csv_global_state.UpdateLinesRead(*csv_local_state.csv_reader->buffer, csv_local_state.csv_reader->file_idx);
auto has_next = csv_global_state.Next(context, bind_data, csv_local_state.csv_reader);
if (csv_local_state.csv_reader) {
csv_local_state.csv_reader->linenr = 0;
}

if (!has_next) {
csv_global_state.DecrementThread();
break;
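
For context, here is a minimal sketch (not code from this PR) of the cross-check that the tuple_start and tuple_end structures above make possible: once every thread has reported its positions, each position at which one thread finished its last line should also appear, give or take the newline separator, as a position at which another thread started its first line; if it does not, a line was dropped between two parallel readers. The function name VerifyLineBoundaries and the +1/+2 newline tolerance are illustrative assumptions, not the actual verification code in DuckDB.

#include <cstdint>
#include <set>
#include <stdexcept>
#include <vector>

using idx_t = uint64_t;

// Sketch: every recorded line end (except the last one in the file) must match a
// recorded line start, allowing for the '\n' or '\r\n' separating the two chunks.
void VerifyLineBoundaries(const std::vector<idx_t> &tuple_end, const std::set<idx_t> &tuple_start,
                          idx_t max_tuple_end) {
	for (auto end_pos : tuple_end) {
		if (end_pos >= max_tuple_end) {
			// The final line of the file has no successor thread.
			continue;
		}
		bool matched = tuple_start.count(end_pos) != 0 || tuple_start.count(end_pos + 1) != 0 ||
		               tuple_start.count(end_pos + 2) != 0;
		if (!matched) {
			throw std::runtime_error("A CSV line was skipped between two parallel readers");
		}
	}
}
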
14 changes: 14 additions & 0 deletions test/parallel_csv/test_parallel_csv.cpp
@@ -243,9 +243,23 @@ TEST_CASE("Test Parallel CSV All Files - data/csv", "[parallel-csv][.]") {
std::set<std::string> skip;
// This file is too big, executing on it is slow and unreliable
skip.insert("data/csv/sequences.csv.gz");
// This file requires specific parameters
skip.insert("data/csv/bug_7578.csv");
RunTestOnFolder("data/csv/", &skip);
}

//! Test case with specific parameters that allow us to run the bug_7578.csv we were skipping
TEST_CASE("Test Parallel CSV All Files - data/csv/bug_7578.csv", "[parallel-csv][.]") {
DuckDB db(nullptr);
Connection con(db);
string add_parameters = ", delim=\'\\t\', header=true, quote = \'`\', columns={ \'transaction_id\': \'VARCHAR\', "
"\'team_id\': \'INT\', \'direction\': \'INT\', \'amount\':\'DOUBLE\', "
"\'account_id\':\'INT\', \'transaction_date\':\'DATE\', \'recorded_date\':\'DATE\', "
"\'tags.transaction_id\':\'VARCHAR\', \'tags.team_id\':\'INT\', \'tags\':\'varchar\'}";
string file = "data/csv/bug_7578.csv";
REQUIRE(RunFull(file, con, nullptr, add_parameters));
}

TEST_CASE("Test Parallel CSV All Files - data/csv/decimal_separators", "[parallel-csv][.]") {
RunTestOnFolder("data/csv/decimal_separators/");
}
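
As a standalone illustration of what the bug_7578.csv test case above exercises, the sketch below reads the tab-delimited, backtick-quoted file with several threads and checks that both data rows survive. It is only a sketch: it uses read_csv_auto with explicit delim/quote/header instead of the full column list passed by the test, and main(), the thread count, and the expected row count of 2 (header plus two data rows in the file added by this PR) are assumptions rather than code from this diff.

#include "duckdb.hpp"

using namespace duckdb;

int main() {
	DuckDB db(nullptr);
	Connection con(db);
	// Allow several threads so the parallel CSV reader path can be used.
	con.Query("PRAGMA threads=4");
	auto result = con.Query("SELECT count(*) FROM read_csv_auto('data/csv/bug_7578.csv', "
	                        "delim='\\t', quote='`', header=true)");
	// The file has a header line plus two data rows, so exactly 2 rows must come back.
	if (result->HasError() || result->GetValue(0, 0).GetValue<int64_t>() != 2) {
		return 1;
	}
	return 0;
}
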
52 changes: 52 additions & 0 deletions test/sql/copy/csv/parallel/test_7578.test
@@ -0,0 +1,52 @@
# name: test/sql/copy/csv/parallel/test_7578.test
# description: Test parallel read CSV function on sample file from #7578
# group: [parallel]

# force parallelism of the queries
statement ok
PRAGMA verify_parallelism

loop thr 1 8

statement ok
pragma threads=${thr}

query IIIIIIIIII
select *
from read_csv('data/csv/bug_7578.csv', delim='\t', header=true, quote = '`', columns={
'transaction_id': 'VARCHAR',
'team_id': 'INT',
'direction': 'INT',
'amount':'DOUBLE',
'account_id':'INT',
'transaction_date':'DATE',
'recorded_date':'DATE',
'tags.transaction_id':'VARCHAR',
'tags.team_id':'INT',
'tags':'varchar'
},
parallel=true) order by all
----
01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 -1 3.91 41 2022-11-22 2023-04-06 01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 {"_journalize_rule": "four score and seven years ago our fathers brought forth upon this continent a new nation conceived in liberty and dedicated to the proposition that all men are created equal. now we are engaged in a great civil war", "transaction_match_id": "1234567", "transaction id": "1234456", "job id": "", "transacting party": "customer", "user id": "1278729", "state": "NY", "event type": "Sale", "product name": "acme", "event description": "", "related transaction id": "11234813", "transaction created at": "2022-11-11T01:12:31", "created on": "2022-11-11T01:12:31", "last modified on": "2022-11-14T09:03:57", "base amount": "48.0000", "tax": "4.120000", "stripe payment id": "ch_1234567889", "stripe payment amount": "54.1100", "stripe payment date": "2022-11-14T09:03:55", "payment fail date": "", "write-off id": "", "write-off date": "", "write-off amount": "", "amount due": "0.0000"}
01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 1 3.91 39 2022-11-22 2023-04-06 01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 {"_journalize_rule": "four score and seven years ago our fathers brought forth upon this continent a new nation conceived in liberty and dedicated to the proposition that all men are created equal. now we are engaged in a great civil war", "transaction_match_id": "1234567", "transaction id": "1234456", "job id": "", "transacting party": "customer", "user id": "1278729", "state": "NY", "event type": "Sale", "product name": "acme", "event description": "", "related transaction id": "11234813", "transaction created at": "2022-11-11T01:12:31", "created on": "2022-11-11T01:12:31", "last modified on": "2022-11-14T09:03:57", "base amount": "48.0000", "tax": "4.120000", "stripe payment id": "ch_1234567889", "stripe payment amount": "54.1100", "stripe payment date": "2022-11-14T09:03:55", "payment fail date": "", "write-off id": "", "write-off date": "", "write-off amount": "", "amount due": "0.0000"}

endloop

statement ok
pragma threads=2

statement error
select *
from read_csv('data/csv/bug_7578.csv', delim='\t', header=true, columns={
'transaction_id': 'VARCHAR',
'team_id': 'INT',
'direction': 'INT',
'amount':'DOUBLE',
'account_id':'INT',
'transaction_date':'DATE',
'recorded_date':'DATE',
'tags.transaction_id':'VARCHAR',
'tags.team_id':'INT',
'tags':'varchar'
},
parallel=true)