diff --git a/data/csv/bug_7578.csv b/data/csv/bug_7578.csv new file mode 100644 index 000000000000..2da5b8b2bd5f --- /dev/null +++ b/data/csv/bug_7578.csv @@ -0,0 +1,3 @@ +transaction_id team_id direction amount account_id transaction_date recorded_date tags.transaction_id tags.team_id tags +01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 1 3.91 39 2022-11-22 01:00:00 2023-04-06 16:07:26 01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 {"_journalize_rule": "four score and seven years ago our fathers brought forth upon this continent a new nation conceived in liberty and dedicated to the proposition that all men are created equal. now we are engaged in a great civil war", "transaction_match_id": "1234567", "transaction id": "1234456", "job id": "", "transacting party": "customer", "user id": "1278729", "state": "NY", "event type": "Sale", "product name": "acme", "event description": "", "related transaction id": "11234813", "transaction created at": "2022-11-11T01:12:31", "created on": "2022-11-11T01:12:31", "last modified on": "2022-11-14T09:03:57", "base amount": "48.0000", "tax": "4.120000", "stripe payment id": "ch_1234567889", "stripe payment amount": "54.1100", "stripe payment date": "2022-11-14T09:03:55", "payment fail date": "", "write-off id": "", "write-off date": "", "write-off amount": "", "amount due": "0.0000"} +01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 -1 3.91 41 2022-11-22 01:00:00 2023-04-06 16:07:26 01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 {"_journalize_rule": "four score and seven years ago our fathers brought forth upon this continent a new nation conceived in liberty and dedicated to the proposition that all men are created equal. 
now we are engaged in a great civil war", "transaction_match_id": "1234567", "transaction id": "1234456", "job id": "", "transacting party": "customer", "user id": "1278729", "state": "NY", "event type": "Sale", "product name": "acme", "event description": "", "related transaction id": "11234813", "transaction created at": "2022-11-11T01:12:31", "created on": "2022-11-11T01:12:31", "last modified on": "2022-11-14T09:03:57", "base amount": "48.0000", "tax": "4.120000", "stripe payment id": "ch_1234567889", "stripe payment amount": "54.1100", "stripe payment date": "2022-11-14T09:03:55", "payment fail date": "", "write-off id": "", "write-off date": "", "write-off amount": "", "amount due": "0.0000"} diff --git a/src/function/table/read_csv.cpp b/src/function/table/read_csv.cpp index c458d487b9d6..4e208d114a61 100644 --- a/src/function/table/read_csv.cpp +++ b/src/function/table/read_csv.cpp @@ -378,8 +378,8 @@ struct ParallelCSVGlobalState : public GlobalTableFunctionState { //! Current File Number idx_t file_number = 0; idx_t max_tuple_end = 0; - //! the vector stores positions where threads ended the last line they read in the CSV File, and the set stores - //! positions where they started reading the first line. + //! The vector stores positions where threads ended the last line they read in the CSV file, and the set stores + //! positions where they started reading the first line. vector> tuple_end; vector> tuple_start; //!
Tuple end to batch @@ -559,15 +559,13 @@ bool ParallelCSVGlobalState::Next(ClientContext &context, const ReadCSVData &bin } void ParallelCSVGlobalState::UpdateVerification(VerificationPositions positions, idx_t file_number_p, idx_t batch_idx) { lock_guard parallel_lock(main_mutex); - if (positions.beginning_of_first_line < positions.end_of_last_line) { - if (positions.end_of_last_line > max_tuple_end) { - max_tuple_end = positions.end_of_last_line; - } - tuple_end_to_batch[file_number_p][positions.end_of_last_line] = batch_idx; - batch_to_tuple_end[file_number_p][batch_idx] = tuple_end[file_number_p].size(); - tuple_start[file_number_p].insert(positions.beginning_of_first_line); - tuple_end[file_number_p].push_back(positions.end_of_last_line); + if (positions.end_of_last_line > max_tuple_end) { + max_tuple_end = positions.end_of_last_line; } + tuple_end_to_batch[file_number_p][positions.end_of_last_line] = batch_idx; + batch_to_tuple_end[file_number_p][batch_idx] = tuple_end[file_number_p].size(); + tuple_start[file_number_p].insert(positions.beginning_of_first_line); + tuple_end[file_number_p].push_back(positions.end_of_last_line); } void ParallelCSVGlobalState::UpdateLinesRead(CSVBufferRead &buffer_read, idx_t file_idx) { @@ -690,17 +688,14 @@ static void ParallelReadCSVFunction(ClientContext &context, TableFunctionInput & } if (csv_local_state.csv_reader->finished) { auto verification_updates = csv_local_state.csv_reader->GetVerificationPositions(); - if (verification_updates.beginning_of_first_line != verification_updates.end_of_last_line) { - csv_global_state.UpdateVerification(verification_updates, - csv_local_state.csv_reader->buffer->buffer->GetFileNumber(), - csv_local_state.csv_reader->buffer->local_batch_index); - } + csv_global_state.UpdateVerification(verification_updates, + csv_local_state.csv_reader->buffer->buffer->GetFileNumber(), + csv_local_state.csv_reader->buffer->local_batch_index); 
csv_global_state.UpdateLinesRead(*csv_local_state.csv_reader->buffer, csv_local_state.csv_reader->file_idx); auto has_next = csv_global_state.Next(context, bind_data, csv_local_state.csv_reader); if (csv_local_state.csv_reader) { csv_local_state.csv_reader->linenr = 0; } - if (!has_next) { csv_global_state.DecrementThread(); break; diff --git a/test/parallel_csv/test_parallel_csv.cpp b/test/parallel_csv/test_parallel_csv.cpp index f8d5c37c00c9..6f6f0008c811 100644 --- a/test/parallel_csv/test_parallel_csv.cpp +++ b/test/parallel_csv/test_parallel_csv.cpp @@ -243,9 +243,23 @@ TEST_CASE("Test Parallel CSV All Files - data/csv", "[parallel-csv][.]") { std::set skip; // This file is too big, executing on it is slow and unreliable skip.insert("data/csv/sequences.csv.gz"); + // This file requires specific parameters + skip.insert("data/csv/bug_7578.csv"); RunTestOnFolder("data/csv/", &skip); } +//! Runs bug_7578.csv, which the folder-wide data/csv test skips, with the specific reader parameters it requires +TEST_CASE("Test Parallel CSV All Files - data/csv/bug_7578.csv", "[parallel-csv][.]") { DuckDB db(nullptr); Connection con(db); string add_parameters = ", delim=\'\\t\', header=true, quote = \'`\', columns={ \'transaction_id\': \'VARCHAR\', " "\'team_id\': \'INT\', \'direction\': \'INT\', \'amount\':\'DOUBLE\', " "\'account_id\':\'INT\', \'transaction_date\':\'DATE\', \'recorded_date\':\'DATE\', " "\'tags.transaction_id\':\'VARCHAR\', \'tags.team_id\':\'INT\', \'tags\':\'varchar\'}"; string file = "data/csv/bug_7578.csv"; REQUIRE(RunFull(file, con, nullptr, add_parameters)); } + TEST_CASE("Test Parallel CSV All Files - data/csv/decimal_separators", "[parallel-csv][.]") { RunTestOnFolder("data/csv/decimal_separators/"); } diff --git a/test/sql/copy/csv/parallel/test_7578.test b/test/sql/copy/csv/parallel/test_7578.test new file mode 100644 index 000000000000..c0b11c5b3519 --- /dev/null +++ b/test/sql/copy/csv/parallel/test_7578.test @@ -0,0 +1,52 @@ +# name:
test/sql/copy/csv/parallel/test_7578.test +# description: Test parallel read CSV function on sample file from #7578 +# group: [parallel] + +# force parallelism of the queries +statement ok +PRAGMA verify_parallelism + +loop thr 1 8 + +statement ok +pragma threads=${thr} + +query IIIIIIIIII +select * +from read_csv('data/csv/bug_7578.csv', delim='\t', header=true, quote = '`', columns={ + 'transaction_id': 'VARCHAR', + 'team_id': 'INT', + 'direction': 'INT', + 'amount':'DOUBLE', + 'account_id':'INT', + 'transaction_date':'DATE', + 'recorded_date':'DATE', + 'tags.transaction_id':'VARCHAR', + 'tags.team_id':'INT', + 'tags':'varchar' +}, +parallel=true) order by all +---- +01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 -1 3.91 41 2022-11-22 2023-04-06 01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 {"_journalize_rule": "four score and seven years ago our fathers brought forth upon this continent a new nation conceived in liberty and dedicated to the proposition that all men are created equal. now we are engaged in a great civil war", "transaction_match_id": "1234567", "transaction id": "1234456", "job id": "", "transacting party": "customer", "user id": "1278729", "state": "NY", "event type": "Sale", "product name": "acme", "event description": "", "related transaction id": "11234813", "transaction created at": "2022-11-11T01:12:31", "created on": "2022-11-11T01:12:31", "last modified on": "2022-11-14T09:03:57", "base amount": "48.0000", "tax": "4.120000", "stripe payment id": "ch_1234567889", "stripe payment amount": "54.1100", "stripe payment date": "2022-11-14T09:03:55", "payment fail date": "", "write-off id": "", "write-off date": "", "write-off amount": "", "amount due": "0.0000"} +01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 1 3.91 39 2022-11-22 2023-04-06 01GXBE9CMV5R0Q4TEXZ4XFXPQR 58 {"_journalize_rule": "four score and seven years ago our fathers brought forth upon this continent a new nation conceived in liberty and dedicated to the proposition that all men are created equal. 
now we are engaged in a great civil war", "transaction_match_id": "1234567", "transaction id": "1234456", "job id": "", "transacting party": "customer", "user id": "1278729", "state": "NY", "event type": "Sale", "product name": "acme", "event description": "", "related transaction id": "11234813", "transaction created at": "2022-11-11T01:12:31", "created on": "2022-11-11T01:12:31", "last modified on": "2022-11-14T09:03:57", "base amount": "48.0000", "tax": "4.120000", "stripe payment id": "ch_1234567889", "stripe payment amount": "54.1100", "stripe payment date": "2022-11-14T09:03:55", "payment fail date": "", "write-off id": "", "write-off date": "", "write-off amount": "", "amount due": "0.0000"} + +endloop + +statement ok +pragma threads=2 + +statement error +select * +from read_csv('data/csv/bug_7578.csv', delim='\t', header=true, columns={ + 'transaction_id': 'VARCHAR', + 'team_id': 'INT', + 'direction': 'INT', + 'amount':'DOUBLE', + 'account_id':'INT', + 'transaction_date':'DATE', + 'recorded_date':'DATE', + 'tags.transaction_id':'VARCHAR', + 'tags.team_id':'INT', + 'tags':'varchar' +}, +parallel=true)