Skip to content

Commit

Permalink
background replay consume implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Nov 8, 2024
1 parent 4c3e338 commit e1a111c
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public bool TryReplicateFromPrimary(out ReadOnlySpan<byte> errorMessage, bool ba
storeWrapper.appendOnlyFile?.WaitForCommit();
}

// Reset background replay iterator
ResetReplayIterator();

// Reset replication offset
ReplicationOffset = 0;

Expand Down
118 changes: 118 additions & 0 deletions libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Threading;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.cluster
{
    internal sealed partial class ReplicationManager : IBulkLogEntryConsumer, IDisposable
    {
        /// <summary>
        /// Iterator drained by <see cref="ReplicaReplayTask"/>; disposed and cleared by <see cref="ResetReplayIterator"/>.
        /// </summary>
        TsavoriteLogScanSingleIterator replayIterator = null;

        /// <summary>
        /// Cancellation source (linked to ctsRepManager) observed by <see cref="Consume"/> and <see cref="ReplicaReplayTask"/>.
        /// </summary>
        CancellationTokenSource replicaReplayTaskCts;

        /// <summary>
        /// Held in read mode by <see cref="ReplicaReplayTask"/> for its whole run and taken in write mode by
        /// <see cref="ResetReplayIterator"/>, so the token source is only disposed while no replay task holds the read lock.
        /// </summary>
        SingleWriterMultiReaderLock activeReplay;

        /// <summary>
        /// Cancel the current replay token source, install a fresh one, and dispose the replay iterator.
        /// </summary>
        void ResetReplayIterator()
        {
            ResetReplayCts();
            replayIterator?.Dispose();
            replayIterator = null;

            // Replace the token source; when one already exists it is cancelled first and disposed under the write lock.
            void ResetReplayCts()
            {
                if (replicaReplayTaskCts == null)
                {
                    replicaReplayTaskCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token);
                    return;
                }

                // Signal cancellation before waiting for the write lock so a running replay task can exit and release its read lock.
                replicaReplayTaskCts.Cancel();

                // Acquire outside the try block: the finally must never release a lock that was not taken.
                activeReplay.WriteLock();
                try
                {
                    replicaReplayTaskCts.Dispose();
                    replicaReplayTaskCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token);
                }
                finally
                {
                    activeReplay.WriteUnlock();
                }
            }
        }

        /// <summary>
        /// Intentionally empty (presumably required by <see cref="IBulkLogEntryConsumer"/> - confirm against the interface).
        /// </summary>
        public void Throttle() { }

        /// <summary>
        /// Replay a contiguous chunk of AOF entries and advance ReplicationOffset past each entry.
        /// </summary>
        /// <param name="record">Pointer to the first entry header of the chunk.</param>
        /// <param name="recordLength">Length of the chunk in bytes.</param>
        /// <param name="currentAddress">Address of the chunk start; must equal ReplicationOffset on entry.</param>
        /// <param name="nextAddress">Address expected after the chunk; must equal ReplicationOffset on exit.</param>
        /// <param name="isProtected">Forwarded to UnsafeCommitMetadataOnly for commit-metadata entries.</param>
        /// <exception cref="OperationCanceledException">The replay token has been cancelled.</exception>
        /// <exception cref="GarnetException">
        /// ReplicationOffset does not match the chunk boundaries, or a commit-metadata entry arrives while EnableFastCommit is off.
        /// </exception>
        public unsafe void Consume(byte* record, int recordLength, long currentAddress, long nextAddress, bool isProtected)
        {
            replicaReplayTaskCts.Token.ThrowIfCancellationRequested();

            if (ReplicationOffset != currentAddress)
            {
                logger?.LogError("ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset);
                throw new GarnetException($"ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", LogLevel.Warning, clientResponse: false);
            }

            ReplicationOffset = currentAddress;

            var aof = storeWrapper.appendOnlyFile;
            var end = record + recordLength;
            var ptr = record;
            while (ptr < end)
            {
                // Every entry starts with a header; the sign of the encoded length selects the entry type.
                var entryLength = aof.HeaderSize;
                var payloadLength = aof.UnsafeGetLength(ptr);
                if (payloadLength > 0)
                {
                    // Positive length: the payload is handed to the AOF processor.
                    aofProcessor.ProcessAofRecordInternal(ptr + entryLength, payloadLength, true);
                    entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
                }
                else if (payloadLength < 0)
                {
                    // Negative length: commit metadata, only accepted when fast commit is enabled.
                    if (!clusterProvider.serverOptions.EnableFastCommit)
                    {
                        throw new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false);
                    }
                    TsavoriteLogRecoveryInfo info = new();
                    info.Initialize(new ReadOnlySpan<byte>(ptr + entryLength, -payloadLength));
                    //TODO: Verify again that this does not write into the AOF
                    aof.UnsafeCommitMetadataOnly(info, isProtected);
                    entryLength += TsavoriteLog.UnsafeAlign(-payloadLength);
                }

                // A zero length advances by the header size only.
                ptr += entryLength;
                ReplicationOffset += entryLength;
            }

            if (ReplicationOffset != nextAddress)
            {
                // Reported as an end mismatch so it can be told apart from the entry check at the top of the method.
                logger?.LogError("ReplicaReplayTask.Consume End Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset);
                throw new GarnetException($"ReplicaReplayTask.Consume End Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", LogLevel.Warning, clientResponse: false);
            }
        }

        /// <summary>
        /// Background replay loop: repeatedly drains <see cref="replayIterator"/> into <see cref="Consume"/> until the
        /// replay token is cancelled or an exception occurs, holding <see cref="activeReplay"/> in read mode meanwhile.
        /// </summary>
        /// <remarks>
        /// The method is async void, so nothing can observe an exception escaping it; every failure is caught and logged here.
        /// </remarks>
        public async void ReplicaReplayTask()
        {
            // Upper bound on the chunk size passed to BulkConsumeAllAsync.
            const int replayMaxChunkSize = 1 << 20;

            // Tracks whether the read lock was actually taken so the finally only releases what was acquired.
            var readLockTaken = false;
            try
            {
                activeReplay.ReadLock();
                readLockTaken = true;

                while (true)
                {
                    replicaReplayTaskCts.Token.ThrowIfCancellationRequested();
                    await replayIterator.BulkConsumeAllAsync(
                        this,
                        clusterProvider.serverOptions.ReplicaSyncDelayMs,
                        maxChunkSize: replayMaxChunkSize,
                        replicaReplayTaskCts.Token);
                }
            }
            catch (Exception ex)
            {
                logger?.LogWarning(ex, "An exception occurred at ReplicationManager.ReplicaReplayTask - terminating");
            }
            finally
            {
                if (readLockTaken)
                {
                    activeReplay.ReadUnlock();
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@
// Licensed under the MIT license.

using System;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.cluster
{
internal sealed partial class ReplicationManager : IDisposable
{
/// <summary>
/// Yield the calling thread while a replay iterator exists and the AOF tail is more than
/// ReplicaMaxLag ahead of ReplicationOffset.
/// </summary>
/// <exception cref="OperationCanceledException">The replay token is cancelled while waiting.</exception>
void ThrottlePrimary()
{
    while (replayIterator != null)
    {
        // Distance between what has been enqueued locally and what has been replayed so far.
        var replayLag = storeWrapper.appendOnlyFile.TailAddress - ReplicationOffset;
        if (replayLag <= storeWrapper.serverOptions.ReplicaMaxLag)
        {
            break;
        }

        replicaReplayTaskCts.Token.ThrowIfCancellationRequested();
        Thread.Yield();
    }
}

/// <summary>
/// Apply primary AOF records.
/// </summary>
Expand Down Expand Up @@ -48,66 +58,37 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
)
{
logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress);
storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress);
storeWrapper.appendOnlyFile.SafeInitialize(currentAddress, currentAddress);
ReplicationOffset = currentAddress;
}
}
}

// Address check
if (ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress)
{
logger?.LogInformation("Processing {recordLength} bytes; previousAddress {previousAddress}, currentAddress {currentAddress}, nextAddress {nextAddress}, current AOF tail {tail}", recordLength, previousAddress, currentAddress, nextAddress, storeWrapper.appendOnlyFile.TailAddress);
logger?.LogError("Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress);
throw new GarnetException($"Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false);
}

// Enqueue to AOF
_ = clusterProvider.storeWrapper.appendOnlyFile?.UnsafeEnqueueRaw(new Span<byte>(record, recordLength), noCommit: clusterProvider.serverOptions.EnableFastCommit);

// TODO: rest of the processing can be moved off the critical path
// Throttle to give the opportunity to the background replay task to catch up
ThrottlePrimary();

ReplicationOffset = currentAddress;
var ptr = record;
while (ptr < record + recordLength)
// If background task has not been initialized
// initialize it here and start background replay task
if (replayIterator == null)
{
var entryLength = storeWrapper.appendOnlyFile.HeaderSize;
var payloadLength = storeWrapper.appendOnlyFile.UnsafeGetLength(ptr);
if (payloadLength > 0)
{
aofProcessor.ProcessAofRecordInternal(ptr + entryLength, payloadLength, true);
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
}
else if (payloadLength < 0)
{
if (!clusterProvider.serverOptions.EnableFastCommit)
{
throw new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false);
}
TsavoriteLogRecoveryInfo info = new();
info.Initialize(new ReadOnlySpan<byte>(ptr + entryLength, -payloadLength));
storeWrapper.appendOnlyFile?.UnsafeCommitMetadataOnly(info);
entryLength += TsavoriteLog.UnsafeAlign(-payloadLength);
}
ptr += entryLength;
ReplicationOffset += entryLength;
}
replayIterator = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(
previousAddress,
long.MaxValue,
scanUncommitted: true,
recover: false,
logger: logger);

if (ReplicationOffset != nextAddress)
{
logger?.LogWarning("Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, nextAddress {nextAddress}", ReplicationOffset, nextAddress);
throw new GarnetException($"Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, nextAddress {nextAddress}", LogLevel.Warning, clientResponse: false);
}

if (ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress)
{
logger?.LogWarning("After ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress);
throw new GarnetException($"After ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false);
Task.Run(ReplicaReplayTask);
}
//Consume(record, recordLength, currentAddress, nextAddress, isProtected: false);
}
catch (Exception ex)
{
logger?.LogWarning(ex, "An exception occurred at ReplicationManager.ProcessPrimaryStream");
ResetReplayIterator();
throw new GarnetException(ex.Message, ex, LogLevel.Warning, clientResponse: false);
}
}
Expand Down
4 changes: 4 additions & 0 deletions libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null

// After initializing replication history propagate replicationId to ReplicationLogCheckpointManager
SetPrimaryReplicationId();
replicaReplayTaskCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token);
}

/// <summary>
Expand Down Expand Up @@ -183,6 +184,9 @@ public void Dispose()

checkpointStore.WaitForReplicas();
replicaSyncSessionTaskStore.Dispose();
replicaReplayTaskCts.Cancel();
activeReplay.WriteLock();
replicaReplayTaskCts.Dispose();
ctsRepManager.Cancel();
ctsRepManager.Dispose();
aofTaskStore.Dispose();
Expand Down

0 comments on commit e1a111c

Please sign in to comment.