diff --git a/Cargo.lock b/Cargo.lock
index 4b744f9c1f..061c4b3c81 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1081,22 +1081,24 @@ dependencies = [
 
 [[package]]
 name = "diesel"
-version = "1.4.8"
-source = "git+https://github.com/mobilecoinofficial/diesel?rev=026f6379715d27c8be48396e5ca9059f4a263198#026f6379715d27c8be48396e5ca9059f4a263198"
+version = "2.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4391a22b19c916e50bec4d6140f29bdda3e3bb187223fe6e3ea0b6e4d1021c04"
 dependencies = [
  "bitflags 1.3.2",
  "byteorder",
  "chrono",
  "diesel_derives",
+ "itoa 1.0.1",
  "pq-sys",
  "r2d2",
 ]
 
 [[package]]
 name = "diesel-derive-enum"
-version = "1.1.2"
+version = "2.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c8910921b014e2af16298f006de12aa08af894b71f0f49a486ab6d74b17bbed"
+checksum = "6b10c03b954333d05bfd5be1d8a74eae2c9ca77b86e0f1c3a1ea29c49da1d6c2"
 dependencies = [
  "heck",
  "proc-macro2",
@@ -1106,10 +1108,11 @@ dependencies = [
 
 [[package]]
 name = "diesel_derives"
-version = "1.4.1"
+version = "2.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3"
+checksum = "0ad74fdcf086be3d4fdd142f67937678fe60ed431c3b2f08599e7687269410c4"
 dependencies = [
+ "proc-macro-error",
  "proc-macro2",
  "quote",
  "syn 1.0.109",
@@ -1117,10 +1120,11 @@ dependencies = [
 
 [[package]]
 name = "diesel_migrations"
-version = "1.4.0"
+version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bf3cde8413353dc7f5d72fa8ce0b99a560a359d2c5ef1e5817ca731cd9008f4c"
+checksum = "e9ae22beef5e9d6fab9225ddb073c1c6c1a7a6ded5019d5da11d1e5c5adc34e2"
 dependencies = [
+ "diesel",
  "migrations_internals",
  "migrations_macros",
 ]
@@ -5859,23 +5863,23 @@ dependencies = [
 
 [[package]]
 name = "migrations_internals"
-version = "1.4.1"
+version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b4fc84e4af020b837029e017966f86a1c2d5e83e64b589963d5047525995860"
+checksum = "c493c09323068c01e54c685f7da41a9ccf9219735c3766fbfd6099806ea08fbc"
 dependencies = [
- "diesel",
+ "serde",
+ "toml 0.5.11",
 ]
 
 [[package]]
 name = "migrations_macros"
-version = "1.4.2"
+version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9753f12909fd8d923f75ae5c3258cae1ed3c8ec052e1b38c93c21a6d157f789c"
+checksum = "8a8ff27a350511de30cdabb77147501c36ef02e0451d957abea2f30caffb2b58"
 dependencies = [
  "migrations_internals",
  "proc-macro2",
  "quote",
- "syn 1.0.109",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 6a708bc76e..a267625fdd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -242,8 +242,5 @@ schnorrkel-og = { git = "https://github.com/mobilecoinfoundation/schnorrkel.git"
 # See: https://github.com/pyfisch/cbor/pull/198
 serde_cbor = { git = "https://github.com/mobilecoinofficial/cbor", rev = "4c886a7c1d523aae1ec4aa7386f402cb2f4341b5" }
 
-# Override diesel dependency with our fork, to statically link SQLite.
-diesel = { git = "https://github.com/mobilecoinofficial/diesel", rev = "026f6379715d27c8be48396e5ca9059f4a263198" }
-
 # Fix issues with recent nightlies, bump curve25519-dalek version
 x25519-dalek = { git = "https://github.com/mobilecoinfoundation/x25519-dalek.git", rev = "4fbaa3343301c62cfdbc3023c9f485257e6b718a" }
diff --git a/fog/sql_recovery_db/Cargo.toml b/fog/sql_recovery_db/Cargo.toml
index 6742f0e0d3..10c1755a27 100644
--- a/fog/sql_recovery_db/Cargo.toml
+++ b/fog/sql_recovery_db/Cargo.toml
@@ -34,9 +34,9 @@ mc-fog-types = { path = "../types" }
 chrono = "0.4"
 clap = { version = "4.1", features = ["derive", "env"] }
-diesel = { version = "1.4", features = ["chrono", "postgres", "r2d2"] }
-diesel-derive-enum = { version = "1", features = ["postgres"] }
-diesel_migrations = { version = "1.4", features = ["postgres"] }
+diesel = { version = "2.0.3", features = ["chrono", "postgres", "r2d2"] }
+diesel-derive-enum = { version = "2.0.1", features = ["postgres"] }
+diesel_migrations = { version = "2.0.0", features = ["postgres"] }
 displaydoc = { version = "0.2", default-features = false }
 prost = "0.11"
 r2d2 = "0.8.10"
@@ -56,7 +56,3 @@ mc-util-test-helper = { path = "../../util/test-helper" }
 pem = "2.0"
 rand = "0.8"
-
-[build-dependencies]
-# clippy fails to run without this.
-diesel = { version = "1.4.8", features = ["chrono", "postgres", "r2d2"] }
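The manifest changes above are the whole dependency story: the pinned `mobilecoinofficial/diesel` fork is dropped from the workspace `Cargo.toml`, the three diesel crates move to upstream 2.x releases from crates.io, and the `[build-dependencies]` clippy workaround disappears. Everything that follows is driven by one Diesel 2.0 API decision: connection methods now take `&mut self`. A minimal sketch of that headline change (the function name and query are illustrative, not part of this diff):

```rust
use diesel::{pg::PgConnection, prelude::*};

// Diesel 1.x: `let conn = PgConnection::establish(url)?;`, then `&conn` everywhere.
// Diesel 2.x: the binding must be mutable, and every query API takes `&mut conn`.
fn smoke_test(database_url: &str) -> QueryResult<usize> {
    let mut conn = PgConnection::establish(database_url).expect("cannot connect");
    diesel::sql_query("SELECT 1").execute(&mut conn)
}
```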
diff --git a/fog/sql_recovery_db/src/bin/fog_sql_recovery_db_migrations.rs b/fog/sql_recovery_db/src/bin/fog_sql_recovery_db_migrations.rs
index eacb22c44a..f01e8a8255 100644
--- a/fog/sql_recovery_db/src/bin/fog_sql_recovery_db_migrations.rs
+++ b/fog/sql_recovery_db/src/bin/fog_sql_recovery_db_migrations.rs
@@ -3,22 +3,20 @@
 //! A helper utility for running migrations on a database configured via
 //! DATABASE_URL.
 
-#[macro_use]
-extern crate diesel_migrations;
-
 use diesel::{prelude::*, PgConnection};
-use diesel_migrations::embed_migrations;
+use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
 use std::env;
 
-embed_migrations!("migrations/");
+const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
 
 fn main() {
     let database_url = env::var("DATABASE_URL").expect("Missing DATABASE_URL environment variable");
 
-    let conn = PgConnection::establish(&database_url)
+    let conn = &mut PgConnection::establish(&database_url)
         .expect("fog-sql-recovery-db-migrations cannot connect to PG database");
 
-    embedded_migrations::run(&conn).expect("Failed running migrations");
+    conn.run_pending_migrations(MIGRATIONS)
+        .expect("Failed running migrations");
 
     println!("Done migrating Fog recovery DB!");
 }
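Diesel 2.0 replaces the `embed_migrations!`-generated `embedded_migrations` module with a plain `EmbeddedMigrations` constant plus the `MigrationHarness` trait, which is exactly what the rewrite above uses. A sketch of the new flow, assuming the same `./migrations` directory (the version logging is illustrative):

```rust
use diesel::pg::PgConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

fn migrate(conn: &mut PgConnection) {
    // Unlike Diesel 1.x's `embedded_migrations::run(&conn)`, this returns the
    // list of migration versions it actually applied.
    let applied = conn
        .run_pending_migrations(MIGRATIONS)
        .expect("Failed running migrations");
    for version in applied {
        println!("applied migration {version}");
    }
}
```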
diff --git a/fog/sql_recovery_db/src/lib.rs b/fog/sql_recovery_db/src/lib.rs
index 1d927096e5..245575cb0c 100644
--- a/fog/sql_recovery_db/src/lib.rs
+++ b/fog/sql_recovery_db/src/lib.rs
@@ -5,7 +5,6 @@
 
 #[macro_use]
 extern crate diesel;
-#[macro_use]
 extern crate diesel_migrations;
 
 pub use error::Error;
@@ -162,7 +161,7 @@ impl SqlRecoveryDb {
     /// Mark a given ingest invocation as decommissioned.
     fn decommission_ingest_invocation_impl(
         &self,
-        conn: &PgConnection,
+        conn: &mut PgConnection,
         ingest_invocation_id: &IngestInvocationId,
     ) -> Result<(), Error> {
         // Mark the ingest invocation as decommissioned.
@@ -172,7 +171,7 @@ impl SqlRecoveryDb {
         )
         .set((
             schema::ingest_invocations::dsl::decommissioned.eq(true),
-            schema::ingest_invocations::dsl::last_active_at.eq(diesel::expression::dsl::now),
+            schema::ingest_invocations::dsl::last_active_at.eq(diesel::dsl::now),
         ))
         .execute(conn)?;
 
@@ -190,21 +189,24 @@ impl SqlRecoveryDb {
     /// Mark a given ingest invocation as still being alive.
     fn update_last_active_at_impl(
         &self,
-        conn: &PgConnection,
+        conn: &mut PgConnection,
         ingest_invocation_id: &IngestInvocationId,
     ) -> Result<(), Error> {
         diesel::update(
             schema::ingest_invocations::dsl::ingest_invocations
                 .filter(schema::ingest_invocations::dsl::id.eq(**ingest_invocation_id)),
         )
-        .set(schema::ingest_invocations::dsl::last_active_at.eq(diesel::expression::dsl::now))
+        .set(schema::ingest_invocations::dsl::last_active_at.eq(diesel::dsl::now))
         .execute(conn)?;
 
         Ok(())
     }
 
     /// Get missed block ranges.
-    fn get_missed_block_ranges_impl(&self, conn: &PgConnection) -> Result<Vec<BlockRange>, Error> {
+    fn get_missed_block_ranges_impl(
+        &self,
+        conn: &mut PgConnection,
+    ) -> Result<Vec<BlockRange>, Error> {
         let query = schema::user_events::dsl::user_events
             .filter(schema::user_events::dsl::event_type.eq(UserEventType::MissingBlocks))
             .select((
@@ -231,7 +233,7 @@ impl SqlRecoveryDb {
 
     fn get_ingress_key_status_impl(
         &self,
-        conn: &PgConnection,
+        conn: &mut PgConnection,
         key: &CompressedRistrettoPublic,
     ) -> Result<Option<IngressPublicKeyStatus>, Error> {
         let key_bytes: &[u8] = key.as_ref();
@@ -256,7 +258,7 @@ impl SqlRecoveryDb {
         }
     }
 
-    fn get_highest_known_block_index_impl(conn: &PgConnection) -> Result<Option<u64>, Error> {
+    fn get_highest_known_block_index_impl(conn: &mut PgConnection) -> Result<Option<u64>, Error> {
         Ok(schema::ingested_blocks::dsl::ingested_blocks
             .select(diesel::dsl::max(schema::ingested_blocks::dsl::block_number))
             .first::<Option<i64>>(conn)?
@@ -265,7 +267,7 @@ impl SqlRecoveryDb {
 
     fn get_expired_invocations_impl(
         &self,
-        conn: &PgConnection,
+        conn: &mut PgConnection,
         expiration: NaiveDateTime,
     ) -> Result<Vec<ExpiredInvocationRecord>, Error> {
         use schema::ingest_invocations::dsl;
@@ -310,8 +312,8 @@ impl SqlRecoveryDb {
         &self,
         key: &CompressedRistrettoPublic,
     ) -> Result<Option<IngressPublicKeyStatus>, Error> {
-        let conn = self.pool.get()?;
-        self.get_ingress_key_status_impl(&conn, key)
+        let conn = &mut self.pool.get()?;
+        self.get_ingress_key_status_impl(conn, key)
     }
 
     fn new_ingress_key_retriable(
         &self,
         key: &CompressedRistrettoPublic,
         start_block_count: u64,
     ) -> Result<u64, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         conn.build_transaction()
             .read_write()
-            .run(|| -> Result<u64, Error> {
+            .run(|conn| -> Result<u64, Error> {
                 let highest_known_block_count: u64 =
-                    SqlRecoveryDb::get_highest_known_block_index_impl(&conn)?
+                    SqlRecoveryDb::get_highest_known_block_index_impl(conn)?
                         .map(|index| index + 1)
                         .unwrap_or(0);
 
@@ -340,7 +342,7 @@ impl SqlRecoveryDb {
                 let inserted_row_count = diesel::insert_into(schema::ingress_keys::table)
                     .values(&obj)
                     .on_conflict_do_nothing()
-                    .execute(&conn)?;
+                    .execute(conn)?;
 
                 if inserted_row_count > 0 {
                     Ok(accepted_start_block_count)
@@ -359,11 +361,11 @@ impl SqlRecoveryDb {
     ) -> Result<(), Error> {
         let key_bytes: &[u8] = key.as_ref();
 
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
         use schema::ingress_keys::dsl;
         diesel::update(dsl::ingress_keys.filter(dsl::ingress_public_key.eq(key_bytes)))
             .set(dsl::retired.eq(set_retired))
-            .execute(&conn)?;
+            .execute(conn)?;
         Ok(())
     }
 
@@ -373,13 +375,13 @@ impl SqlRecoveryDb {
     ) -> Result<Option<u64>, Error> {
         let key_bytes: &[u8] = key.as_ref();
 
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
         use schema::ingested_blocks::dsl;
         let maybe_index: Option<i64> = dsl::ingested_blocks
             .filter(dsl::ingress_public_key.eq(key_bytes))
             .select(diesel::dsl::max(dsl::block_number))
-            .first(&conn)?;
+            .first(conn)?;
 
         Ok(maybe_index.map(|val| val as u64))
     }
@@ -389,7 +391,7 @@ impl SqlRecoveryDb {
         start_block_at_least: u64,
         ingress_public_key_record_filters: &IngressPublicKeyRecordFilters,
     ) -> Result<Vec<IngressPublicKeyRecord>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         use schema::ingress_keys::dsl;
         let last_scanned_block = diesel::dsl::sql::<diesel::sql_types::Nullable<diesel::sql_types::BigInt>>(
@@ -433,7 +435,7 @@ impl SqlRecoveryDb {
                 bool,
                 bool,
                 Option<i64>,
-            )>(&conn)?
+            )>(conn)?
             .into_iter()
             .map(
                 |(
@@ -468,15 +470,15 @@ impl SqlRecoveryDb {
         egress_public_key: &KexRngPubkey,
         start_block: u64,
     ) -> Result<IngestInvocationId, Error> {
-        let conn = self.pool.get()?;
-        conn.build_transaction().read_write().run(|| {
+        let conn = &mut self.pool.get()?;
+        conn.build_transaction().read_write().run(|conn| {
             // Optionally decommission old invocation.
             if let Some(prev_ingest_invocation_id) = prev_ingest_invocation_id {
-                self.decommission_ingest_invocation_impl(&conn, &prev_ingest_invocation_id)?;
+                self.decommission_ingest_invocation_impl(conn, &prev_ingest_invocation_id)?;
             }
 
             // Write new invocation.
-            let now = diesel::select(diesel::dsl::now).get_result::<NaiveDateTime>(&conn)?;
+            let now = diesel::select(diesel::dsl::now).get_result::<NaiveDateTime>(conn)?;
 
             let obj = models::NewIngestInvocation {
                 ingress_public_key: (*ingress_public_key).into(),
@@ -490,14 +492,14 @@ impl SqlRecoveryDb {
             let inserted_obj: models::IngestInvocation =
                 diesel::insert_into(schema::ingest_invocations::table)
                     .values(&obj)
-                    .get_result(&conn)?;
+                    .get_result(conn)?;
 
             // Write a user event.
             let new_event = models::NewUserEvent::new_ingest_invocation(inserted_obj.id);
 
             diesel::insert_into(schema::user_events::table)
                 .values(&new_event)
-                .execute(&conn)?;
+                .execute(conn)?;
 
             // Success.
             Ok(IngestInvocationId::from(inserted_obj.id))
@@ -507,7 +509,7 @@ impl SqlRecoveryDb {
     fn get_ingestable_ranges_retriable(
         &self,
     ) -> Result<Vec<IngestableRange>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         // For each ingest invocation we are aware of get its id, start block, is
         // decommissioned and the max block number it has ingested (if
@@ -524,7 +526,7 @@ impl SqlRecoveryDb {
             .order_by(schema::ingest_invocations::dsl::id);
 
         // The list of fields here must match the .select() clause above.
-        let data = query.load::<(i64, i64, bool, Option<i64>)>(&conn)?;
+        let data = query.load::<(i64, i64, bool, Option<i64>)>(conn)?;
 
         Ok(data
             .into_iter()
             .map(|row| {
@@ -551,11 +553,11 @@ impl SqlRecoveryDb {
         &self,
         ingest_invocation_id: &IngestInvocationId,
     ) -> Result<(), Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         conn.build_transaction()
             .read_write()
-            .run(|| self.decommission_ingest_invocation_impl(&conn, ingest_invocation_id))
+            .run(|conn| self.decommission_ingest_invocation_impl(conn, ingest_invocation_id))
     }
 
     fn add_block_data_retriable(
@@ -565,12 +567,12 @@ impl SqlRecoveryDb {
         block_signature_timestamp: u64,
         txs: &[mc_fog_types::ETxOutRecord],
     ) -> Result<AddBlockDataStatus, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         match conn
             .build_transaction()
             .read_write()
-            .run(|| -> Result<(), Error> {
+            .run(|conn| -> Result<(), Error> {
                 // Get ingress pubkey of this ingest invocation id, which is also stored in the
                 // ingested_block record
                 //
@@ -581,7 +583,7 @@ impl SqlRecoveryDb {
                 let ingress_key_bytes: Vec<u8> = schema::ingest_invocations::table
                     .filter(schema::ingest_invocations::dsl::id.eq(**ingest_invocation_id))
                     .select(schema::ingest_invocations::ingress_public_key)
-                    .first(&conn)?;
+                    .first(conn)?;
 
                 // Get bytes of encoded proto ingested block data
                 let proto_bytes = {
@@ -606,10 +608,10 @@ impl SqlRecoveryDb {
 
                 diesel::insert_into(schema::ingested_blocks::table)
                     .values(&new_ingested_block)
-                    .execute(&conn)?;
+                    .execute(conn)?;
 
                 // Update last active at.
-                self.update_last_active_at_impl(&conn, ingest_invocation_id)?;
+                self.update_last_active_at_impl(conn, ingest_invocation_id)?;
 
                 // Success.
                 Ok(())
@@ -638,16 +640,16 @@ impl SqlRecoveryDb {
         &self,
         lost_ingress_key: CompressedRistrettoPublic,
     ) -> Result<(), Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
-        conn.build_transaction().read_write().run(|| {
+        conn.build_transaction().read_write().run(|conn| {
             // Find the ingress key and update it to be marked lost
             let key_bytes: &[u8] = lost_ingress_key.as_ref();
 
             use schema::ingress_keys::dsl;
             let key_records: Vec<models::IngressKey> =
                 diesel::update(dsl::ingress_keys.filter(dsl::ingress_public_key.eq(key_bytes)))
                     .set(dsl::lost.eq(true))
                    .get_results(&conn)?;
-                    .get_results(&conn)?;
+                    .get_results(conn)?;
 
             // Compute a missed block range based on looking at the key status,
             // which is correct if no blocks have actually been scanned using the key.
@@ -671,7 +673,7 @@ impl SqlRecoveryDb {
                     dsl::ingested_blocks
                         .filter(dsl::ingress_public_key.eq(key_bytes))
                         .select(diesel::dsl::max(dsl::block_number))
-                        .first(&conn)?
+                        .first(conn)?
                 };
 
                 if let Some(block_index) = maybe_block_index {
@@ -699,15 +701,15 @@ impl SqlRecoveryDb {
 
             diesel::insert_into(schema::user_events::table)
                 .values(&new_event)
-                .execute(&conn)?;
+                .execute(conn)?;
 
             Ok(())
         })
     }
 
     fn get_missed_block_ranges_retriable(&self) -> Result<Vec<BlockRange>, Error> {
-        let conn = self.pool.get()?;
-        self.get_missed_block_ranges_impl(&conn)
+        let conn = &mut self.pool.get()?;
+        self.get_missed_block_ranges_impl(conn)
     }
 
     fn search_user_events_retriable(
@@ -719,7 +721,7 @@ impl SqlRecoveryDb {
             return Ok((Default::default(), i64::MAX));
         }
 
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         let mut events: Vec<(i64, FogUserEvent)> = Vec::new();
 
         // Collect all events of interest
@@ -771,7 +773,7 @@ impl SqlRecoveryDb {
                 // For MissingBlocks events
                 Option<i64>, // user_events.missing_blocks_start
                 Option<i64>, // user_events.missing_blocks_end
-            )>(&conn)?;
+            )>(conn)?;
 
         // If no events are found, return start_from_user_event_id and not 0
         let mut max_user_event_id = start_from_user_event_id;
@@ -889,14 +891,14 @@ impl SqlRecoveryDb {
         start_block: u64,
         search_keys: &[Vec<u8>],
     ) -> Result<Vec<TxOutSearchResult>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         let query = schema::ingested_blocks::dsl::ingested_blocks
             .filter(schema::ingested_blocks::dsl::block_number.ge(start_block as i64))
             .select(schema::ingested_blocks::dsl::proto_ingested_block_data);
 
         let mut search_key_to_payload = HashMap::<Vec<u8>, Vec<u8>>::default();
-        for proto_bytes in query.load::<Vec<u8>>(&conn)? {
+        for proto_bytes in query.load::<Vec<u8>>(conn)? {
             let proto = ProtoIngestedBlockData::decode(&*proto_bytes)?;
             for e_tx_out_record in proto.e_tx_out_records {
                 search_key_to_payload.insert(e_tx_out_record.search_key, e_tx_out_record.payload);
@@ -928,8 +930,8 @@ impl SqlRecoveryDb {
         &self,
         ingest_invocation_id: &IngestInvocationId,
     ) -> Result<(), Error> {
-        let conn = self.pool.get()?;
-        self.update_last_active_at_impl(&conn, ingest_invocation_id)
+        let conn = &mut self.pool.get()?;
+        self.update_last_active_at_impl(conn, ingest_invocation_id)
     }
 
     /// Get any ETxOutRecords produced by a given ingress key for a given
@@ -947,7 +949,7 @@ impl SqlRecoveryDb {
         ingress_key: CompressedRistrettoPublic,
         block_index: u64,
     ) -> Result<Option<Vec<ETxOutRecord>>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         let key_bytes: &[u8] = ingress_key.as_ref();
         let query = schema::ingested_blocks::dsl::ingested_blocks
@@ -957,7 +959,7 @@ impl SqlRecoveryDb {
 
         // The result of load should be 0 or 1, since there is a database constraint
         // around ingress keys and block indices
-        let protos: Vec<Vec<u8>> = query.load::<Vec<u8>>(&conn)?;
+        let protos: Vec<Vec<u8>> = query.load::<Vec<u8>>(conn)?;
 
         if protos.is_empty() {
             Ok(None)
@@ -988,7 +990,7 @@ impl SqlRecoveryDb {
         block_index: u64,
         block_count: usize,
     ) -> Result<Vec<Vec<ETxOutRecord>>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         // The idea is:
         // Similar to get_tx_outs_by_block_and_key_retriable, but now
@@ -1009,7 +1011,7 @@ impl SqlRecoveryDb {
         };
 
         // We will get one row for each hit in the table we found
-        let rows: Vec<(i64, Vec<u8>)> = query.load(&conn)?;
+        let rows: Vec<(i64, Vec<u8>)> = query.load(conn)?;
 
         if rows.len() > block_count {
             log::warn!(
@@ -1048,7 +1050,7 @@ impl SqlRecoveryDb {
         ingress_key: CompressedRistrettoPublic,
         block_index: u64,
     ) -> Result<Option<IngestInvocationId>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         let key_bytes: &[u8] = ingress_key.as_ref();
         let query = schema::ingested_blocks::dsl::ingested_blocks
@@ -1058,7 +1060,7 @@ impl SqlRecoveryDb {
 
         // The result of load should be 0 or 1, since there is a database constraint
         // around ingress keys and block indices
-        let iids: Vec<i64> = query.load::<i64>(&conn)?;
+        let iids: Vec<i64> = query.load::<i64>(conn)?;
 
         if iids.is_empty() {
             Ok(None)
@@ -1082,13 +1084,13 @@ impl SqlRecoveryDb {
         &self,
         block_index: u64,
     ) -> Result<Option<u64>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         let query = schema::ingested_blocks::dsl::ingested_blocks
             .filter(schema::ingested_blocks::dsl::block_number.eq(block_index as i64))
             .select(schema::ingested_blocks::dsl::cumulative_txo_count);
 
-        let data = query.load::<i64>(&conn)?;
+        let data = query.load::<i64>(conn)?;
         if data.is_empty() {
             Ok(None)
         } else {
@@ -1118,20 +1120,20 @@ impl SqlRecoveryDb {
         &self,
         block_index: u64,
     ) -> Result<Option<u64>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         let query = schema::ingested_blocks::dsl::ingested_blocks
             .filter(schema::ingested_blocks::dsl::block_number.eq(block_index as i64))
             .select(schema::ingested_blocks::dsl::block_signature_timestamp);
 
-        let data = query.load::<i64>(&conn)?;
+        let data = query.load::<i64>(conn)?;
 
         Ok(data.first().map(|val| *val as u64))
     }
 
     /// Get the highest block index for which we have any data at all.
     fn get_highest_known_block_index_retriable(&self) -> Result<Option<u64>, Error> {
-        let conn = self.pool.get()?;
-        SqlRecoveryDb::get_highest_known_block_index_impl(&conn)
+        let conn = &mut self.pool.get()?;
+        SqlRecoveryDb::get_highest_known_block_index_impl(conn)
     }
 
     ////
@@ -1140,7 +1142,7 @@ impl SqlRecoveryDb {
     ////
 
     fn get_all_reports_retriable(&self) -> Result<Vec<(String, ReportData)>, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         let query = schema::reports::dsl::reports
             .select((
@@ -1152,7 +1154,7 @@ impl SqlRecoveryDb {
             .order_by(schema::reports::dsl::id);
 
         query
-            .load::<(Option<i64>, String, Vec<u8>, i64)>(&conn)?
+            .load::<(Option<i64>, String, Vec<u8>, i64)>(conn)?
             .into_iter()
             .map(|(ingest_invocation_id, report_id, report, pubkey_expiry)| {
                 let report = VerificationReport::decode(&*report)?;
@@ -1175,11 +1177,11 @@ impl SqlRecoveryDb {
         report_id: &str,
         data: &ReportData,
     ) -> Result<IngressPublicKeyStatus, Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
 
         conn.build_transaction()
             .read_write()
-            .run(|| -> Result<IngressPublicKeyStatus, Error> {
+            .run(|conn| -> Result<IngressPublicKeyStatus, Error> {
                 // First, try to update the pubkey_expiry value on this ingress key, only
                 // allowing it to increase, and only if it is not retired
                 let result: IngressPublicKeyStatus = {
@@ -1193,7 +1195,7 @@ impl SqlRecoveryDb {
                             .filter(dsl::pubkey_expiry.lt(data.pubkey_expiry as i64)),
                     )
                     .set(dsl::pubkey_expiry.eq(data.pubkey_expiry as i64))
-                    .get_results(&conn)?;
+                    .get_results(conn)?;
 
                     if key_records.is_empty() {
                         // If the result is empty, the key might not exist, or it might have had a
@@ -1201,7 +1203,7 @@ impl SqlRecoveryDb {
                         // so we need to make another query to find which is the case
                         log::info!(self.logger, "update was a no-op");
                         let maybe_key_status =
-                            self.get_ingress_key_status_impl(&conn, ingress_key)?;
+                            self.get_ingress_key_status_impl(conn, ingress_key)?;
                         log::info!(self.logger, "check ingress key passed");
                         maybe_key_status.ok_or(Error::MissingIngressKey(*ingress_key))?
                     } else if key_records.len() > 1 {
@@ -1245,18 +1247,18 @@ impl SqlRecoveryDb {
                         schema::reports::dsl::report.eq(report_bytes.clone()),
                         schema::reports::dsl::pubkey_expiry.eq(report.pubkey_expiry),
                     ))
-                    .execute(&conn)?;
+                    .execute(conn)?;
 
                 Ok(result)
             })
     }
 
     /// Remove report data associated with a given report id.
     fn remove_report_retriable(&self, report_id: &str) -> Result<(), Error> {
-        let conn = self.pool.get()?;
+        let conn = &mut self.pool.get()?;
         diesel::delete(
             schema::reports::dsl::reports.filter(schema::reports::dsl::fog_report_id.eq(report_id)),
         )
-        .execute(&conn)?;
+        .execute(conn)?;
         Ok(())
     }
 
@@ -1264,8 +1266,8 @@ impl SqlRecoveryDb {
         &self,
         expiration: NaiveDateTime,
     ) -> Result<Vec<ExpiredInvocationRecord>, Error> {
-        let conn = self.pool.get()?;
-        self.get_expired_invocations_impl(&conn, expiration)
+        let conn = &mut self.pool.get()?;
+        self.get_expired_invocations_impl(conn, expiration)
     }
 }
 
@@ -1661,11 +1663,11 @@ mod tests {
         assert_ne!(invoc_id1, invoc_id2);
 
         // Both ingest invocations should appear in the ingest_invocations table
-        let conn = db_test_context.new_conn();
+        let conn = &mut db_test_context.new_conn();
         let ingest_invocations: Vec<models::IngestInvocation> =
             schema::ingest_invocations::dsl::ingest_invocations
                 .order_by(schema::ingest_invocations::dsl::id)
-                .load(&conn)
+                .load(conn)
                 .expect("failed getting ingest invocations");
 
         assert_eq!(ingest_invocations.len(), 2);
@@ -1927,7 +1929,7 @@ mod tests {
         let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]);
         let db_test_context = test_utils::SqlRecoveryDbTestContext::new(logger);
         let db = db_test_context.get_db_instance();
-        let conn = db_test_context.new_conn();
+        let conn = &mut db_test_context.new_conn();
 
         let ingress_key = CompressedRistrettoPublic::from(RistrettoPublic::from_random(&mut rng));
         db.new_ingress_key(&ingress_key, 20).unwrap();
@@ -1952,7 +1954,7 @@ mod tests {
             schema::ingest_invocations::dsl::ingest_invocations
                 .select(schema::ingest_invocations::dsl::last_active_at)
                 .order_by(schema::ingest_invocations::dsl::id)
-                .load(&conn)
+                .load(conn)
                 .unwrap();
         let mut invoc1_orig_last_active_at = invocs_last_active_at[0];
         let invoc2_orig_last_active_at = invocs_last_active_at[1];
@@ -1967,7 +1969,7 @@ mod tests {
 
         let blocks: Vec<models::IngestedBlock> = schema::ingested_blocks::dsl::ingested_blocks
             .order_by(schema::ingested_blocks::dsl::id)
-            .load(&conn)
+            .load(conn)
             .unwrap();
         assert_eq!(blocks.len(), 1);
         assert_eq!(
@@ -1996,7 +1998,7 @@ mod tests {
             schema::ingest_invocations::dsl::ingest_invocations
                 .select(schema::ingest_invocations::dsl::last_active_at)
                 .order_by(schema::ingest_invocations::dsl::id)
-                .load(&conn)
+                .load(conn)
                 .unwrap();
         assert!(invocs_last_active_at[0] > invoc1_orig_last_active_at);
         assert_eq!(invocs_last_active_at[1], invoc2_orig_last_active_at);
@@ -2027,7 +2029,7 @@ mod tests {
             schema::ingest_invocations::dsl::ingest_invocations
                 .select(schema::ingest_invocations::dsl::last_active_at)
                 .order_by(schema::ingest_invocations::dsl::id)
-                .load(&conn)
+                .load(conn)
                 .unwrap();
         assert_eq!(invocs_last_active_at[0], invoc1_orig_last_active_at);
         assert_eq!(invocs_last_active_at[1], invoc2_orig_last_active_at);
@@ -2045,7 +2047,7 @@ mod tests {
 
         let blocks: Vec<models::IngestedBlock> = schema::ingested_blocks::dsl::ingested_blocks
             .order_by(schema::ingested_blocks::dsl::id)
-            .load(&conn)
+            .load(conn)
             .unwrap();
         assert_eq!(blocks.len(), 2);
         assert_eq!(
@@ -2085,7 +2087,7 @@ mod tests {
             schema::ingest_invocations::dsl::ingest_invocations
                 .select(schema::ingest_invocations::dsl::last_active_at)
                 .order_by(schema::ingest_invocations::dsl::id)
-                .load(&conn)
+                .load(conn)
                 .unwrap();
         assert_eq!(invocs_last_active_at[0], invoc1_orig_last_active_at);
         assert!(invocs_last_active_at[1] > invoc2_orig_last_active_at);
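The bulk of the `lib.rs` churn is mechanical: `conn: &PgConnection` becomes `conn: &mut PgConnection`, and `build_transaction().run(|| ...)` becomes `run(|conn| ...)`, because the Diesel 2.0 transaction builder passes the connection into the closure instead of letting the closure capture a second shared borrow. The `let conn = &mut self.pool.get()?;` lines work because r2d2's `PooledConnection` derefs to the underlying `PgConnection`. A hedged sketch of the pattern, with a hypothetical `touch` helper standing in for the `*_impl` methods:

```rust
use diesel::{pg::PgConnection, prelude::*};

fn touch(conn: &mut PgConnection) -> QueryResult<usize> {
    diesel::sql_query("SELECT 1").execute(conn)
}

fn in_transaction(conn: &mut PgConnection) -> QueryResult<usize> {
    // Diesel 1.x: `conn.build_transaction().read_write().run(|| touch(&conn))`
    // borrowed the captured connection a second time.
    // Diesel 2.x: `run` hands the connection back to the closure, which is the
    // only way the exclusive `&mut` borrow can pass the borrow checker.
    conn.build_transaction().read_write().run(|conn| touch(conn))
}
```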
diff --git a/fog/sql_recovery_db/src/models.rs b/fog/sql_recovery_db/src/models.rs
index 05d82bfbcd..125d42a7da 100644
--- a/fog/sql_recovery_db/src/models.rs
+++ b/fog/sql_recovery_db/src/models.rs
@@ -14,7 +14,7 @@ pub struct IngressKey {
 }
 
 #[derive(Debug, Insertable)]
-#[table_name = "ingress_keys"]
+#[diesel(table_name = ingress_keys)]
 pub struct NewIngressKey {
     pub ingress_public_key: Vec<u8>,
     pub start_block: i64,
@@ -35,7 +35,7 @@ pub struct IngestInvocation {
 }
 
 #[derive(Debug, Insertable)]
-#[table_name = "ingest_invocations"]
+#[diesel(table_name = ingest_invocations)]
 pub struct NewIngestInvocation {
     pub ingress_public_key: Vec<u8>,
     pub egress_public_key: Vec<u8>,
@@ -57,7 +57,7 @@ pub struct IngestedBlock {
 }
 
 #[derive(Debug, Insertable)]
-#[table_name = "ingested_blocks"]
+#[diesel(table_name = ingested_blocks)]
 pub struct NewIngestedBlock {
     pub ingress_public_key: Vec<u8>,
     pub ingest_invocation_id: i64,
@@ -68,7 +68,7 @@ pub struct NewIngestedBlock {
 }
 
 #[derive(Debug, Insertable)]
-#[table_name = "user_events"]
+#[diesel(table_name = user_events)]
 pub struct NewUserEvent {
     pub event_type: UserEventType,
     pub new_ingest_invocation_id: Option<i64>,
@@ -112,7 +112,7 @@ impl NewUserEvent {
 }
 
 #[derive(Debug, Insertable)]
-#[table_name = "reports"]
+#[diesel(table_name = reports)]
 pub struct NewReport<'a> {
     pub ingress_public_key: &'a [u8],
     pub ingest_invocation_id: Option<i64>,
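All of the `models.rs` changes are the Diesel 2.0 derive-attribute migration: the bare `#[table_name = "..."]` string attribute becomes a namespaced `#[diesel(table_name = ...)]` attribute that takes a path rather than a string. A before/after sketch on a hypothetical `widgets` table (not part of this crate's schema):

```rust
use diesel::prelude::*;

diesel::table! {
    widgets (id) {
        id -> BigInt,
        name -> Text,
    }
}

// Diesel 1.x (rejected by diesel_derives 2.x):
//     #[derive(Insertable)]
//     #[table_name = "widgets"]
// Diesel 2.x: one namespaced attribute, table name given as a path.
#[derive(Insertable)]
#[diesel(table_name = widgets)]
pub struct NewWidget {
    pub name: String,
}
```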
diff --git a/fog/sql_recovery_db/src/sql_types.rs b/fog/sql_recovery_db/src/sql_types.rs
index fbcdb1caf7..c280de3cad 100644
--- a/fog/sql_recovery_db/src/sql_types.rs
+++ b/fog/sql_recovery_db/src/sql_types.rs
@@ -1,15 +1,13 @@
 // Copyright (c) 2018-2022 The MobileCoin Foundation
 
 use diesel::{
-    backend::Backend,
     deserialize::{self, FromSql},
-    pg::Pg,
+    pg::{Pg, PgValue},
     serialize::{self, Output, ToSql},
 };
 use diesel_derive_enum::DbEnum;
 use mc_crypto_keys::CompressedRistrettoPublic;
-use mc_util_repr_bytes::ReprBytes;
-use std::{fmt, io::Write, ops::Deref};
+use std::{fmt, ops::Deref};
 
 #[derive(Debug, PartialEq, DbEnum)]
 #[DieselType = "User_event_type"]
@@ -20,7 +18,7 @@ pub enum UserEventType {
 }
 
 #[derive(AsExpression, FromSqlRow, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
-#[sql_type = "diesel::sql_types::Binary"]
+#[diesel(sql_type = diesel::sql_types::Binary)]
 pub struct SqlCompressedRistrettoPublic(CompressedRistrettoPublic);
 
 impl Deref for SqlCompressedRistrettoPublic {
@@ -49,13 +47,13 @@ impl fmt::Display for SqlCompressedRistrettoPublic {
     }
 }
 
-impl<DB: Backend<RawValue = [u8]>> FromSql<diesel::sql_types::Binary, DB>
-    for SqlCompressedRistrettoPublic
-{
-    fn from_sql(bytes: Option<&DB::RawValue>) -> deserialize::Result<Self> {
-        let vec = <Vec<u8> as FromSql<diesel::sql_types::Binary, DB>>::from_sql(bytes)?;
+impl FromSql<diesel::sql_types::Binary, Pg> for SqlCompressedRistrettoPublic {
+    fn from_sql(value: PgValue) -> deserialize::Result<Self> {
+        let vec = <Vec<u8> as FromSql<diesel::sql_types::Binary, Pg>>::from_sql(value)?;
         if vec.len() != 32 {
-            return Err("SqlCompressedRistrettoPublic: Invalid array length".into());
+            return Err("SqlCompressedRistrettoPublic: Invalid array
+            length"
+                .into());
         }
 
         let mut key = [0; 32];
@@ -69,7 +67,10 @@ impl<DB: Backend<RawValue = [u8]>> FromSql<diesel::sql_types::Binary, DB>
 }
 
 impl ToSql<diesel::sql_types::Binary, Pg> for SqlCompressedRistrettoPublic {
-    fn to_sql<W: Write>(&self, out: &mut Output<W, Pg>) -> serialize::Result {
-        <Vec<u8> as ToSql<diesel::sql_types::Binary, Pg>>::to_sql(&self.0.to_bytes().to_vec(), out)
+    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result {
+        <Vec<u8> as ToSql<diesel::sql_types::Binary, Pg>>::to_sql(
+            &self.0.as_bytes().to_vec(),
+            &mut out.reborrow(),
+        )
     }
 }
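The `sql_types.rs` rewrite tracks two Diesel 2.0 trait changes: `FromSql` now receives a backend-specific `PgValue` instead of `Option<&DB::RawValue>` (so the generic `Backend<RawValue = [u8]>` impl collapses into a Pg-only impl), and `ToSql` ties the output buffer to `&'b self`, which is why delegating to `Vec<u8>`'s impl goes through `out.reborrow()`. The same shape on a hypothetical 32-byte newtype (`Blob32` is illustrative; a real wrapper would also derive `AsExpression`/`FromSqlRow` as above):

```rust
use diesel::{
    deserialize::{self, FromSql},
    pg::{Pg, PgValue},
    serialize::{self, Output, ToSql},
    sql_types::Binary,
};

pub struct Blob32(pub [u8; 32]);

impl FromSql<Binary, Pg> for Blob32 {
    fn from_sql(value: PgValue) -> deserialize::Result<Self> {
        // Delegate to the built-in Binary -> Vec<u8> conversion, then check length.
        let vec = <Vec<u8> as FromSql<Binary, Pg>>::from_sql(value)?;
        let arr = vec.as_slice().try_into().map_err(|_| "Blob32: invalid length")?;
        Ok(Blob32(arr))
    }
}

impl ToSql<Binary, Pg> for Blob32 {
    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result {
        // `reborrow` shortens the output buffer's lifetime so we can delegate
        // to the Vec<u8> impl with a temporary vector.
        <Vec<u8> as ToSql<Binary, Pg>>::to_sql(&self.0.to_vec(), &mut out.reborrow())
    }
}
```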
diff --git a/fog/sql_recovery_db/src/test_utils.rs b/fog/sql_recovery_db/src/test_utils.rs
index aa55329097..bed5f1813f 100644
--- a/fog/sql_recovery_db/src/test_utils.rs
+++ b/fog/sql_recovery_db/src/test_utils.rs
@@ -4,7 +4,7 @@
 
 use crate::{SqlRecoveryDb, SqlRecoveryDbConnectionConfig};
 use diesel::{prelude::*, PgConnection};
-use diesel_migrations::embed_migrations;
+use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
 use mc_common::logger::{log, Logger};
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use retry::{
@@ -13,8 +13,7 @@ use retry::{
 };
 use std::time::Duration;
 
-embed_migrations!("migrations/");
-
+const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
 const DB_CONNECTION_SLEEP_PERIOD: Duration = Duration::from_secs(3);
 const TOTAL_RETRY_COUNT: usize = 5;
 
@@ -44,19 +43,20 @@ impl SqlRecoveryDbTestContext {
         // database.
         let postgres_url = format!("{base_url}/postgres");
         log::info!(&logger, "Connecting to root PG DB {}", postgres_url);
-        let conn = SqlRecoveryDbTestContext::establish_connection(&postgres_url);
+        let mut conn = SqlRecoveryDbTestContext::establish_connection(&postgres_url);
 
         // Create a new database for the test
         let query = diesel::sql_query(format!("CREATE DATABASE {db_name};").as_str());
         let _ = query
-            .execute(&conn)
+            .execute(&mut conn)
            .unwrap_or_else(|err| panic!("Could not create database {db_name}: {err:?}"));
 
         // Now we can connect to the database and run the migrations
         let db_url = format!("{base_url}/{db_name}");
         log::info!(&logger, "Connecting to newly created PG DB '{}'", db_url);
-        let conn = SqlRecoveryDbTestContext::establish_connection(&db_url);
-        embedded_migrations::run(&conn).expect("failed running migrations");
+        let conn = &mut SqlRecoveryDbTestContext::establish_connection(&db_url);
+        conn.run_pending_migrations(MIGRATIONS)
+            .expect("failed running migrations");
 
         // Success
         Self {
@@ -107,7 +107,7 @@ impl SqlRecoveryDbTestContext {
 
 impl Drop for SqlRecoveryDbTestContext {
     fn drop(&mut self) {
         let postgres_url = format!("{}/postgres", self.base_url);
-        let conn =
+        let mut conn =
             PgConnection::establish(&postgres_url).expect("Cannot connect to postgres database.");
 
         let disconnect_users = format!(
@@ -116,12 +116,12 @@ impl Drop for SqlRecoveryDbTestContext {
         );
 
         diesel::sql_query(disconnect_users.as_str())
-            .execute(&conn)
+            .execute(&mut conn)
             .unwrap();
 
         let query = diesel::sql_query(format!("DROP DATABASE {};", self.db_name).as_str());
         query
-            .execute(&conn)
+            .execute(&mut conn)
             .unwrap_or_else(|err| panic!("Couldn't drop database {}: {:?}", self.db_name, err));
    }
 }
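`test_utils.rs` keeps its per-test database lifecycle (create a database off the root `postgres` DB, migrate it, drop it on `Drop`); only the connection mutability and the migration harness calls change. A condensed sketch of the create-and-migrate half, assuming the same URL layout (`base_url/postgres` for admin work, `base_url/db_name` for the test database):

```rust
use diesel::{pg::PgConnection, prelude::*};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

fn create_test_db(base_url: &str, db_name: &str) -> PgConnection {
    // Raw SQL also executes against `&mut` in Diesel 2.x.
    let mut admin = PgConnection::establish(&format!("{base_url}/postgres"))
        .expect("cannot connect to postgres");
    diesel::sql_query(format!("CREATE DATABASE {db_name};"))
        .execute(&mut admin)
        .expect("could not create test database");

    // Connect to the fresh database and bring its schema up to date.
    let mut conn = PgConnection::establish(&format!("{base_url}/{db_name}"))
        .expect("cannot connect to test database");
    conn.run_pending_migrations(MIGRATIONS)
        .expect("failed running migrations");
    conn
}
```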