Skip to content

Commit

Permalink
Merge branch 'master' into feat/upload-time
Browse files Browse the repository at this point in the history
  • Loading branch information
vitorguidi authored Nov 13, 2024
2 parents f81b23d + 082b008 commit 83a5d04
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def is_done_collecting_messages():
def get_postprocess_task():
"""Gets a postprocess task if one exists."""
# This should only be run on non-preemptible bots.
if not (task_utils.is_remotely_executing_utasks() or
if not (task_utils.is_remotely_executing_utasks() and
task_utils.get_opted_in_tasks()):
return None
# Postprocess is platform-agnostic, so we run all such tasks on our
Expand Down
5 changes: 3 additions & 2 deletions src/clusterfuzz/_internal/base/tasks/task_rate_limiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class TaskRateLimiter:
"""Rate limiter for tasks. This limits tasks to 100 erroneous runs or 2000
successful runs in 6 hours. It keeps track of task completion when record_task
is called at the end of every task."""
TASK_RATE_LIMIT_WINDOW = datetime.timedelta(hours=6)
TASK_RATE_LIMIT_MAX_ERRORS = 100
# TODO(metzman): Reevaluate this number, it's probably too high.
TASK_RATE_LIMIT_MAX_COMPLETIONS = 2000
Expand Down Expand Up @@ -74,7 +73,9 @@ def is_rate_limited(self) -> bool:
if environment.get_value('COMMAND_OVERRIDE'):
# A user wants to run this task.
return False
window_start = _get_datetime_now() - self.TASK_RATE_LIMIT_WINDOW
window_start = (
_get_datetime_now() -
data_types.WindowRateLimitTask.TASK_RATE_LIMIT_WINDOW)
query = data_types.WindowRateLimitTask.query(
data_types.WindowRateLimitTask.task_name == self.task_name,
data_types.WindowRateLimitTask.task_argument == self.task_argument,
Expand Down
6 changes: 6 additions & 0 deletions src/clusterfuzz/_internal/bot/tasks/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from clusterfuzz._internal.datastore import data_handler
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.metrics import monitoring_metrics
from clusterfuzz._internal.system import environment
from clusterfuzz._internal.system import process_handler
from clusterfuzz._internal.system import shell
Expand Down Expand Up @@ -211,6 +212,11 @@ def run_command(task_name, task_argument, job_name, uworker_env):
rate_limiter = task_rate_limiting.TaskRateLimiter(task_name, task_argument,
job_name)
if rate_limiter.is_rate_limited():
monitoring_metrics.TASK_RATE_LIMIT_COUNT.increment(labels={
'job': job_name,
'task': task_name,
'argument': task_argument,
})
logs.error(f'Rate limited task: {task_name} {task_argument} {job_name}')
if task_name == 'fuzz':
# Wait 10 seconds. We don't want to try again immediately because if we
Expand Down
2 changes: 2 additions & 0 deletions src/clusterfuzz/_internal/bot/tasks/utasks/uworker_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ def serialize_and_upload_uworker_input(
uworker_input: uworker_msg_pb2.Input) -> Tuple[str, str]: # pylint: disable=no-member
"""Serializes input for the untrusted portion of a task."""
signed_input_download_url, input_gcs_url = get_uworker_input_urls()
logs.info(f'input_gcs_url: {input_gcs_url}')
# Get URLs for the uworker's output. We need a signed upload URL so it can
# write its output. Also get a download URL in case the caller wants to read
# the output.
signed_output_upload_url, output_gcs_url = get_uworker_output_urls(
input_gcs_url)
logs.info(f'output_gcs_url: {output_gcs_url}')

assert not uworker_input.HasField('uworker_output_upload_url')
uworker_input.uworker_output_upload_url = signed_output_upload_url
Expand Down
8 changes: 4 additions & 4 deletions src/clusterfuzz/_internal/build_management/build_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ def _download_and_open_build_archive(self, base_build_dir: str,
build_download_duration = time.time() - start_time
monitoring_metrics.JOB_BUILD_RETRIEVAL_TIME.add(
build_download_duration, {
'job': os.getenv('JOB_TYPE'),
'job': os.getenv('JOB_NAME'),
'platform': environment.platform(),
'step': 'download',
'build_type': self._build_type,
Expand Down Expand Up @@ -556,7 +556,7 @@ def _unpack_build(self,

monitoring_metrics.JOB_BUILD_RETRIEVAL_TIME.add(
unpack_elapsed_time, {
'job': os.getenv('JOB_TYPE'),
'job': os.getenv('JOB_NAME'),
'platform': environment.platform(),
'step': 'unpack',
'build_type': self._build_type,
Expand Down Expand Up @@ -907,7 +907,7 @@ def _unpack_custom_build(self):
build_download_time = time.time() - download_start_time
monitoring_metrics.JOB_BUILD_RETRIEVAL_TIME.add(
build_download_time, {
'job': os.getenv('JOB_TYPE'),
'job': os.getenv('JOB_NAME'),
'platform': environment.platform(),
'step': 'download',
'build_type': self._build_type,
Expand Down Expand Up @@ -935,7 +935,7 @@ def _unpack_custom_build(self):
build_unpack_time = time.time() - unpack_start_time
monitoring_metrics.JOB_BUILD_RETRIEVAL_TIME.add(
build_unpack_time, {
'job': os.getenv('JOB_TYPE'),
'job': os.getenv('JOB_NAME'),
'platform': environment.platform(),
'step': 'unpack',
'build_type': self._build_type,
Expand Down
11 changes: 10 additions & 1 deletion src/clusterfuzz/_internal/datastore/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
"""Classes for objects stored in the datastore."""

import datetime
import re

from google.cloud import ndb
Expand Down Expand Up @@ -1119,13 +1120,21 @@ class WindowRateLimitTask(Model):
it will have a different lifecycle (it's not needed after the window
completes). This should have a TTL equal to TASK_RATE_LIMIT_WINDOW, which is
defined on this class (6 hours)."""
# TODO(metzman): Consider using task_id.
TASK_RATE_LIMIT_WINDOW = datetime.timedelta(hours=6)

timestamp = ndb.DateTimeProperty(auto_now_add=True, indexed=True)
# Only use this for TTL. It should only be saved to by ClusterFuzz, not read.
ttl_expiry_timestamp = ndb.DateTimeProperty()
# TODO(metzman): Consider using task_id.
task_name = ndb.StringProperty(indexed=True)
task_argument = ndb.StringProperty(indexed=True)
job_name = ndb.StringProperty(indexed=True)
status = ndb.StringProperty(choices=[TaskState.ERROR, TaskState.FINISHED])

def _pre_put_hook(self):
  """Sets |ttl_expiry_timestamp| just before the entity is written.

  The expiry is the current time plus TASK_RATE_LIMIT_WINDOW. It is
  recomputed on every put, so re-saving an entity pushes its expiry out.
  """
  # Datastore timestamps (including the auto_now_add |timestamp| property) are
  # naive UTC values. Using naive local time here would shift the expiry by the
  # host's UTC offset on any machine that is not configured for UTC.
  utc_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
  self.ttl_expiry_timestamp = utc_now + self.TASK_RATE_LIMIT_WINDOW


class BuildMetadata(Model):
"""Metadata associated with a particular archived build."""
Expand Down
33 changes: 25 additions & 8 deletions src/clusterfuzz/_internal/google_cloud_utils/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from google.cloud import batch_v1 as batch

from clusterfuzz._internal.base import retry
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.config import local_config
Expand All @@ -33,7 +34,6 @@

_local = threading.local()

MAX_DURATION = f'{60 * 60 * 6}s'
RETRY_COUNT = 0

TASK_BUNCH_SIZE = 20
Expand All @@ -46,9 +46,20 @@
MAX_CONCURRENT_VMS_PER_JOB = 1000

BatchWorkloadSpec = collections.namedtuple('BatchWorkloadSpec', [
'clusterfuzz_release', 'disk_size_gb', 'disk_type', 'docker_image',
'user_data', 'service_account_email', 'subnetwork', 'preemptible',
'project', 'gce_zone', 'machine_type', 'network', 'gce_region'
'clusterfuzz_release',
'disk_size_gb',
'disk_type',
'docker_image',
'user_data',
'service_account_email',
'subnetwork',
'preemptible',
'project',
'gce_zone',
'machine_type',
'network',
'gce_region',
'max_run_duration',
])


Expand Down Expand Up @@ -158,7 +169,7 @@ def _get_task_spec(batch_workload_spec):
task_spec = batch.TaskSpec()
task_spec.runnables = [runnable]
task_spec.max_retry_count = RETRY_COUNT
task_spec.max_run_duration = MAX_DURATION
task_spec.max_run_duration = batch_workload_spec.max_run_duration
return task_spec


Expand Down Expand Up @@ -219,8 +230,7 @@ def _create_job(spec, input_urls):
create_request.job_id = job_name
# The job's parent is the region in which the job will run
project_id = spec.project
create_request.parent = (
f'projects/{project_id}/locations/{spec.gce_region}')
create_request.parent = f'projects/{project_id}/locations/{spec.gce_region}'
job_result = _send_create_job_request(create_request)
logs.info(f'Created batch job id={job_name}.', spec=spec)
return job_result
Expand Down Expand Up @@ -274,6 +284,11 @@ def _get_config_name(command, job_name):
return config_name


def _get_task_duration(command):
  """Returns the lease duration configured for |command|.

  Commands without an entry in tasks.TASK_LEASE_SECONDS_BY_COMMAND fall back to
  tasks.TASK_LEASE_SECONDS.
  """
  default_duration = tasks.TASK_LEASE_SECONDS
  per_command_durations = tasks.TASK_LEASE_SECONDS_BY_COMMAND
  return per_command_durations.get(command, default_duration)


def _get_spec_from_config(command, job_name):
"""Gets the configured specifications for a batch workload."""
config_name = _get_config_name(command, job_name)
Expand All @@ -285,6 +300,7 @@ def _get_spec_from_config(command, job_name):
docker_image = instance_spec['docker_image']
user_data = instance_spec['user_data']
clusterfuzz_release = instance_spec.get('clusterfuzz_release', 'prod')
max_run_duration = f'{_get_task_duration(command)}s'
spec = BatchWorkloadSpec(
clusterfuzz_release=clusterfuzz_release,
docker_image=docker_image,
Expand All @@ -298,5 +314,6 @@ def _get_spec_from_config(command, job_name):
network=instance_spec['network'],
subnetwork=instance_spec['subnetwork'],
preemptible=instance_spec['preemptible'],
machine_type=instance_spec['machine_type'])
machine_type=instance_spec['machine_type'],
max_run_duration=max_run_duration)
return spec
8 changes: 8 additions & 0 deletions src/clusterfuzz/_internal/metrics/monitoring_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@
monitor.StringField('job'),
],
)
# Counts rate limit events, labelled by task, job and task argument.
TASK_RATE_LIMIT_COUNT = monitor.CounterMetric(
    'task/rate_limit',
    description='Counter for rate limit events.',
    field_spec=[
        monitor.StringField('task'),
        monitor.StringField('job'),
        monitor.StringField('argument'),
    ],
)

UTASK_SUBTASK_E2E_DURATION_SECS = monitor.CumulativeDistributionMetric(
'utask/subtask_e2e_duration_secs',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ def setUp(self):
'clusterfuzz._internal.base.persistent_cache.get_value',
'clusterfuzz._internal.base.persistent_cache.set_value',
'clusterfuzz._internal.base.utils.utcnow',
'clusterfuzz._internal.base.tasks.task_utils.get_opted_in_tasks',
'time.sleep',
])

self.mock.get_value.return_value = None
self.mock.sleep.return_value = None
self.mock.get_opted_in_tasks.return_value = False
data_types.Job(name='job').put()

client = pubsub.PubSubClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def test_is_rate_limited_old_tasks(self):
"""Test is_rate_limited() with old tasks outside the window."""
# Add tasks outside the time window.
window_start = (
self.now - task_rate_limiting.TaskRateLimiter.TASK_RATE_LIMIT_WINDOW)
self.now - data_types.WindowRateLimitTask.TASK_RATE_LIMIT_WINDOW)
self._create_n_tasks(
task_rate_limiting.TaskRateLimiter.TASK_RATE_LIMIT_MAX_COMPLETIONS + 1,
timestamp=window_start - datetime.timedelta(minutes=10))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
from clusterfuzz._internal.google_cloud_utils import batch
from clusterfuzz._internal.tests.test_libs import test_utils

# pylint: disable=protected-access


@test_utils.with_cloud_emulators('datastore')
class GetSpecFromConfigTest(unittest.TestCase):
"""Tests for get_spec_from_config."""

def setUp(self):
self.maxDiff = None
self.job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')
self.job.put()

def test_nonpreemptible_get_spec_from_config(self):
def test_nonpreemptible(self):
"""Tests that get_spec_from_config works for non-preemptibles as
expected."""
job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')
job.put()
spec = batch._get_spec_from_config('corpus_pruning', job.name) # pylint: disable=protected-access
spec = batch._get_spec_from_config('analyze', self.job.name)
expected_spec = batch.BatchWorkloadSpec(
clusterfuzz_release='prod',
docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654',
Expand All @@ -47,15 +49,14 @@ def test_nonpreemptible_get_spec_from_config(self):
gce_zone='gce-zone',
project='test-clusterfuzz',
preemptible=False,
machine_type='n1-standard-1')
machine_type='n1-standard-1',
max_run_duration='21600s')

self.assertCountEqual(spec, expected_spec)

def test_preemptible_get_spec_from_config(self):
def test_preemptible(self):
"""Tests that get_spec_from_config works for preemptibles as expected."""
job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')
job.put()
spec = batch._get_spec_from_config('fuzz', job.name) # pylint: disable=protected-access
spec = batch._get_spec_from_config('fuzz', self.job.name)
expected_spec = batch.BatchWorkloadSpec(
clusterfuzz_release='prod',
docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654',
Expand All @@ -70,6 +71,21 @@ def test_preemptible_get_spec_from_config(self):
gce_region='gce-region',
project='test-clusterfuzz',
preemptible=True,
machine_type='n1-standard-1')
machine_type='n1-standard-1',
max_run_duration='21600s')

self.assertCountEqual(spec, expected_spec)

def test_corpus_pruning(self):
"""Tests that corpus pruning gets a 24 hour max_run_duration and that its
spec differs from the spec of a normal task."""
pruning_spec = batch._get_spec_from_config('corpus_pruning', self.job.name)
self.assertEqual(pruning_spec.max_run_duration, f'{24 * 60 * 60}s')
normal_spec = batch._get_spec_from_config('analyze', self.job.name)
self.assertNotEqual(pruning_spec, normal_spec)
job = data_types.Job(name='libfuzzer_chrome_msan', platform='LINUX')
job.put()
# This behavior is important for grouping alike tasks into a single batch
# job.
pruning_spec2 = batch._get_spec_from_config('corpus_pruning', job.name)
self.assertEqual(pruning_spec, pruning_spec2)

0 comments on commit 83a5d04

Please sign in to comment.