Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codewide: Allow setting Serial,LocalSerial for Consistency #792

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions scylla-cql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use crate::frame::frame_errors::{FrameError, ParseError};
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::types::LegacyConsistency;
use crate::frame::value::SerializeValuesError;
use crate::Consistency;
use bytes::Bytes;
use std::io::ErrorKind;
use std::sync::Arc;
Expand Down Expand Up @@ -108,7 +108,7 @@ pub enum DbError {
)]
Unavailable {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes required to be alive to satisfy required consistency level
required: i32,
/// Found number of active nodes
Expand All @@ -132,7 +132,7 @@ pub enum DbError {
(consistency: {consistency}, received: {received}, required: {required}, data_present: {data_present})")]
ReadTimeout {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes that responded to the read request
received: i32,
/// Number of nodes required to respond to satisfy required consistency level
Expand All @@ -146,7 +146,7 @@ pub enum DbError {
(consistency: {consistency}, received: {received}, required: {required}, write_type: {write_type})")]
WriteTimeout {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes that responded to the write request
received: i32,
/// Number of nodes required to respond to satisfy required consistency level
Expand All @@ -163,7 +163,7 @@ pub enum DbError {
)]
ReadFailure {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes that responded to the read request
received: i32,
/// Number of nodes required to respond to satisfy required consistency level
Expand All @@ -182,7 +182,7 @@ pub enum DbError {
)]
WriteFailure {
/// Consistency level of the query
consistency: LegacyConsistency,
consistency: Consistency,
/// Number of nodes that responded to the read request
received: i32,
/// Number of nodes required to respond to satisfy required consistency level
Expand Down Expand Up @@ -550,7 +550,7 @@ impl WriteType {
#[cfg(test)]
mod tests {
use super::{DbError, QueryError, WriteType};
use crate::frame::types::{Consistency, LegacyConsistency};
use crate::frame::types::Consistency;

#[test]
fn write_type_from_str() {
Expand Down Expand Up @@ -581,7 +581,7 @@ mod tests {
fn dberror_full_info() {
// Test that DbError::Unavailable is displayed correctly
let db_error = DbError::Unavailable {
consistency: LegacyConsistency::Regular(Consistency::Three),
consistency: Consistency::Three,
required: 3,
alive: 2,
};
Expand Down
26 changes: 10 additions & 16 deletions scylla-cql/src/frame/request/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{borrow::Cow, convert::TryInto};
use crate::frame::{
frame_errors::ParseError,
request::{RequestOpcode, SerializableRequest},
types,
types::{self, SerialConsistency},
value::{BatchValues, BatchValuesIterator, SerializedValues},
};

Expand Down Expand Up @@ -202,13 +202,7 @@ impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec<SerializedV
})
.collect::<Result<Vec<_>, ParseError>>()?;

let consistency = match types::read_consistency(buf)? {
types::LegacyConsistency::Regular(reg) => Ok(reg),
types::LegacyConsistency::Serial(ser) => Err(ParseError::BadIncomingData(format!(
"Expected regular Consistency, got SerialConsistency {}",
ser
))),
}?;
let consistency = types::read_consistency(buf)?;

let flags = buf.get_u8();
let unknown_flags = flags & (!ALL_FLAGS);
Expand All @@ -224,15 +218,15 @@ impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec<SerializedV
let serial_consistency = serial_consistency_flag
.then(|| types::read_consistency(buf))
.transpose()?
.map(|legacy_consistency| match legacy_consistency {
types::LegacyConsistency::Regular(reg) => {
Err(ParseError::BadIncomingData(format!(
.map(
|consistency| match SerialConsistency::try_from(consistency) {
Ok(serial_consistency) => Ok(serial_consistency),
Err(_) => Err(ParseError::BadIncomingData(format!(
"Expected SerialConsistency, got regular Consistency {}",
reg
)))
}
types::LegacyConsistency::Serial(ser) => Ok(ser),
})
consistency
))),
},
)
.transpose()?;

let timestamp = default_timestamp_flag
Expand Down
7 changes: 2 additions & 5 deletions scylla-cql/src/frame/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ mod tests {
query::{Query, QueryParameters},
DeserializableRequest, SerializableRequest,
},
types::{self, LegacyConsistency, SerialConsistency},
types::{self, SerialConsistency},
value::SerializedValues,
},
Consistency,
Expand Down Expand Up @@ -236,10 +236,7 @@ mod tests {

// Now buf_ptr points at consistency.
let consistency = types::read_consistency(&mut buf_ptr).unwrap();
assert_eq!(
consistency,
LegacyConsistency::Regular(Consistency::default())
);
assert_eq!(consistency, Consistency::default());

// Now buf_ptr points at flags, but it is immutable. Get mutable reference into the buffer.
let flags_idx = buf.len() - buf_ptr.len();
Expand Down
34 changes: 14 additions & 20 deletions scylla-cql/src/frame/request/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::borrow::Cow;

use crate::frame::frame_errors::ParseError;
use crate::frame::{frame_errors::ParseError, types::SerialConsistency};
use bytes::{Buf, BufMut, Bytes};

use crate::{
Expand Down Expand Up @@ -134,13 +134,7 @@ impl QueryParameters<'_> {

impl<'q> QueryParameters<'q> {
pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ParseError> {
let consistency = match types::read_consistency(buf)? {
types::LegacyConsistency::Regular(reg) => Ok(reg),
types::LegacyConsistency::Serial(ser) => Err(ParseError::BadIncomingData(format!(
"Expected regular Consistency, got SerialConsistency {}",
ser
))),
}?;
let consistency = types::read_consistency(buf)?;

let flags = buf.get_u8();
let unknown_flags = flags & (!ALL_FLAGS);
Expand Down Expand Up @@ -169,19 +163,19 @@ impl<'q> QueryParameters<'q> {
} else {
None
};
let serial_consistency = if serial_consistency_flag {
match types::read_consistency(buf)? {
types::LegacyConsistency::Regular(reg) => {
return Err(ParseError::BadIncomingData(format!(
let serial_consistency = serial_consistency_flag
.then(|| types::read_consistency(buf))
.transpose()?
.map(
|consistency| match SerialConsistency::try_from(consistency) {
Ok(serial_consistency) => Ok(serial_consistency),
Err(_) => Err(ParseError::BadIncomingData(format!(
"Expected SerialConsistency, got regular Consistency {}",
reg
)))
}
types::LegacyConsistency::Serial(ser) => Some(ser),
}
} else {
None
};
consistency
))),
},
)
.transpose()?;
let timestamp = if default_timestamp_flag {
Some(types::read_long(buf)?)
} else {
Expand Down
11 changes: 5 additions & 6 deletions scylla-cql/src/frame/response/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ mod tests {
use super::Error;
use crate::errors::{DbError, OperationType, WriteType};
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::types::LegacyConsistency;
use crate::Consistency;
use bytes::Bytes;
use std::convert::TryInto;
Expand Down Expand Up @@ -151,7 +150,7 @@ mod tests {
assert_eq!(
error.error,
DbError::Unavailable {
consistency: LegacyConsistency::Regular(Consistency::One),
consistency: Consistency::One,
required: 2,
alive: 3,
}
Expand All @@ -178,7 +177,7 @@ mod tests {
assert_eq!(
error.error,
DbError::WriteTimeout {
consistency: LegacyConsistency::Regular(Consistency::Quorum),
consistency: Consistency::Quorum,
received: -5, // Allow negative values when they don't make sense, it's better than crashing with ProtocolError
required: 100,
write_type: WriteType::Simple,
Expand All @@ -202,7 +201,7 @@ mod tests {
assert_eq!(
error.error,
DbError::ReadTimeout {
consistency: LegacyConsistency::Regular(Consistency::Two),
consistency: Consistency::Two,
received: 8,
required: 32,
data_present: false,
Expand All @@ -227,7 +226,7 @@ mod tests {
assert_eq!(
error.error,
DbError::ReadFailure {
consistency: LegacyConsistency::Regular(Consistency::Three),
consistency: Consistency::Three,
received: 4,
required: 5,
numfailures: 6,
Expand Down Expand Up @@ -299,7 +298,7 @@ mod tests {
assert_eq!(
error.error,
DbError::WriteFailure {
consistency: LegacyConsistency::Regular(Consistency::Any),
consistency: Consistency::Any,
received: 2,
required: 4,
numfailures: 8,
Expand Down
66 changes: 38 additions & 28 deletions scylla-cql/src/frame/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::convert::TryInto;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::str;
use thiserror::Error;
use uuid::Uuid;

#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, TryFromPrimitive)]
Expand All @@ -27,6 +28,11 @@ pub enum Consistency {
LocalQuorum = 0x0006,
EachQuorum = 0x0007,
LocalOne = 0x000A,

// Apparently, Consistency can be set to Serial or LocalSerial in SELECT statements
// to make them use Paxos.
Serial = 0x0008,
LocalSerial = 0x0009,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, TryFromPrimitive)]
Expand All @@ -38,13 +44,34 @@ pub enum SerialConsistency {
LocalSerial = 0x0009,
}

// LegacyConsistency exists, because Scylla may return a SerialConsistency value
// as Consistency when returning certain error types - the distinction between
// Consistency and SerialConsistency is not really a thing in CQL.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum LegacyConsistency {
Regular(Consistency),
Serial(SerialConsistency),
impl Consistency {
pub fn is_serial(&self) -> bool {
matches!(self, Consistency::Serial | Consistency::LocalSerial)
}
}

#[derive(Debug, Error)]
#[error("Expected Consistency Serial or LocalSerial, got: {0}")]
pub struct NonSerialConsistencyError(Consistency);
wprzytula marked this conversation as resolved.
Show resolved Hide resolved

impl TryFrom<Consistency> for SerialConsistency {
type Error = NonSerialConsistencyError;

fn try_from(c: Consistency) -> Result<Self, Self::Error> {
match c {
Consistency::Any
| Consistency::One
| Consistency::Two
| Consistency::Three
| Consistency::Quorum
| Consistency::All
| Consistency::LocalQuorum
| Consistency::EachQuorum
| Consistency::LocalOne => Err(NonSerialConsistencyError(c)),
Consistency::Serial => Ok(SerialConsistency::Serial),
Consistency::LocalSerial => Ok(SerialConsistency::LocalSerial),
}
}
}

impl std::fmt::Display for Consistency {
Expand All @@ -59,15 +86,6 @@ impl std::fmt::Display for SerialConsistency {
}
}

impl std::fmt::Display for LegacyConsistency {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Regular(c) => c.fmt(f),
Self::Serial(c) => c.fmt(f),
}
}
}

impl From<std::num::TryFromIntError> for ParseError {
fn from(_err: std::num::TryFromIntError) -> Self {
ParseError::BadIncomingData("Integer conversion out of range".to_string())
Expand Down Expand Up @@ -439,18 +457,10 @@ fn type_uuid() {
assert_eq!(u, u2);
}

pub fn read_consistency(buf: &mut &[u8]) -> Result<LegacyConsistency, ParseError> {
pub fn read_consistency(buf: &mut &[u8]) -> Result<Consistency, ParseError> {
let raw = read_short(buf)?;
let parsed = match Consistency::try_from(raw) {
Ok(c) => LegacyConsistency::Regular(c),
Err(_) => {
let parsed_serial = SerialConsistency::try_from(raw).map_err(|_| {
ParseError::BadIncomingData(format!("unknown consistency: {}", raw))
})?;
LegacyConsistency::Serial(parsed_serial)
}
};
Ok(parsed)
Consistency::try_from(raw)
.map_err(|_| ParseError::BadIncomingData(format!("unknown consistency: {}", raw)))
}

pub fn write_consistency(c: Consistency, buf: &mut impl BufMut) {
Expand All @@ -467,7 +477,7 @@ fn type_consistency() {
let mut buf = Vec::new();
write_consistency(c, &mut buf);
let c2 = read_consistency(&mut &*buf).unwrap();
assert_eq!(LegacyConsistency::Regular(c), c2);
assert_eq!(c, c2);

let c: i16 = 0x1234;
buf.clear();
Expand Down
Loading