From 8b2dac37b07b47b6c843f02e27dab70f784041a7 Mon Sep 17 00:00:00 2001 From: Vitor Guidi Date: Fri, 8 Nov 2024 15:29:10 -0300 Subject: [PATCH 1/7] [Monitoring] Adding a counter metric for rate limit events (#4394) ### Motivation Adding a metric to keep track of rate limits Part of #4271 --- src/clusterfuzz/_internal/bot/tasks/commands.py | 6 ++++++ src/clusterfuzz/_internal/metrics/monitoring_metrics.py | 9 +++++++++ 2 files changed, 15 insertions(+) diff --git a/src/clusterfuzz/_internal/bot/tasks/commands.py b/src/clusterfuzz/_internal/bot/tasks/commands.py index 86f6faabe7..cd78211ba1 100644 --- a/src/clusterfuzz/_internal/bot/tasks/commands.py +++ b/src/clusterfuzz/_internal/bot/tasks/commands.py @@ -39,6 +39,7 @@ from clusterfuzz._internal.datastore import data_handler from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.metrics import logs +from clusterfuzz._internal.metrics import monitoring_metrics from clusterfuzz._internal.system import environment from clusterfuzz._internal.system import process_handler from clusterfuzz._internal.system import shell @@ -211,6 +212,11 @@ def run_command(task_name, task_argument, job_name, uworker_env): rate_limiter = task_rate_limiting.TaskRateLimiter(task_name, task_argument, job_name) if rate_limiter.is_rate_limited(): + monitoring_metrics.TASK_RATE_LIMIT_COUNT.increment(labels={ + 'job': job_name, + 'task': task_name, + 'argument': task_argument, + }) logs.error(f'Rate limited task: {task_name} {task_argument} {job_name}') if task_name == 'fuzz': # Wait 10 seconds. We don't want to try again immediately because if we diff --git a/src/clusterfuzz/_internal/metrics/monitoring_metrics.py b/src/clusterfuzz/_internal/metrics/monitoring_metrics.py index dff8f9ce46..c53c2f1f05 100644 --- a/src/clusterfuzz/_internal/metrics/monitoring_metrics.py +++ b/src/clusterfuzz/_internal/metrics/monitoring_metrics.py @@ -195,6 +195,15 @@ ], ) +TASK_RATE_LIMIT_COUNT = monitor.CounterMetric( + 'task/rate_limit', + description=('Counter for rate limit events.'), + field_spec=[ + monitor.StringField('task'), + monitor.StringField('job'), + monitor.StringField('argument'), + ]) + UTASK_SUBTASK_E2E_DURATION_SECS = monitor.CumulativeDistributionMetric( 'utask/subtask_e2e_duration_secs', description=( From 597d1e15ad665af8fdc82c88a11ff97cfbbc59a0 Mon Sep 17 00:00:00 2001 From: jonathanmetzman <31354670+jonathanmetzman@users.noreply.github.com> Date: Fri, 8 Nov 2024 14:37:16 -0500 Subject: [PATCH 2/7] Fix failing test (#4395) --- src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py b/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py index 2a99dc9dfb..6769fb75b6 100644 --- a/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py @@ -52,11 +52,13 @@ def setUp(self): 'clusterfuzz._internal.base.persistent_cache.get_value', 'clusterfuzz._internal.base.persistent_cache.set_value', 'clusterfuzz._internal.base.utils.utcnow', + 'clusterfuzz._internal.base.tasks.task_utils.get_opted_in_tasks', 'time.sleep', ]) self.mock.get_value.return_value = None self.mock.sleep.return_value = None + self.mock.get_opted_in_tasks.return_value = False data_types.Job(name='job').put() client = pubsub.PubSubClient() From 20997be12c61dc549f58a51ff27512827be711ce Mon Sep 17 00:00:00 2001 From: jonathanmetzman <31354670+jonathanmetzman@users.noreply.github.com> 
Date: Tue, 12 Nov 2024 09:25:29 -0500 Subject: [PATCH 3/7] Give corpus pruning enough time to complete (#4396) Corpus pruning can take up to 24 hours to complete, this is more than the default of 6 hours. --- .../_internal/base/tasks/__init__.py | 2 +- .../_internal/google_cloud_utils/batch.py | 33 ++++++++++++----- .../core/google_cloud_utils/batch_test.py | 36 +++++++++++++------ 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index e57c739a24..edf3027435 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -281,7 +281,7 @@ def is_done_collecting_messages(): def get_postprocess_task(): """Gets a postprocess task if one exists.""" # This should only be run on non-preemptible bots. - if not (task_utils.is_remotely_executing_utasks() or + if not (task_utils.is_remotely_executing_utasks() and task_utils.get_opted_in_tasks()): return None # Postprocess is platform-agnostic, so we run all such tasks on our diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index 0e0ccf037a..8af1b1bbab 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -21,6 +21,7 @@ from google.cloud import batch_v1 as batch from clusterfuzz._internal.base import retry +from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.config import local_config @@ -33,7 +34,6 @@ _local = threading.local() -MAX_DURATION = f'{60 * 60 * 6}s' RETRY_COUNT = 0 TASK_BUNCH_SIZE = 20 @@ -46,9 +46,20 @@ MAX_CONCURRENT_VMS_PER_JOB = 1000 BatchWorkloadSpec = collections.namedtuple('BatchWorkloadSpec', [ - 'clusterfuzz_release', 'disk_size_gb', 'disk_type', 'docker_image', - 'user_data', 'service_account_email', 'subnetwork', 'preemptible', - 'project', 'gce_zone', 'machine_type', 'network', 'gce_region' + 'clusterfuzz_release', + 'disk_size_gb', + 'disk_type', + 'docker_image', + 'user_data', + 'service_account_email', + 'subnetwork', + 'preemptible', + 'project', + 'gce_zone', + 'machine_type', + 'network', + 'gce_region', + 'max_run_duration', ]) @@ -158,7 +169,7 @@ def _get_task_spec(batch_workload_spec): task_spec = batch.TaskSpec() task_spec.runnables = [runnable] task_spec.max_retry_count = RETRY_COUNT - task_spec.max_run_duration = MAX_DURATION + task_spec.max_run_duration = batch_workload_spec.max_duration return task_spec @@ -219,8 +230,7 @@ def _create_job(spec, input_urls): create_request.job_id = job_name # The job's parent is the region in which the job will run project_id = spec.project - create_request.parent = ( - f'projects/{project_id}/locations/{spec.gce_region}') + create_request.parent = f'projects/{project_id}/locations/{spec.gce_region}' job_result = _send_create_job_request(create_request) logs.info(f'Created batch job id={job_name}.', spec=spec) return job_result @@ -274,6 +284,11 @@ def _get_config_name(command, job_name): return config_name +def _get_task_duration(command): + return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command, + tasks.TASK_LEASE_SECONDS) + + def _get_spec_from_config(command, job_name): """Gets the configured specifications for a batch workload.""" config_name = _get_config_name(command, job_name) @@ -285,6 +300,7 @@ def _get_spec_from_config(command, 
job_name): docker_image = instance_spec['docker_image'] user_data = instance_spec['user_data'] clusterfuzz_release = instance_spec.get('clusterfuzz_release', 'prod') + max_run_duration = f'{_get_task_duration(command)}s' spec = BatchWorkloadSpec( clusterfuzz_release=clusterfuzz_release, docker_image=docker_image, @@ -298,5 +314,6 @@ def _get_spec_from_config(command, job_name): network=instance_spec['network'], subnetwork=instance_spec['subnetwork'], preemptible=instance_spec['preemptible'], - machine_type=instance_spec['machine_type']) + machine_type=instance_spec['machine_type'], + max_run_duration=max_run_duration) return spec diff --git a/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py b/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py index bba136fbc4..1dc08d0861 100644 --- a/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py +++ b/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py @@ -19,6 +19,8 @@ from clusterfuzz._internal.google_cloud_utils import batch from clusterfuzz._internal.tests.test_libs import test_utils +# pylint: disable=protected-access + @test_utils.with_cloud_emulators('datastore') class GetSpecFromConfigTest(unittest.TestCase): @@ -26,13 +28,13 @@ class GetSpecFromConfigTest(unittest.TestCase): def setUp(self): self.maxDiff = None + self.job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') + self.job.put() - def test_nonpreemptible_get_spec_from_config(self): + def test_nonpreemptible(self): """Tests that get_spec_from_config works for non-preemptibles as expected.""" - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - spec = batch._get_spec_from_config('corpus_pruning', job.name) # pylint: disable=protected-access + spec = batch._get_spec_from_config('analyze', self.job.name) expected_spec = batch.BatchWorkloadSpec( clusterfuzz_release='prod', docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654', @@ -47,15 +49,14 @@ def test_nonpreemptible_get_spec_from_config(self): gce_zone='gce-zone', project='test-clusterfuzz', preemptible=False, - machine_type='n1-standard-1') + machine_type='n1-standard-1', + max_run_duration='21600s') self.assertCountEqual(spec, expected_spec) - def test_preemptible_get_spec_from_config(self): + def test_preemptible(self): """Tests that get_spec_from_config works for preemptibles as expected.""" - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - spec = batch._get_spec_from_config('fuzz', job.name) # pylint: disable=protected-access + spec = batch._get_spec_from_config('fuzz', self.job.name) expected_spec = batch.BatchWorkloadSpec( clusterfuzz_release='prod', docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654', @@ -70,6 +71,21 @@ def test_preemptible_get_spec_from_config(self): gce_region='gce-region', project='test-clusterfuzz', preemptible=True, - machine_type='n1-standard-1') + machine_type='n1-standard-1', + max_run_duration='21600s') self.assertCountEqual(spec, expected_spec) + + def test_corpus_pruning(self): + """Tests that corpus pruning uses a spec of 24 hours and a different one + than normal.""" + pruning_spec = batch._get_spec_from_config('corpus_pruning', self.job.name) + self.assertEqual(pruning_spec.max_run_duration, f'{24 * 60 * 60}s') + normal_spec = batch._get_spec_from_config('analyze', self.job.name) + self.assertNotEqual(pruning_spec, normal_spec) + job = data_types.Job(name='libfuzzer_chrome_msan', platform='LINUX') + job.put() + # 
This behavior is important for grouping batch alike tasks into a single + # batch job. + pruning_spec2 = batch._get_spec_from_config('corpus_pruning', job.name) + self.assertEqual(pruning_spec, pruning_spec2) From bbd3c6fb71bada7f1a6a4abb335678e8701c2709 Mon Sep 17 00:00:00 2001 From: jonathanmetzman <31354670+jonathanmetzman@users.noreply.github.com> Date: Tue, 12 Nov 2024 09:25:50 -0500 Subject: [PATCH 4/7] Add a field for TTL in WindowTaskRateLimit. (#4385) --- .../_internal/base/tasks/task_rate_limiting.py | 5 +++-- src/clusterfuzz/_internal/datastore/data_types.py | 11 ++++++++++- .../tests/core/base/tasks/task_rate_limiting_test.py | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/task_rate_limiting.py b/src/clusterfuzz/_internal/base/tasks/task_rate_limiting.py index f12e775f74..4d68a43436 100644 --- a/src/clusterfuzz/_internal/base/tasks/task_rate_limiting.py +++ b/src/clusterfuzz/_internal/base/tasks/task_rate_limiting.py @@ -37,7 +37,6 @@ class TaskRateLimiter: """Rate limiter for tasks. This limits tasks to 100 erroneous runs or 2000 succesful runs in 6 hours. It keeps track of task completion when record_task is called at the end of every task.""" - TASK_RATE_LIMIT_WINDOW = datetime.timedelta(hours=6) TASK_RATE_LIMIT_MAX_ERRORS = 100 # TODO(metzman): Reevaluate this number, it's probably too high. TASK_RATE_LIMIT_MAX_COMPLETIONS = 2000 @@ -74,7 +73,9 @@ def is_rate_limited(self) -> bool: if environment.get_value('COMMAND_OVERRIDE'): # A user wants to run this task. return False - window_start = _get_datetime_now() - self.TASK_RATE_LIMIT_WINDOW + window_start = ( + _get_datetime_now() - + data_types.WindowRateLimitTask.TASK_RATE_LIMIT_WINDOW) query = data_types.WindowRateLimitTask.query( data_types.WindowRateLimitTask.task_name == self.task_name, data_types.WindowRateLimitTask.task_argument == self.task_argument, diff --git a/src/clusterfuzz/_internal/datastore/data_types.py b/src/clusterfuzz/_internal/datastore/data_types.py index 28f765b606..fdb16d16ba 100644 --- a/src/clusterfuzz/_internal/datastore/data_types.py +++ b/src/clusterfuzz/_internal/datastore/data_types.py @@ -13,6 +13,7 @@ # limitations under the License. """Classes for objects stored in the datastore.""" +import datetime import re from google.cloud import ndb @@ -1119,13 +1120,21 @@ class WindowRateLimitTask(Model): it will have a different lifecycle (it's not needed after the window completes). This should have a TTL as TASK_RATE_LIMIT_WINDOW in task_rate_limiting.py (6 hours).""" - # TODO(metzman): Consider using task_id. + TASK_RATE_LIMIT_WINDOW = datetime.timedelta(hours=6) + timestamp = ndb.DateTimeProperty(auto_now_add=True, indexed=True) + # Only use this for TTL. It should only be saved to by ClusterFuzz, not read. + ttl_expiry_timestamp = ndb.DateTimeProperty() + # TODO(metzman): Consider using task_id. 
task_name = ndb.StringProperty(indexed=True) task_argument = ndb.StringProperty(indexed=True) job_name = ndb.StringProperty(indexed=True) status = ndb.StringProperty(choices=[TaskState.ERROR, TaskState.FINISHED]) + def _pre_put_hook(self): + self.ttl_expiry_timestamp = ( + datetime.datetime.now() + self.TASK_RATE_LIMIT_WINDOW) + class BuildMetadata(Model): """Metadata associated with a particular archived build.""" diff --git a/src/clusterfuzz/_internal/tests/core/base/tasks/task_rate_limiting_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/task_rate_limiting_test.py index 5b82c3f091..9f2ecde40f 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/task_rate_limiting_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/task_rate_limiting_test.py @@ -91,7 +91,7 @@ def test_is_rate_limited_old_tasks(self): """Test is_rate_limited() with old tasks outside the window.""" # Add tasks outside the time window. window_start = ( - self.now - task_rate_limiting.TaskRateLimiter.TASK_RATE_LIMIT_WINDOW) + self.now - data_types.WindowRateLimitTask.TASK_RATE_LIMIT_WINDOW) self._create_n_tasks( task_rate_limiting.TaskRateLimiter.TASK_RATE_LIMIT_MAX_COMPLETIONS + 1, timestamp=window_start - datetime.timedelta(minutes=10)) From e4f2d629f4941c1894148c67426c0a44abc7f64b Mon Sep 17 00:00:00 2001 From: Vitor Guidi Date: Tue, 12 Nov 2024 13:18:36 -0300 Subject: [PATCH 5/7] Using correct JOB_NAME env var for JOB_BUILD_RETRIEVAL_TIME (#4398) --- .../_internal/build_management/build_manager.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/clusterfuzz/_internal/build_management/build_manager.py b/src/clusterfuzz/_internal/build_management/build_manager.py index 9a7c38c847..8cd992b3a0 100644 --- a/src/clusterfuzz/_internal/build_management/build_manager.py +++ b/src/clusterfuzz/_internal/build_management/build_manager.py @@ -440,7 +440,7 @@ def _download_and_open_build_archive(self, base_build_dir: str, build_download_duration = time.time() - start_time monitoring_metrics.JOB_BUILD_RETRIEVAL_TIME.add( build_download_duration, { - 'job': os.getenv('JOB_TYPE'), + 'job': os.getenv('JOB_NAME'), 'platform': environment.platform(), 'step': 'download', 'build_type': self._build_type, @@ -556,7 +556,7 @@ def _unpack_build(self, monitoring_metrics.JOB_BUILD_RETRIEVAL_TIME.add( unpack_elapsed_time, { - 'job': os.getenv('JOB_TYPE'), + 'job': os.getenv('JOB_NAME'), 'platform': environment.platform(), 'step': 'unpack', 'build_type': self._build_type, @@ -907,7 +907,7 @@ def _unpack_custom_build(self): build_download_time = time.time() - download_start_time monitoring_metrics.JOB_BUILD_RETRIEVAL_TIME.add( build_download_time, { - 'job': os.getenv('JOB_TYPE'), + 'job': os.getenv('JOB_NAME'), 'platform': environment.platform(), 'step': 'download', 'build_type': self._build_type, @@ -935,7 +935,7 @@ def _unpack_custom_build(self): build_unpack_time = time.time() - unpack_start_time monitoring_metrics.JOB_BUILD_RETRIEVAL_TIME.add( build_unpack_time, { - 'job': os.getenv('JOB_TYPE'), + 'job': os.getenv('JOB_NAME'), 'platform': environment.platform(), 'step': 'unpack', 'build_type': self._build_type, From 53ca701494493c783c13781e0c5becd21a610272 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Tue, 12 Nov 2024 12:13:27 -0500 Subject: [PATCH 6/7] fix typo --- src/clusterfuzz/_internal/google_cloud_utils/batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py 
b/src/clusterfuzz/_internal/google_cloud_utils/batch.py
index 8af1b1bbab..970df5c7c1 100644
--- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py
+++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py
@@ -169,7 +169,7 @@ def _get_task_spec(batch_workload_spec):
  task_spec = batch.TaskSpec()
  task_spec.runnables = [runnable]
  task_spec.max_retry_count = RETRY_COUNT
-  task_spec.max_run_duration = batch_workload_spec.max_duration
+  task_spec.max_run_duration = batch_workload_spec.max_run_duration
  return task_spec

From 082b0089023d1726f8d89512c2349c3a4da93f05 Mon Sep 17 00:00:00 2001
From: jonathanmetzman <31354670+jonathanmetzman@users.noreply.github.com>
Date: Tue, 12 Nov 2024 18:58:42 -0500
Subject: [PATCH 7/7] Log uworker input and output URLs. (#4401)

This will help with debugging. Because these URLs are not signed, one needs
permissions to use them.
---
 src/clusterfuzz/_internal/bot/tasks/utasks/uworker_io.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/uworker_io.py b/src/clusterfuzz/_internal/bot/tasks/utasks/uworker_io.py
index 2811a507d4..08a557ef18 100644
--- a/src/clusterfuzz/_internal/bot/tasks/utasks/uworker_io.py
+++ b/src/clusterfuzz/_internal/bot/tasks/utasks/uworker_io.py
@@ -102,11 +102,13 @@ def serialize_and_upload_uworker_input(
    uworker_input: uworker_msg_pb2.Input) -> Tuple[str, str]:  # pylint: disable=no-member
  """Serializes input for the untrusted portion of a task."""
  signed_input_download_url, input_gcs_url = get_uworker_input_urls()
+  logs.info(f'input_gcs_url: {input_gcs_url}')
  # Get URLs for the uworker's output. We need a signed upload URL so it can
  # write its output. Also get a download URL in case the caller wants to read
  # the output.
  signed_output_upload_url, output_gcs_url = get_uworker_output_urls(
      input_gcs_url)
+  logs.info(f'output_gcs_url: {output_gcs_url}')
  assert not uworker_input.HasField('uworker_output_upload_url')
  uworker_input.uworker_output_upload_url = signed_output_upload_url
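
A note on the duration plumbing that patches 3 and 6 establish: _get_spec_from_config
now derives max_run_duration from the per-command task lease via _get_task_duration,
and _get_task_spec passes it through to the Batch API. The sketch below replays that
arithmetic standalone; the lease values are inferred from the test expectations in
patch 3 ('21600s' for ordinary tasks, 24 hours for corpus pruning), not copied from
clusterfuzz._internal.base.tasks.

# Illustrative stand-ins for tasks.TASK_LEASE_SECONDS and
# tasks.TASK_LEASE_SECONDS_BY_COMMAND; the values are assumptions based on
# the expectations asserted in batch_test.py above.
TASK_LEASE_SECONDS = 6 * 60 * 60  # The old hardcoded MAX_DURATION: 6 hours.
TASK_LEASE_SECONDS_BY_COMMAND = {
    'corpus_pruning': 24 * 60 * 60,  # Pruning may legitimately take a day.
}


def get_task_duration(command: str) -> str:
  """Mirrors batch._get_task_duration plus the f'{...}s' formatting done in
  _get_spec_from_config: look up the command's lease in seconds, fall back
  to the default, and render the duration string the Batch API expects."""
  seconds = TASK_LEASE_SECONDS_BY_COMMAND.get(command, TASK_LEASE_SECONDS)
  return f'{seconds}s'


assert get_task_duration('analyze') == '21600s'
assert get_task_duration('fuzz') == '21600s'
assert get_task_duration('corpus_pruning') == '86400s'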
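
Likewise, a minimal sketch of the window arithmetic behind patch 4: _pre_put_hook
stamps each WindowRateLimitTask with a TTL one rate-limit window ahead, while
is_rate_limited() only counts entities newer than one window back, so an entity
stops mattering to queries at exactly the moment it becomes eligible for deletion.
Plain datetime only; the helper names here are illustrative, not ClusterFuzz APIs.

import datetime

# Mirrors data_types.WindowRateLimitTask.TASK_RATE_LIMIT_WINDOW.
TASK_RATE_LIMIT_WINDOW = datetime.timedelta(hours=6)


def ttl_expiry_timestamp(now: datetime.datetime) -> datetime.datetime:
  # What _pre_put_hook computes: the entity can be garbage-collected once
  # the window it was recorded in has fully elapsed.
  return now + TASK_RATE_LIMIT_WINDOW


def window_start(now: datetime.datetime) -> datetime.datetime:
  # The lower bound is_rate_limited() queries against: only task runs
  # recorded after this instant count toward the error/completion caps.
  return now - TASK_RATE_LIMIT_WINDOW


now = datetime.datetime(2024, 11, 12, 12, 0)
assert ttl_expiry_timestamp(now) == datetime.datetime(2024, 11, 12, 18, 0)
assert window_start(now) == datetime.datetime(2024, 11, 12, 6, 0)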