Skip to content

Commit

Permalink
Merge pull request #1063 from wprzytula/fix-cached-metadata
Browse files Browse the repository at this point in the history
result: fix bug: paging state taken from PREPARED response instead of RESULT
  • Loading branch information
wprzytula authored Aug 22, 2024
2 parents bac7f9e + 5249309 commit 45f0288
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
6 changes: 5 additions & 1 deletion scylla-cql/src/frame/response/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,11 @@ fn deser_rows(
let server_metadata = deser_result_metadata(buf)?;

let metadata = match cached_metadata {
Some(metadata) => metadata.clone(),
Some(cached) => ResultMetadata {
col_count: cached.col_count,
paging_state: server_metadata.paging_state,
col_specs: cached.col_specs.clone(),
},
None => {
// No cached_metadata provided. Server is supposed to provide the result metadata.
if server_metadata.col_count != server_metadata.col_specs.len() {
Expand Down
65 changes: 65 additions & 0 deletions scylla/tests/integration/skip_metadata_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
#[cfg(not(scylla_cloud_tests))]
async fn test_skip_result_metadata() {
setup_tracing();
use bytes::Bytes;
use scylla_proxy::{ResponseOpcode, ResponseRule};

const NO_METADATA_FLAG: i32 = 0x0004;
Expand Down Expand Up @@ -79,6 +80,70 @@ async fn test_skip_result_metadata() {
prepared.set_use_cached_result_metadata(true);
test_with_flags_predicate(&session, &prepared, &mut rx, |flags| flags & NO_METADATA_FLAG != 0).await;

// Verify that the optimisation does not break paging
{
let ks = unique_keyspace_name();

session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap();
session.use_keyspace(ks, true).await.unwrap();

type RowT = (i32, i32, String);
session
.query(
"CREATE TABLE IF NOT EXISTS t2 (a int, b int, c text, primary key (a, b))",
&[],
)
.await
.unwrap();

let insert_stmt = session
.prepare("INSERT INTO t2 (a, b, c) VALUES (?, ?, ?)")
.await
.unwrap();

for idx in 0..10 {
session
.execute(&insert_stmt, (idx, idx + 1, "Some text"))
.await
.unwrap();
}

{
let select_query = "SELECT a, b, c FROM t2";

let rs = session
.query(select_query, ())
.await
.unwrap()
.rows_typed::<RowT>()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();

let mut results_from_manual_paging: Vec<RowT> = vec![];
let mut prepared_paged = session.prepare(select_query).await.unwrap();
prepared_paged.set_use_cached_result_metadata(true);
prepared_paged.set_page_size(1);
let mut paging_state: Option<Bytes> = None;
let mut watchdog = 0;
loop {
let mut rs_manual = session
.execute_paged(&prepared_paged, &[], paging_state)
.await
.unwrap();
eprintln!("Paging state: {:?}", rs_manual.paging_state);
paging_state = rs_manual.paging_state.take();
results_from_manual_paging
.extend(rs_manual.rows_typed::<RowT>().unwrap().map(Result::unwrap));
if watchdog > 30 || paging_state.is_none() {
break;
}
watchdog += 1;
}
assert_eq!(results_from_manual_paging, rs);
}
}

running_proxy
}).await;

Expand Down

0 comments on commit 45f0288

Please sign in to comment.