From 1708743e786cde41eb1bba51d4e5267895d8227d Mon Sep 17 00:00:00 2001 From: Yuchen Liang <70461588+yliang412@users.noreply.github.com> Date: Thu, 19 Sep 2024 12:27:10 -0400 Subject: [PATCH] pageserver: wait for lsn lease duration after transition into AttachedSingle (#9024) Part of #7497, closes https://github.com/neondatabase/neon/issues/8890. ## Problem Since leases are in-memory objects, we need to take special care of them after pageserver restarts and while doing a live migration. The approach we took for pageserver restart is to wait for at least lease duration before doing first GC. We want to do the same for live migration. Since we do not do any GC when a tenant is in `AttachedStale` or `AttachedMulti` mode, only the transition from `AttachedMulti` to `AttachedSingle` requires this treatment. ## Summary of changes - Added `lsn_lease_deadline` field in `GcBlock::reasons`: the tenant is temporarily blocked from GC until we reach the deadline. This information does not persist to S3. - In `GCBlock::start`, skip the GC iteration if we are blocked by the lsn lease deadline. - In `TenantManager::upsert_location`, set the lsn_lease_deadline to `Instant::now() + lsn_lease_length` so the granted leases have a chance to be renewed before we run GC for the first time after transitioned from AttachedMulti to AttachedSingle. Signed-off-by: Yuchen Liang Co-authored-by: Joonas Koivunen --- pageserver/src/tenant/gc_block.rs | 81 ++++++++++++++----- pageserver/src/tenant/mgr.rs | 6 ++ pageserver/src/tenant/tasks.rs | 18 +---- test_runner/regress/test_branch_and_gc.py | 1 + test_runner/regress/test_branch_behind.py | 4 +- test_runner/regress/test_branching.py | 2 +- test_runner/regress/test_compaction.py | 1 + test_runner/regress/test_hot_standby.py | 2 +- test_runner/regress/test_layer_eviction.py | 1 + .../regress/test_pageserver_generations.py | 1 + test_runner/regress/test_remote_storage.py | 2 + test_runner/regress/test_sharding.py | 1 + .../regress/test_storage_controller.py | 2 +- test_runner/regress/test_storage_scrubber.py | 1 + test_runner/regress/test_tenant_detach.py | 4 +- .../regress/test_timeline_gc_blocking.py | 5 +- 16 files changed, 90 insertions(+), 42 deletions(-) diff --git a/pageserver/src/tenant/gc_block.rs b/pageserver/src/tenant/gc_block.rs index 8b41ba174669..1271d25b7659 100644 --- a/pageserver/src/tenant/gc_block.rs +++ b/pageserver/src/tenant/gc_block.rs @@ -1,11 +1,29 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; +use super::remote_timeline_client::index::GcBlockingReason; +use tokio::time::Instant; use utils::id::TimelineId; -use super::remote_timeline_client::index::GcBlockingReason; +type TimelinesBlocked = HashMap>; -type Storage = HashMap>; +#[derive(Default)] +struct Storage { + timelines_blocked: TimelinesBlocked, + /// The deadline before which we are blocked from GC so that + /// leases have a chance to be renewed. + lsn_lease_deadline: Option, +} + +impl Storage { + fn is_blocked_by_lsn_lease_deadline(&self) -> bool { + self.lsn_lease_deadline + .map(|d| Instant::now() < d) + .unwrap_or(false) + } +} +/// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc +/// blocking. #[derive(Default)] pub(crate) struct GcBlock { /// The timelines which have current reasons to block gc. @@ -13,6 +31,12 @@ pub(crate) struct GcBlock { /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done /// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`. reasons: std::sync::Mutex, + + /// GC background task or manually run `Tenant::gc_iteration` holds a lock on this. + /// + /// Do not add any more features taking and forbidding taking this lock. It should be + /// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`] + /// synchronizes with gc attempts by locking and unlocking this mutex. blocking: tokio::sync::Mutex<()>, } @@ -42,6 +66,20 @@ impl GcBlock { } } + /// Sets a deadline before which we cannot proceed to GC due to lsn lease. + /// + /// We do this as the leases mapping are not persisted to disk. By delaying GC by lease + /// length, we guarantee that all the leases we granted before will have a chance to renew + /// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle. + pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) { + let deadline = Instant::now() + lsn_lease_length; + let mut g = self.reasons.lock().unwrap(); + g.lsn_lease_deadline = Some(deadline); + } + + /// Describe the current gc blocking reasons. + /// + /// TODO: make this json serializable. pub(crate) fn summary(&self) -> Option { let g = self.reasons.lock().unwrap(); @@ -64,7 +102,7 @@ impl GcBlock { ) -> anyhow::Result { let (added, uploaded) = { let mut g = self.reasons.lock().unwrap(); - let set = g.entry(timeline.timeline_id).or_default(); + let set = g.timelines_blocked.entry(timeline.timeline_id).or_default(); let added = set.insert(reason); // LOCK ORDER: intentionally hold the lock, see self.reasons. @@ -95,7 +133,7 @@ impl GcBlock { let (remaining_blocks, uploaded) = { let mut g = self.reasons.lock().unwrap(); - match g.entry(timeline.timeline_id) { + match g.timelines_blocked.entry(timeline.timeline_id) { Entry::Occupied(mut oe) => { let set = oe.get_mut(); set.remove(reason); @@ -109,7 +147,7 @@ impl GcBlock { } } - let remaining_blocks = g.len(); + let remaining_blocks = g.timelines_blocked.len(); // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons let uploaded = timeline @@ -134,11 +172,11 @@ impl GcBlock { pub(crate) fn before_delete(&self, timeline: &super::Timeline) { let unblocked = { let mut g = self.reasons.lock().unwrap(); - if g.is_empty() { + if g.timelines_blocked.is_empty() { return; } - g.remove(&timeline.timeline_id); + g.timelines_blocked.remove(&timeline.timeline_id); BlockingReasons::clean_and_summarize(g).is_none() }; @@ -149,10 +187,11 @@ impl GcBlock { } /// Initialize with the non-deleted timelines of this tenant. - pub(crate) fn set_scanned(&self, scanned: Storage) { + pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) { let mut g = self.reasons.lock().unwrap(); - assert!(g.is_empty()); - g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty())); + assert!(g.timelines_blocked.is_empty()); + g.timelines_blocked + .extend(scanned.into_iter().filter(|(_, v)| !v.is_empty())); if let Some(reasons) = BlockingReasons::clean_and_summarize(g) { tracing::info!(summary=?reasons, "initialized with gc blocked"); @@ -166,6 +205,7 @@ pub(super) struct Guard<'a> { #[derive(Debug)] pub(crate) struct BlockingReasons { + tenant_blocked_by_lsn_lease_deadline: bool, timelines: usize, reasons: enumset::EnumSet, } @@ -174,8 +214,8 @@ impl std::fmt::Display for BlockingReasons { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{} timelines block for {:?}", - self.timelines, self.reasons + "tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}", + self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons ) } } @@ -183,13 +223,15 @@ impl std::fmt::Display for BlockingReasons { impl BlockingReasons { fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option { let mut reasons = enumset::EnumSet::empty(); - g.retain(|_key, value| { + g.timelines_blocked.retain(|_key, value| { reasons = reasons.union(*value); !value.is_empty() }); - if !g.is_empty() { + let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline(); + if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline { Some(BlockingReasons { - timelines: g.len(), + tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline, + timelines: g.timelines_blocked.len(), reasons, }) } else { @@ -198,14 +240,17 @@ impl BlockingReasons { } fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option { - if g.is_empty() { + let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline(); + if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline { None } else { let reasons = g + .timelines_blocked .values() .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next)); Some(BlockingReasons { - timelines: g.len(), + tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline, + timelines: g.timelines_blocked.len(), reasons, }) } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 2104f415319e..1e7c1e10a5e7 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -949,6 +949,12 @@ impl TenantManager { (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { match attach_conf.generation.cmp(&tenant.generation) { Ordering::Equal => { + if attach_conf.attach_mode == AttachmentMode::Single { + tenant + .gc_block + .set_lsn_lease_deadline(tenant.get_lsn_lease_length()); + } + // A transition from Attached to Attached in the same generation, we may // take our fast path and just provide the updated configuration // to the tenant. diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 478e9bb4f074..57f0123d8fa3 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -346,6 +346,7 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); let mut first = true; + tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length()); loop { tokio::select! { _ = cancel.cancelled() => { @@ -363,7 +364,6 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { first = false; let delays = async { - delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?; random_init_delay(period, &cancel).await?; Ok::<_, Cancelled>(()) }; @@ -538,28 +538,12 @@ pub(crate) async fn random_init_delay( let mut rng = rand::thread_rng(); rng.gen_range(Duration::ZERO..=period) }; - match tokio::time::timeout(d, cancel.cancelled()).await { Ok(_) => Err(Cancelled), Err(_) => Ok(()), } } -/// Delays GC by defaul lease length at restart. -/// -/// We do this as the leases mapping are not persisted to disk. By delaying GC by default -/// length, we gurantees that all the leases we granted before the restart will expire -/// when we run GC for the first time after the restart. -pub(crate) async fn delay_by_lease_length( - length: Duration, - cancel: &CancellationToken, -) -> Result<(), Cancelled> { - match tokio::time::timeout(length, cancel.cancelled()).await { - Ok(_) => Err(Cancelled), - Err(_) => Ok(()), - } -} - struct Iteration { started_at: Instant, period: Duration, diff --git a/test_runner/regress/test_branch_and_gc.py b/test_runner/regress/test_branch_and_gc.py index f2e3855c123e..d7c4cf059a4e 100644 --- a/test_runner/regress/test_branch_and_gc.py +++ b/test_runner/regress/test_branch_and_gc.py @@ -142,6 +142,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv): "image_creation_threshold": "1", # set PITR interval to be small, so we can do GC "pitr_interval": "0 s", + "lsn_lease_length": "0s", } ) diff --git a/test_runner/regress/test_branch_behind.py b/test_runner/regress/test_branch_behind.py index 0a5336f5a246..2bf7041cf14b 100644 --- a/test_runner/regress/test_branch_behind.py +++ b/test_runner/regress/test_branch_behind.py @@ -11,7 +11,9 @@ # def test_branch_behind(neon_env_builder: NeonEnvBuilder): # Disable pitr, because here we want to test branch creation after GC - env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"}) + env = neon_env_builder.init_start( + initial_tenant_conf={"pitr_interval": "0 sec", "lsn_lease_length": "0s"} + ) error_regexes = [ ".*invalid branch start lsn.*", diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 1729e2fc9887..3d5c34a5958b 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -419,7 +419,7 @@ def start_creating_timeline(): def test_branching_while_stuck_find_gc_cutoffs(neon_env_builder: NeonEnvBuilder): - env = neon_env_builder.init_start() + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) client = env.pageserver.http_client() diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index be787e064262..cb34551b53fc 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -240,6 +240,7 @@ def test_uploads_and_deletions( "image_creation_threshold": "1", "image_layer_creation_check_threshold": "0", "compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}), + "lsn_lease_length": "0s", } env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index d94704012fad..35e0c0decb26 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -222,7 +222,7 @@ def pgbench_accounts_initialized(ep): # Without hs feedback enabled we'd see 'User query might have needed to see row # versions that must be removed.' errors. def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): - env = neon_env_builder.init_start() + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) agressive_vacuum_conf = [ "log_autovacuum_min_duration = 0", "autovacuum_naptime = 10s", diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index 193149ea0388..97093ea535c2 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -173,6 +173,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder): # "image_creation_threshold": set at runtime "compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers "image_layer_creation_check_threshold": "0", # always check if a new image layer can be created + "lsn_lease_length": "0s", } def tenant_update_config(changes): diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index c92371343233..519994f77446 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -53,6 +53,7 @@ # create image layers eagerly, so that GC can remove some layers "image_creation_threshold": "1", "image_layer_creation_check_threshold": "0", + "lsn_lease_length": "0s", } diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 2e5260ca781a..0a57fc960563 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -244,6 +244,7 @@ def test_remote_storage_upload_queue_retries( # create image layers eagerly, so that GC can remove some layers "image_creation_threshold": "1", "image_layer_creation_check_threshold": "0", + "lsn_lease_length": "0s", } ) @@ -391,6 +392,7 @@ def test_remote_timeline_client_calls_started_metric( # disable background compaction and GC. We invoke it manually when we want it to happen. "gc_period": "0s", "compaction_period": "0s", + "lsn_lease_length": "0s", } ) diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 4a84dca399a3..1eb33b2d39ca 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -200,6 +200,7 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: # Disable automatic creation of image layers, as we will create them explicitly when we want them "image_creation_threshold": 9999, "image_layer_creation_check_threshold": 0, + "lsn_lease_length": "0s", } neon_env_builder.storage_controller_config = { diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index dc90a6e9a087..4106efd4f9cc 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -485,7 +485,7 @@ def handler(request: Request): httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler) # Start running - env = neon_env_builder.init_start() + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) # Initial notification from tenant creation assert len(notifications) == 1 diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py index 848e214c5e46..b6c19f03f6ab 100644 --- a/test_runner/regress/test_storage_scrubber.py +++ b/test_runner/regress/test_storage_scrubber.py @@ -204,6 +204,7 @@ def test_scrubber_physical_gc_ancestors( # No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas # and makes them GC'able "pitr_interval": "0s", + "lsn_lease_length": "0s", }, ) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index b165588636c7..e7c6d5a4c382 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -266,13 +266,13 @@ async def reattach_while_busy( def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): - env = neon_env_builder.init_start() + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) pageserver_http = env.pageserver.http_client() env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # create new nenant - tenant_id, timeline_id = env.neon_cli.create_tenant() + tenant_id, timeline_id = env.initial_tenant, env.initial_timeline # assert tenant exists on disk assert env.pageserver.tenant_dir(tenant_id).exists() diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py index ddfe9b911fd8..765c72cf2a2f 100644 --- a/test_runner/regress/test_timeline_gc_blocking.py +++ b/test_runner/regress/test_timeline_gc_blocking.py @@ -45,7 +45,10 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool tenant_after = http.tenant_status(env.initial_tenant) assert tenant_before != tenant_after gc_blocking = tenant_after["gc_blocking"] - assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }" + assert ( + gc_blocking + == "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }" + ) wait_for_another_gc_round() pss.assert_log_contains(gc_skipped_line)