Skip to content

Commit

Permalink
Return 409 on duplicate message insert (#1466)
Browse files Browse the repository at this point in the history
Ensure that we don't return a 5xx when user attempts to insert a
duplicate message (i.e., due to msg-uid conflict)

Fixes #1454
  • Loading branch information
jaymell authored Oct 2, 2024
1 parent 034a6a4 commit 7fe243a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
4 changes: 2 additions & 2 deletions server/svix-server/src/v1/endpoints/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
},
},
db::models::{application, message, messagecontent},
error::{Error, HttpError, Result},
error::{http_error_on_conflict, Error, HttpError, Result},
queue::{MessageTaskBatch, TaskQueueProducer},
v1::utils::{
filter_and_paginate_time_limited, openapi_tag, validation_error, ApplicationMsgPath,
Expand Down Expand Up @@ -360,7 +360,7 @@ pub(crate) async fn create_message_inner(
let (msg, msg_content) = db
.transaction(|txn| {
async move {
let msg = msg.insert(txn).await?;
let msg = msg.insert(txn).await.map_err(http_error_on_conflict)?;
let msg_content = messagecontent::ActiveModel::new(msg.id.clone(), payload);
let msg_content = msg_content.insert(txn).await?;
Ok((msg, msg_content))
Expand Down
45 changes: 44 additions & 1 deletion server/svix-server/tests/it/e2e_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
use chrono::{Duration, Utc};
use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, QueryFilter};
use serde::de::IgnoredAny;
use svix_server::{
core::types::{EventTypeName, MessageUid},
db::models::messagecontent,
expired_message_cleaner,
v1::{
endpoints::{
attempt::MessageAttemptOut,
message::{MessageOut, RawPayload},
message::{MessageIn, MessageOut, RawPayload},
},
utils::ListResponse,
},
Expand Down Expand Up @@ -472,3 +474,44 @@ async fn test_expunge_message_payload() {

assert_eq!(msg.payload.0.get(), r#"{"expired":true}"#);
}

#[tokio::test]
async fn test_message_conflict() {
let (client, _jh) = start_svix_server().await;

let app_id = create_test_app(&client, "v1MessageCRTestApp")
.await
.unwrap()
.id;

let _endp_id = create_test_endpoint(&client, &app_id, "http://localhost:2/bad/url/")
.await
.unwrap()
.id;

let msg_in = MessageIn {
event_type: EventTypeName("user.signup".to_owned()),
payload: RawPayload::from_string(serde_json::json!({"test": "value"}).to_string()).unwrap(),
payload_retention_period: 5,
channels: None,
uid: Some(MessageUid("test1".to_owned())),
};

let _: MessageOut = client
.post(
&format!("api/v1/app/{}/msg/", &app_id),
msg_in.clone(),
StatusCode::ACCEPTED,
)
.await
.unwrap();

let _: IgnoredAny = client
.post(
&format!("api/v1/app/{}/msg/", &app_id),
msg_in,
StatusCode::CONFLICT,
)
.await
.unwrap();
}

0 comments on commit 7fe243a

Please sign in to comment.