Skip to content

Commit

Permalink
codewide: allow setting Consistency to Serial
Browse files Browse the repository at this point in the history
It appears that, contrary to what we thought, LocalSerial and Serial
are also valid values for Consistency in some cases: for LWT SELECTs.
Therefore, Consistency is extended with Serial and LocalSerial variants,
and LegacyConsistency is removed as no longer making sense.
  • Loading branch information
wprzytula committed Aug 22, 2023
1 parent decac4e commit e697125
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 150 deletions.
16 changes: 8 additions & 8 deletions scylla-cql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use crate::frame::frame_errors::{FrameError, ParseError};
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::types::LegacyConsistency;
use crate::frame::value::SerializeValuesError;
use crate::Consistency;
use bytes::Bytes;
use std::io::ErrorKind;
use std::sync::Arc;
Expand Down Expand Up @@ -108,7 +108,7 @@ pub enum DbError {
)]
Unavailable {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes required to be alive to satisfy required consistency level
required: i32,
/// Found number of active nodes
Expand All @@ -132,7 +132,7 @@ pub enum DbError {
(consistency: {consistency}, received: {received}, required: {required}, data_present: {data_present})")]
ReadTimeout {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes that responded to the read request
received: i32,
/// Number of nodes required to respond to satisfy required consistency level
Expand All @@ -146,7 +146,7 @@ pub enum DbError {
(consistency: {consistency}, received: {received}, required: {required}, write_type: {write_type})")]
WriteTimeout {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes that responded to the write request
received: i32,
/// Number of nodes required to respond to satisfy required consistency level
Expand All @@ -163,7 +163,7 @@ pub enum DbError {
)]
ReadFailure {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes that responded to the read request
received: i32,
/// Number of nodes required to respond to satisfy required consistency level
Expand All @@ -182,7 +182,7 @@ pub enum DbError {
)]
WriteFailure {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes that responded to the read request
received: i32,
/// Number of nodes required to respond to satisfy required consistency level
Expand Down Expand Up @@ -550,7 +550,7 @@ impl WriteType {
#[cfg(test)]
mod tests {
use super::{DbError, QueryError, WriteType};
use crate::frame::types::{Consistency, LegacyConsistency};
use crate::frame::types::Consistency;

#[test]
fn write_type_from_str() {
Expand Down Expand Up @@ -581,7 +581,7 @@ mod tests {
fn dberror_full_info() {
// Test that DbError::Unavailable is displayed correctly
let db_error = DbError::Unavailable {
consistency: LegacyConsistency::Regular(Consistency::Three),
consistency: Consistency::Three,
required: 3,
alive: 2,
};
Expand Down
26 changes: 10 additions & 16 deletions scylla-cql/src/frame/request/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{borrow::Cow, convert::TryInto};
use crate::frame::{
frame_errors::ParseError,
request::{RequestOpcode, SerializableRequest},
types,
types::{self, SerialConsistency},
value::{BatchValues, BatchValuesIterator, SerializedValues},
};

Expand Down Expand Up @@ -202,13 +202,7 @@ impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec<SerializedV
})
.collect::<Result<Vec<_>, ParseError>>()?;

let consistency = match types::read_consistency(buf)? {
types::LegacyConsistency::Regular(reg) => Ok(reg),
types::LegacyConsistency::Serial(ser) => Err(ParseError::BadIncomingData(format!(
"Expected regular Consistency, got SerialConsistency {}",
ser
))),
}?;
let consistency = types::read_consistency(buf)?;

let flags = buf.get_u8();
let unknown_flags = flags & (!ALL_FLAGS);
Expand All @@ -224,15 +218,15 @@ impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec<SerializedV
let serial_consistency = serial_consistency_flag
.then(|| types::read_consistency(buf))
.transpose()?
.map(|legacy_consistency| match legacy_consistency {
types::LegacyConsistency::Regular(reg) => {
Err(ParseError::BadIncomingData(format!(
.map(
|consistency| match SerialConsistency::try_from(consistency) {
Ok(serial_consistency) => Ok(serial_consistency),
Err(_) => Err(ParseError::BadIncomingData(format!(
"Expected SerialConsistency, got regular Consistency {}",
reg
)))
}
types::LegacyConsistency::Serial(ser) => Ok(ser),
})
consistency
))),
},
)
.transpose()?;

let timestamp = default_timestamp_flag
Expand Down
7 changes: 2 additions & 5 deletions scylla-cql/src/frame/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ mod tests {
query::{Query, QueryParameters},
DeserializableRequest, SerializableRequest,
},
types::{self, LegacyConsistency, SerialConsistency},
types::{self, SerialConsistency},
value::SerializedValues,
},
Consistency,
Expand Down Expand Up @@ -236,10 +236,7 @@ mod tests {

// Now buf_ptr points at consistency.
let consistency = types::read_consistency(&mut buf_ptr).unwrap();
assert_eq!(
consistency,
LegacyConsistency::Regular(Consistency::default())
);
assert_eq!(consistency, Consistency::default());

// Now buf_ptr points at flags, but it is immutable. Get mutable reference into the buffer.
let flags_idx = buf.len() - buf_ptr.len();
Expand Down
34 changes: 14 additions & 20 deletions scylla-cql/src/frame/request/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::borrow::Cow;

use crate::frame::frame_errors::ParseError;
use crate::frame::{frame_errors::ParseError, types::SerialConsistency};
use bytes::{Buf, BufMut, Bytes};

use crate::{
Expand Down Expand Up @@ -134,13 +134,7 @@ impl QueryParameters<'_> {

impl<'q> QueryParameters<'q> {
pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ParseError> {
let consistency = match types::read_consistency(buf)? {
types::LegacyConsistency::Regular(reg) => Ok(reg),
types::LegacyConsistency::Serial(ser) => Err(ParseError::BadIncomingData(format!(
"Expected regular Consistency, got SerialConsistency {}",
ser
))),
}?;
let consistency = types::read_consistency(buf)?;

let flags = buf.get_u8();
let unknown_flags = flags & (!ALL_FLAGS);
Expand Down Expand Up @@ -169,19 +163,19 @@ impl<'q> QueryParameters<'q> {
} else {
None
};
let serial_consistency = if serial_consistency_flag {
match types::read_consistency(buf)? {
types::LegacyConsistency::Regular(reg) => {
return Err(ParseError::BadIncomingData(format!(
let serial_consistency = serial_consistency_flag
.then(|| types::read_consistency(buf))
.transpose()?
.map(
|consistency| match SerialConsistency::try_from(consistency) {
Ok(serial_consistency) => Ok(serial_consistency),
Err(_) => Err(ParseError::BadIncomingData(format!(
"Expected SerialConsistency, got regular Consistency {}",
reg
)))
}
types::LegacyConsistency::Serial(ser) => Some(ser),
}
} else {
None
};
consistency
))),
},
)
.transpose()?;
let timestamp = if default_timestamp_flag {
Some(types::read_long(buf)?)
} else {
Expand Down
11 changes: 5 additions & 6 deletions scylla-cql/src/frame/response/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ mod tests {
use super::Error;
use crate::errors::{DbError, OperationType, WriteType};
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::types::LegacyConsistency;
use crate::Consistency;
use bytes::Bytes;
use std::convert::TryInto;
Expand Down Expand Up @@ -151,7 +150,7 @@ mod tests {
assert_eq!(
error.error,
DbError::Unavailable {
consistency: LegacyConsistency::Regular(Consistency::One),
consistency: Consistency::One,
required: 2,
alive: 3,
}
Expand All @@ -178,7 +177,7 @@ mod tests {
assert_eq!(
error.error,
DbError::WriteTimeout {
consistency: LegacyConsistency::Regular(Consistency::Quorum),
consistency: Consistency::Quorum,
received: -5, // Allow negative values when they don't make sense, it's better than crashing with ProtocolError
required: 100,
write_type: WriteType::Simple,
Expand All @@ -202,7 +201,7 @@ mod tests {
assert_eq!(
error.error,
DbError::ReadTimeout {
consistency: LegacyConsistency::Regular(Consistency::Two),
consistency: Consistency::Two,
received: 8,
required: 32,
data_present: false,
Expand All @@ -227,7 +226,7 @@ mod tests {
assert_eq!(
error.error,
DbError::ReadFailure {
consistency: LegacyConsistency::Regular(Consistency::Three),
consistency: Consistency::Three,
received: 4,
required: 5,
numfailures: 6,
Expand Down Expand Up @@ -299,7 +298,7 @@ mod tests {
assert_eq!(
error.error,
DbError::WriteFailure {
consistency: LegacyConsistency::Regular(Consistency::Any),
consistency: Consistency::Any,
received: 2,
required: 4,
numfailures: 8,
Expand Down
66 changes: 38 additions & 28 deletions scylla-cql/src/frame/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::convert::TryInto;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::str;
use thiserror::Error;
use uuid::Uuid;

#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, TryFromPrimitive)]
Expand All @@ -27,6 +28,11 @@ pub enum Consistency {
LocalQuorum = 0x0006,
EachQuorum = 0x0007,
LocalOne = 0x000A,

// Apparently, Consistency can be set to Serial or LocalSerial in SELECT statements
// to make them use Paxos.
Serial = 0x0008,
LocalSerial = 0x0009,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, TryFromPrimitive)]
Expand All @@ -38,13 +44,34 @@ pub enum SerialConsistency {
LocalSerial = 0x0009,
}

// LegacyConsistency exists, because Scylla may return a SerialConsistency value
// as Consistency when returning certain error types - the distinction between
// Consistency and SerialConsistency is not really a thing in CQL.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum LegacyConsistency {
Regular(Consistency),
Serial(SerialConsistency),
impl Consistency {
pub fn is_serial(&self) -> bool {
matches!(self, Consistency::Serial | Consistency::LocalSerial)
}
}

#[derive(Debug, Error)]
#[error("Expected Consistency Serial or LocalSerial, got: {0}")]
pub struct NonSerialConsistencyError(Consistency);

impl TryFrom<Consistency> for SerialConsistency {
type Error = NonSerialConsistencyError;

fn try_from(c: Consistency) -> Result<Self, Self::Error> {
match c {
Consistency::Any
| Consistency::One
| Consistency::Two
| Consistency::Three
| Consistency::Quorum
| Consistency::All
| Consistency::LocalQuorum
| Consistency::EachQuorum
| Consistency::LocalOne => Err(NonSerialConsistencyError(c)),
Consistency::Serial => Ok(SerialConsistency::Serial),
Consistency::LocalSerial => Ok(SerialConsistency::LocalSerial),
}
}
}

impl std::fmt::Display for Consistency {
Expand All @@ -59,15 +86,6 @@ impl std::fmt::Display for SerialConsistency {
}
}

impl std::fmt::Display for LegacyConsistency {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Regular(c) => c.fmt(f),
Self::Serial(c) => c.fmt(f),
}
}
}

impl From<std::num::TryFromIntError> for ParseError {
fn from(_err: std::num::TryFromIntError) -> Self {
ParseError::BadIncomingData("Integer conversion out of range".to_string())
Expand Down Expand Up @@ -439,18 +457,10 @@ fn type_uuid() {
assert_eq!(u, u2);
}

pub fn read_consistency(buf: &mut &[u8]) -> Result<LegacyConsistency, ParseError> {
pub fn read_consistency(buf: &mut &[u8]) -> Result<Consistency, ParseError> {
let raw = read_short(buf)?;
let parsed = match Consistency::try_from(raw) {
Ok(c) => LegacyConsistency::Regular(c),
Err(_) => {
let parsed_serial = SerialConsistency::try_from(raw).map_err(|_| {
ParseError::BadIncomingData(format!("unknown consistency: {}", raw))
})?;
LegacyConsistency::Serial(parsed_serial)
}
};
Ok(parsed)
Consistency::try_from(raw)
.map_err(|_| ParseError::BadIncomingData(format!("unknown consistency: {}", raw)))
}

pub fn write_consistency(c: Consistency, buf: &mut impl BufMut) {
Expand All @@ -467,7 +477,7 @@ fn type_consistency() {
let mut buf = Vec::new();
write_consistency(c, &mut buf);
let c2 = read_consistency(&mut &*buf).unwrap();
assert_eq!(LegacyConsistency::Regular(c), c2);
assert_eq!(c, c2);

let c: i16 = 0x1234;
buf.clear();
Expand Down
Loading

0 comments on commit e697125

Please sign in to comment.