Skip to content

Commit

Permalink
add aof header versioning (#786)
Browse files Browse the repository at this point in the history
Co-authored-by: Hamdaan Khalid <[email protected]>
  • Loading branch information
hamdaankhalid and hamdaankhalidmsft authored Nov 11, 2024
1 parent 72e405c commit fdf7852
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
30 changes: 26 additions & 4 deletions libs/server/AOF/AofHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,38 @@

namespace Garnet.server
{
[StructLayout(LayoutKind.Explicit, Size = 14)]
[StructLayout(LayoutKind.Explicit, Size = 15)]
struct AofHeader
{
// UPDATE THIS VERSION WHENEVER THE LAYOUT OF THIS STRUCT CHANGES
const byte AOF_HEADER_VERSION = 1;

[FieldOffset(0)]
public AofEntryType opType;
public byte aofHeaderVersion;
[FieldOffset(1)]
public byte type;
public AofEntryType opType;
[FieldOffset(2)]
public byte type;
[FieldOffset(3)]
public long version;
[FieldOffset(10)]
[FieldOffset(11)]
public int sessionID;

public AofHeader(AofEntryType opType, byte type, long version, int sessionID)
{
this.aofHeaderVersion = AOF_HEADER_VERSION;
this.opType = opType;
this.type = type;
this.version = version;
this.sessionID = sessionID;
}

public AofHeader(AofEntryType opType, long version, int sessionID)
{
this.aofHeaderVersion = AOF_HEADER_VERSION;
this.opType = opType;
this.version = version;
this.sessionID = sessionID;
}
}
}
6 changes: 3 additions & 3 deletions libs/server/Storage/Functions/MainStore/PrivateMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ void WriteLogUpsert(ref SpanByte key, ref RawStringInput input, ref SpanByte val
input.header.flags |= RespInputFlags.Deterministic;

functionsState.appendOnlyFile.Enqueue(
new AofHeader { opType = AofEntryType.StoreUpsert, version = version, sessionID = sessionId },
new AofHeader(opType: AofEntryType.StoreUpsert, version: version, sessionID: sessionId),
ref key, ref value, ref input, out _);
}

Expand All @@ -692,7 +692,7 @@ void WriteLogRMW(ref SpanByte key, ref RawStringInput input, long version, int s
input.header.flags |= RespInputFlags.Deterministic;

functionsState.appendOnlyFile.Enqueue(
new AofHeader { opType = AofEntryType.StoreRMW, version = version, sessionID = sessionId },
new AofHeader(opType: AofEntryType.StoreRMW, version: version, sessionID: sessionId),
ref key, ref input, out _);
}

Expand All @@ -705,7 +705,7 @@ void WriteLogDelete(ref SpanByte key, long version, int sessionID)
{
if (functionsState.StoredProcMode) return;
SpanByte def = default;
functionsState.appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.StoreDelete, version = version, sessionID = sessionID }, ref key, ref def, out _);
functionsState.appendOnlyFile.Enqueue(new AofHeader(opType: AofEntryType.StoreDelete, version: version, sessionID: sessionID), ref key, ref def, out _);
}

BitFieldCmdArgs GetBitFieldArguments(ref RawStringInput input)
Expand Down
6 changes: 3 additions & 3 deletions libs/server/Storage/Functions/ObjectStore/PrivateMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void WriteLogUpsert(ref byte[] key, ref ObjectInput input, ref IGarnetObject val
var valSB = SpanByte.FromPinnedPointer(valPtr, valueBytes.Length);

functionsState.appendOnlyFile.Enqueue(
new AofHeader { opType = AofEntryType.ObjectStoreUpsert, version = version, sessionID = sessionID },
new AofHeader(opType: AofEntryType.ObjectStoreUpsert, version: version, sessionID: sessionID),
ref keySB, ref valSB, out _);
}
}
Expand All @@ -57,7 +57,7 @@ void WriteLogRMW(ref byte[] key, ref ObjectInput input, long version, int sessio
var sbKey = SpanByte.FromPinnedPointer(keyPtr, key.Length);

functionsState.appendOnlyFile.Enqueue(
new AofHeader { opType = AofEntryType.ObjectStoreRMW, version = version, sessionID = sessionID },
new AofHeader(opType: AofEntryType.ObjectStoreRMW, version: version, sessionID: sessionID),
ref sbKey, ref input, out _);
}
}
Expand All @@ -75,7 +75,7 @@ void WriteLogDelete(ref byte[] key, long version, int sessionID)
var keySB = SpanByte.FromPinnedPointer(ptr, key.Length);
SpanByte valSB = default;

functionsState.appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.ObjectStoreDelete, version = version, sessionID = sessionID }, ref keySB, ref valSB, out _);
functionsState.appendOnlyFile.Enqueue(new AofHeader(opType: AofEntryType.ObjectStoreDelete, version: version, sessionID: sessionID), ref keySB, ref valSB, out _);
}
}

Expand Down
6 changes: 3 additions & 3 deletions libs/server/Transaction/TransactionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ internal void Log(byte id, ref CustomProcedureInput procInput)
{
Debug.Assert(functionsState.StoredProcMode);

appendOnlyFile?.Enqueue(new AofHeader { opType = AofEntryType.StoredProcedure, type = id, version = basicContext.Session.Version, sessionID = basicContext.Session.ID }, ref procInput, out _);
appendOnlyFile?.Enqueue(new AofHeader(opType: AofEntryType.StoredProcedure, type: id, version: basicContext.Session.Version, sessionID: basicContext.Session.ID), ref procInput, out _);
}

internal void Commit(bool internal_txn = false)
{
if (appendOnlyFile != null && !functionsState.StoredProcMode)
{
appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.TxnCommit, version = basicContext.Session.Version, sessionID = basicContext.Session.ID }, out _);
appendOnlyFile.Enqueue(new AofHeader(opType: AofEntryType.TxnCommit, version: basicContext.Session.Version, sessionID: basicContext.Session.ID), out _);
}
if (!internal_txn)
watchContainer.Reset();
Expand Down Expand Up @@ -335,7 +335,7 @@ internal bool Run(bool internal_txn = false, bool fail_fast_on_lock = false, Tim

if (appendOnlyFile != null && !functionsState.StoredProcMode)
{
appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.TxnStart, version = basicContext.Session.Version, sessionID = basicContext.Session.ID }, out _);
appendOnlyFile.Enqueue(new AofHeader(opType: AofEntryType.TxnStart, version: basicContext.Session.Version, sessionID: basicContext.Session.ID), out _);
}

state = TxnState.Running;
Expand Down

0 comments on commit fdf7852

Please sign in to comment.