Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Oct 17, 2024
1 parent 969a00b commit 925ede8
Showing 1 changed file with 86 additions and 0 deletions.
86 changes: 86 additions & 0 deletions test/Garnet.test.cluster/ClusterMigrateTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1770,5 +1770,91 @@ private void ClusterMigrateExpirationWithVaryingPayload(bool expiration, List<(b
ClassicAssert.IsNotNull(db.KeyTimeToLive(pair.Item1), "key does not have expiry after migration");
}
}

//[Test, Order(16)]
public void ClusterMigrateSlotWalk()
{
var shards = 5;
context.CreateInstances(shards, useTLS: UseTLS);
context.CreateConnection(useTLS: UseTLS);

int sourceNode = 0;
int targetNode = shards - 1;
int slotCount = 10;

// Assign all slots to first node
_ = context.clusterTestUtils.AddSlotsRange(sourceNode, [(0, 16383)], logger: context.logger);

// Set config epoch
for (var i = 0; i < shards; i++)
context.clusterTestUtils.SetConfigEpoch(i, i + 1, logger: context.logger);

// Introduce nodes to each other
for (var i = 1; i < shards; i++)
context.clusterTestUtils.Meet(0, i, logger: context.logger);

// Acquire node ids
var srcNodeIds = new string[shards];
for (var i = 0; i < shards; i++)
srcNodeIds[i] = context.clusterTestUtils.ClusterMyId(i, logger: context.logger);

var srcNodeEndpoints = new IPEndPoint[shards];
for (var i = 0; i < shards; i++)
srcNodeEndpoints[i] = context.clusterTestUtils.GetEndPoint(i);

for (var slot = 0; slot < slotCount; slot++)
{
var _src = sourceNode;
var _tgt = _src + 1;

while (_tgt < shards)
{
// Issue IMPORTING and spinWait until it succeeds
var status = context.clusterTestUtils.SetSlot(_tgt, slot, "IMPORTING", srcNodeIds[_src], logger: context.logger);
while (!status.Equals("OK"))
{
ClusterTestUtils.BackOff();
status = context.clusterTestUtils.SetSlot(_tgt, slot, "IMPORTING", srcNodeIds[_src], logger: context.logger);
}

// Issue MIGRATING and spinWait until it succeeds
status = context.clusterTestUtils.SetSlot(_src, slot, "MIGRATING", srcNodeIds[_tgt], logger: context.logger);
while (!status.Equals("OK"))
{
ClusterTestUtils.BackOff();
status = context.clusterTestUtils.SetSlot(_src, slot, "MIGRATING", srcNodeIds[_tgt], logger: context.logger);
}

// Transfer keys if any
var countKeys = context.clusterTestUtils.CountKeysInSlot(_src, slot, context.logger);
if (countKeys > 0)
{
var keysInSlot = context.clusterTestUtils.GetKeysInSlot(_src, slot, countKeys, context.logger);
context.logger.LogDebug("6. GetKeysInSlot {keysInSlot.Count}", keysInSlot.Count);

context.logger.LogDebug("7. MigrateKeys starting");
context.clusterTestUtils.MigrateKeys(srcNodeEndpoints[_src], srcNodeEndpoints[_tgt], keysInSlot, context.logger);
context.logger.LogDebug("8. MigrateKeys done");
}

// Set slot to stable state
_ = context.clusterTestUtils.SetSlot(_tgt, slot, "NODE", srcNodeIds[_tgt], logger: context.logger);
_ = context.clusterTestUtils.SetSlot(_src, slot, "NODE", srcNodeIds[_tgt], logger: context.logger);

_src++;
_tgt++;
}
}

for (var i = 0; i < shards; i++)
{
var config = context.clusterTestUtils.ClusterNodes(i, logger: context.logger);
for (var slot = 0; slot < slotCount; slot++)
{
var node = config.GetBySlot(slot);

}
}
}
}
}

0 comments on commit 925ede8

Please sign in to comment.