From 34f287c9e9cc1fa88ba9350726eb730bd1162255 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Mon, 6 Feb 2023 13:43:26 +0100 Subject: [PATCH 1/8] feat: Add integration tests for storages and create proxy configuration --- setup.py | 2 +- .../test_actor_create_proxy_configuration.py | 60 +++++++++++++ tests/integration/test_actor_dataset.py | 40 +++++++++ .../integration/test_actor_key_value_store.py | 88 +++++++++++++++++++ tests/integration/test_actor_request_queue.py | 18 ++++ 5 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_actor_create_proxy_configuration.py create mode 100644 tests/integration/test_actor_dataset.py create mode 100644 tests/integration/test_actor_key_value_store.py create mode 100644 tests/integration/test_actor_request_queue.py diff --git a/setup.py b/setup.py index b9d6ddc2..a05ef614 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ package_data={'apify': ['py.typed']}, python_requires='>=3.8', install_requires=[ - 'apify-client ~= 0.7.0b39', + 'apify-client ~= 0.7.0b46', 'httpx ~= 0.23.0', 'psutil ~= 5.9.4', 'pydantic ~= 1.10.2', diff --git a/tests/integration/test_actor_create_proxy_configuration.py b/tests/integration/test_actor_create_proxy_configuration.py new file mode 100644 index 00000000..8c856d02 --- /dev/null +++ b/tests/integration/test_actor_create_proxy_configuration.py @@ -0,0 +1,60 @@ + + +from apify import Actor + +from .conftest import ActorFactory + + +class TestActorCreateProxyConfiguration: + + async def test_create_proxy_configuration_basic(self, make_actor: ActorFactory) -> None: + async def main() -> None: + groups = ['SHADER'] + country_code = 'US' + + async with Actor: + proxy_configuration = await Actor.create_proxy_configuration( + groups=groups, + country_code=country_code, + ) + + assert proxy_configuration is not None + assert proxy_configuration._groups == groups + assert proxy_configuration._password is not None + assert 
proxy_configuration._country_code == country_code + + actor = await make_actor('proxy-configuration', main_func=main) + + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' + + async def test_create_proxy_configuration_complex(self, make_actor: ActorFactory) -> None: + async def main() -> None: + await Actor.init() + + proxy_url_suffix = f'{Actor.config.proxy_password}@{Actor.config.proxy_hostname}:{Actor.config.proxy_port}' + + proxy_configuration = await Actor.create_proxy_configuration(actor_proxy_input={ + 'useApifyProxy': True, + }) + assert proxy_configuration is not None + assert await proxy_configuration.new_url() == f'http://auto:{proxy_url_suffix}' + + groups = ['SHADER', 'BUYPROXIES94952'] + country_code = 'US' + proxy_configuration = await Actor.create_proxy_configuration(actor_proxy_input={ + 'useApifyProxy': True, + 'apifyProxyGroups': groups, + 'apifyProxyCountry': country_code, + }) + assert proxy_configuration is not None + assert await proxy_configuration.new_url() == f'http://groups-{"+".join(groups)},country-{country_code}:{proxy_url_suffix}' + + await Actor.exit() + + actor = await make_actor('proxy-configuration', main_func=main) + + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' diff --git a/tests/integration/test_actor_dataset.py b/tests/integration/test_actor_dataset.py new file mode 100644 index 00000000..1eb2fc36 --- /dev/null +++ b/tests/integration/test_actor_dataset.py @@ -0,0 +1,40 @@ + +from apify import Actor + +from .conftest import ActorFactory + + +class TestActorPushData: + + async def test_push_data(self, make_actor: ActorFactory) -> None: + desired_item_count = 100 # Also change inside main() if you're changing this + + async def main() -> None: + desired_item_count = 100 + async with Actor: + await Actor.push_data([{'id': i} for i in range(desired_item_count)]) + + actor = await make_actor('push-data', 
main_func=main) + + run_result = await actor.call() + + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' + list_page = await actor.last_run().dataset().list_items(limit=desired_item_count) + assert list_page.items[0]['id'] == 0 + assert list_page.items[-1]['id'] == desired_item_count - 1 + + +class TestActorOpenDataset: + async def test_same_references(self, make_actor: ActorFactory) -> None: + async def main() -> None: + async with Actor: + dataset1 = await Actor.open_dataset() + dataset2 = await Actor.open_dataset() + assert dataset1 is dataset2 + + actor = await make_actor('dataset-same-references', main_func=main) + + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' diff --git a/tests/integration/test_actor_key_value_store.py b/tests/integration/test_actor_key_value_store.py new file mode 100644 index 00000000..635c95c3 --- /dev/null +++ b/tests/integration/test_actor_key_value_store.py @@ -0,0 +1,88 @@ +from apify import Actor + +from .conftest import ActorFactory + + +class TestActorOpenKeyValueStore: + async def test_same_references(self, make_actor: ActorFactory) -> None: + async def main() -> None: + async with Actor: + kvs1 = await Actor.open_key_value_store() + kvs2 = await Actor.open_key_value_store() + assert kvs1 is kvs2 + + actor = await make_actor('kvs-same-references', main_func=main) + + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' + + +class TestActorGetSetValue: + async def test_actor_get_set_value_simple(self, make_actor: ActorFactory) -> None: + async def main() -> None: + async with Actor: + await Actor.set_value('test', {'number': 123, 'string': 'a string', 'nested': {'test': 1}}) + value = await Actor.get_value('test') + assert value['number'] == 123 + assert value['string'] == 'a string' + assert value['nested']['test'] == 1 + + actor = await make_actor('actor-get-set-value', main_func=main) 
+ + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' + + async def test_actor_get_set_value_complex(self, make_actor: ActorFactory) -> None: + async def main_set() -> None: + async with Actor: + await Actor.set_value('test', {'number': 123, 'string': 'a string', 'nested': {'test': 1}}) + + actor_set = await make_actor('actor-set-value', main_func=main_set) + + run_result_set = await actor_set.call() + assert run_result_set is not None + assert run_result_set['status'] == 'SUCCEEDED' + # Externally check if the value is present in key-value store + test_record = await actor_set.last_run().key_value_store().get_record('test') + assert test_record is not None + test_value = test_record['value'] + assert test_value['number'] == 123 + assert test_value['string'] == 'a string' + assert test_value['nested']['test'] == 1 + + async def main_get() -> None: + async with Actor: + input_object = await Actor.get_input() + # Access KVS of the previous 'set' run + kvs = await Actor.open_key_value_store(input_object['kvs-id']) + value = await kvs.get_value('test') + assert value['number'] == 123 + assert value['string'] == 'a string' + assert value['nested']['test'] == 1 + + actor_get = await make_actor('actor-get-value', main_func=main_get) + default_kvs_info = await actor_set.last_run().key_value_store().get() + assert default_kvs_info is not None + + run_result_get = await actor_get.call(run_input={'kvs-id': default_kvs_info['id']}) + assert run_result_get is not None + assert run_result_get['status'] == 'SUCCEEDED' + + +class TestActorGetInput: + async def test_actor_get_input(self, make_actor: ActorFactory) -> None: + async def main() -> None: + async with Actor: + input_object = await Actor.get_input() + assert input_object is not None + assert input_object['number'] == 123 + assert input_object['string'] == 'a string' + assert input_object['nested']['test'] == 1 + + actor = await make_actor('actor-get-input', 
main_func=main) + + run_result = await actor.call(run_input={'number': 123, 'string': 'a string', 'nested': {'test': 1}}) + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py new file mode 100644 index 00000000..60d16a2f --- /dev/null +++ b/tests/integration/test_actor_request_queue.py @@ -0,0 +1,18 @@ +from apify import Actor + +from .conftest import ActorFactory + + +class TestActorOpenRequestQueue: + async def test_same_references(self, make_actor: ActorFactory) -> None: + async def main() -> None: + async with Actor: + rq1 = await Actor.open_request_queue() + rq2 = await Actor.open_request_queue() + assert rq1 is rq2 + + actor = await make_actor('rq-same-references', main_func=main) + + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' From 0bcd90acc1d4f9625c93d64caea4c30991079880 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 8 Feb 2023 10:10:05 +0100 Subject: [PATCH 2/8] fix newlines that don't get fixed by `make format` --- tests/integration/test_actor_create_proxy_configuration.py | 2 -- tests/integration/test_actor_dataset.py | 1 - 2 files changed, 3 deletions(-) diff --git a/tests/integration/test_actor_create_proxy_configuration.py b/tests/integration/test_actor_create_proxy_configuration.py index 8c856d02..6c78baf6 100644 --- a/tests/integration/test_actor_create_proxy_configuration.py +++ b/tests/integration/test_actor_create_proxy_configuration.py @@ -1,5 +1,3 @@ - - from apify import Actor from .conftest import ActorFactory diff --git a/tests/integration/test_actor_dataset.py b/tests/integration/test_actor_dataset.py index 1eb2fc36..6f1d6f78 100644 --- a/tests/integration/test_actor_dataset.py +++ b/tests/integration/test_actor_dataset.py @@ -1,4 +1,3 @@ - from apify import Actor from .conftest import ActorFactory From 
544760d85c93d833b67e41ec84ea216171ede68a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 8 Feb 2023 10:55:02 +0100 Subject: [PATCH 3/8] also test secret input, fix docstring --- src/apify/_crypto.py | 4 +- .../integration/test_actor_key_value_store.py | 51 +++++++++++++++---- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/apify/_crypto.py b/src/apify/_crypto.py index babfed5c..4f6c0059 100644 --- a/src/apify/_crypto.py +++ b/src/apify/_crypto.py @@ -21,8 +21,8 @@ def public_encrypt(value: str, *, public_key: rsa.RSAPublicKey) -> dict: It returns the encrypted password and encrypted value in BASE64 format. Args: - value (str): Password used to encrypt the private key encoded as base64 string. - public_key (RSAPublicKey): Private key to use for decryption. + value (str): The value which should be encrypted. + public_key (RSAPublicKey): Public key to use for encryption. Returns: disc: Encrypted password and value. diff --git a/tests/integration/test_actor_key_value_store.py b/tests/integration/test_actor_key_value_store.py index 635c95c3..15aae050 100644 --- a/tests/integration/test_actor_key_value_store.py +++ b/tests/integration/test_actor_key_value_store.py @@ -73,16 +73,45 @@ async def main_get() -> None: class TestActorGetInput: async def test_actor_get_input(self, make_actor: ActorFactory) -> None: - async def main() -> None: - async with Actor: - input_object = await Actor.get_input() - assert input_object is not None - assert input_object['number'] == 123 - assert input_object['string'] == 'a string' - assert input_object['nested']['test'] == 1 - - actor = await make_actor('actor-get-input', main_func=main) - - run_result = await actor.call(run_input={'number': 123, 'string': 'a string', 'nested': {'test': 1}}) + actor_source_files = { + 'INPUT_SCHEMA.json': """ + { + "title": "Actor get input test", + "type": "object", + "schemaVersion": 1, + "properties": { + "password": { + "title": "Password", 
+ "type": "string", + "description": "A secret, encrypted input field", + "editor": "textfield", + "isSecret": true + } + }, + "required": ["password"] + } + """, + 'src/main.py': """ + import asyncio + from apify import Actor + + async def main(): + async with Actor: + input_object = await Actor.get_input() + assert input_object is not None + assert input_object['number'] == 123 + assert input_object['string'] == 'a string' + assert input_object['nested']['test'] == 1 + assert input_object['password'] == 'very secret' + """, + } + actor = await make_actor('actor-get-input', source_files=actor_source_files) + + run_result = await actor.call(run_input={ + 'number': 123, + 'string': 'a string', + 'nested': {'test': 1}, + 'password': 'very secret', + }) assert run_result is not None assert run_result['status'] == 'SUCCEEDED' From 1d9931b092c9b17e7ab52e41e950b3e2036b4529 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 8 Feb 2023 10:57:37 +0100 Subject: [PATCH 4/8] improve push_data checks --- tests/integration/test_actor_dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_actor_dataset.py b/tests/integration/test_actor_dataset.py index 6f1d6f78..5f30f039 100644 --- a/tests/integration/test_actor_dataset.py +++ b/tests/integration/test_actor_dataset.py @@ -19,9 +19,10 @@ async def main() -> None: assert run_result is not None assert run_result['status'] == 'SUCCEEDED' - list_page = await actor.last_run().dataset().list_items(limit=desired_item_count) + list_page = await actor.last_run().dataset().list_items() assert list_page.items[0]['id'] == 0 assert list_page.items[-1]['id'] == desired_item_count - 1 + assert len(list_page.items) == list_page.count == desired_item_count class TestActorOpenDataset: From dfad7be8748f34a95567601c373163c9b6e6d2c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 8 Feb 2023 11:49:55 +0100 Subject: [PATCH 5/8] 
also test named storages --- tests/integration/test_actor_dataset.py | 23 +++++++++++++++++-- .../integration/test_actor_key_value_store.py | 23 +++++++++++++++++-- tests/integration/test_actor_request_queue.py | 23 +++++++++++++++++-- 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_actor_dataset.py b/tests/integration/test_actor_dataset.py index 5f30f039..b17bfe99 100644 --- a/tests/integration/test_actor_dataset.py +++ b/tests/integration/test_actor_dataset.py @@ -1,5 +1,6 @@ from apify import Actor +from ._utils import generate_unique_resource_name from .conftest import ActorFactory @@ -26,15 +27,33 @@ async def main() -> None: class TestActorOpenDataset: - async def test_same_references(self, make_actor: ActorFactory) -> None: + async def test_same_references_default(self, make_actor: ActorFactory) -> None: async def main() -> None: async with Actor: dataset1 = await Actor.open_dataset() dataset2 = await Actor.open_dataset() assert dataset1 is dataset2 - actor = await make_actor('dataset-same-references', main_func=main) + actor = await make_actor('dataset-same-ref-default', main_func=main) run_result = await actor.call() assert run_result is not None assert run_result['status'] == 'SUCCEEDED' + + async def test_same_references_named(self, make_actor: ActorFactory) -> None: + dataset_name = generate_unique_resource_name('dataset') + + async def main() -> None: + async with Actor: + input_object = await Actor.get_input() + dataset_name = input_object['datasetName'] + dataset1 = await Actor.open_dataset(dataset_name) + dataset2 = await Actor.open_dataset(dataset_name) + assert dataset1 is dataset2 + await dataset1.drop() + + actor = await make_actor('dataset-same-ref-named', main_func=main) + + run_result = await actor.call(run_input={'datasetName': dataset_name}) + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' diff --git a/tests/integration/test_actor_key_value_store.py 
b/tests/integration/test_actor_key_value_store.py index 15aae050..d4838c88 100644 --- a/tests/integration/test_actor_key_value_store.py +++ b/tests/integration/test_actor_key_value_store.py @@ -1,22 +1,41 @@ from apify import Actor +from ._utils import generate_unique_resource_name from .conftest import ActorFactory class TestActorOpenKeyValueStore: - async def test_same_references(self, make_actor: ActorFactory) -> None: + async def test_same_references_default(self, make_actor: ActorFactory) -> None: async def main() -> None: async with Actor: kvs1 = await Actor.open_key_value_store() kvs2 = await Actor.open_key_value_store() assert kvs1 is kvs2 - actor = await make_actor('kvs-same-references', main_func=main) + actor = await make_actor('kvs-same-ref-default', main_func=main) run_result = await actor.call() assert run_result is not None assert run_result['status'] == 'SUCCEEDED' + async def test_same_references_named(self, make_actor: ActorFactory) -> None: + kvs_name = generate_unique_resource_name('key-value-store') + + async def main() -> None: + async with Actor: + input_object = await Actor.get_input() + kvs_name = input_object['kvsName'] + kvs1 = await Actor.open_key_value_store(kvs_name) + kvs2 = await Actor.open_key_value_store(kvs_name) + assert kvs1 is kvs2 + await kvs1.drop() + + actor = await make_actor('kvs-same-ref-named', main_func=main) + + run_result = await actor.call(run_input={'kvsName': kvs_name}) + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' + class TestActorGetSetValue: async def test_actor_get_set_value_simple(self, make_actor: ActorFactory) -> None: diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py index 60d16a2f..3c609866 100644 --- a/tests/integration/test_actor_request_queue.py +++ b/tests/integration/test_actor_request_queue.py @@ -1,18 +1,37 @@ from apify import Actor +from ._utils import generate_unique_resource_name from .conftest import 
ActorFactory class TestActorOpenRequestQueue: - async def test_same_references(self, make_actor: ActorFactory) -> None: + async def test_same_references_default(self, make_actor: ActorFactory) -> None: async def main() -> None: async with Actor: rq1 = await Actor.open_request_queue() rq2 = await Actor.open_request_queue() assert rq1 is rq2 - actor = await make_actor('rq-same-references', main_func=main) + actor = await make_actor('rq-same-ref-default', main_func=main) run_result = await actor.call() assert run_result is not None assert run_result['status'] == 'SUCCEEDED' + + async def test_same_references_named(self, make_actor: ActorFactory) -> None: + rq_name = generate_unique_resource_name('request-queue') + + async def main() -> None: + async with Actor: + input_object = await Actor.get_input() + rq_name = input_object['rqName'] + rq1 = await Actor.open_request_queue(rq_name) + rq2 = await Actor.open_request_queue(rq_name) + assert rq1 is rq2 + await rq1.drop() + + actor = await make_actor('rq-same-ref-named', main_func=main) + + run_result = await actor.call(run_input={'rqName': rq_name}) + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' From f8207633d8df0fbcd0b25dcb4e97c2875ba7ba6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 8 Feb 2023 13:34:31 +0100 Subject: [PATCH 6/8] add rq test, fix some inconsistencies with respect to datetime handling --- src/apify/storages/request_queue.py | 34 +++++++++++---------- tests/integration/test_request_queue.py | 37 +++++++++++++++++++++++ tests/unit/storages/test_request_queue.py | 4 +-- 3 files changed, 57 insertions(+), 18 deletions(-) create mode 100644 tests/integration/test_request_queue.py diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index 7e4440f2..b7526e87 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -2,7 +2,7 @@ import json import logging from collections import
OrderedDict -from datetime import datetime +from datetime import datetime, timezone from typing import Coroutine, Dict, Optional from typing import OrderedDict as OrderedDictType from typing import Set, Union @@ -110,7 +110,7 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, self._queue_head_dict = OrderedDict() self._query_queue_head_promise = None self._in_progress = set() - self._last_activity = datetime.utcnow() + self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) self._recently_handled = LRUCache[bool](max_length=RECENTLY_HANDLED_CACHE_SIZE) self._requests_cache = LRUCache(max_length=MAX_CACHED_REQUESTS) @@ -140,7 +140,7 @@ async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict: _budget_ow(request, { 'url': (str, True), }) - self._last_activity = datetime.utcnow() + self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) if request.get('uniqueKey') is None: request['uniqueKey'] = request['url'] # TODO: Check Request class in crawlee and replicate uniqueKey generation logic... 
@@ -215,7 +215,7 @@ async def fetch_next_request(self) -> Optional[Dict]: })}""") return None self._in_progress.add(next_request_id) - self._last_activity = datetime.utcnow() + self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) try: request = await self.get_request(next_request_id) @@ -266,12 +266,12 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: 'uniqueKey': (str, True), 'handledAt': (datetime, False), }) - self._last_activity = datetime.utcnow() + self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) if request['id'] not in self._in_progress: logging.debug(f'Cannot mark request {request["id"]} as handled, because it is not in progress!') return None - request['handledAt'] = request.get('handledAt', datetime.utcnow()) + request['handledAt'] = request.get('handledAt', datetime.utcnow().replace(tzinfo=timezone.utc)) queue_operation_info = await self._client.update_request({**request}) queue_operation_info['uniqueKey'] = request['uniqueKey'] @@ -302,7 +302,7 @@ async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optio 'id': (str, True), 'uniqueKey': (str, True), }) - self._last_activity = datetime.utcnow() + self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) if request['id'] not in self._in_progress: logging.debug(f'Cannot reclaim request {request["id"]}, because it is not in progress!') @@ -352,7 +352,8 @@ async def is_finished(self) -> bool: Returns: bool: `True` if all requests were already handled and there are no more left. `False` otherwise. 
""" - if self._in_progress_count() > 0 and (datetime.utcnow() - self._last_activity).seconds > self._internal_timeout_seconds: + seconds_since_last_activity = (datetime.utcnow().replace(tzinfo=timezone.utc) - self._last_activity).seconds + if self._in_progress_count() > 0 and seconds_since_last_activity > self._internal_timeout_seconds: message = f'The request queue seems to be stuck for {self._internal_timeout_seconds}s, resetting internal state.' logging.warning(message) self._reset() @@ -371,7 +372,7 @@ def _reset(self) -> None: self._assumed_total_count = 0 self._assumed_handled_count = 0 self._requests_cache.clear() - self._last_activity = datetime.utcnow() + self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None: self._requests_cache[cache_key] = { @@ -382,7 +383,7 @@ def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None: } async def _queue_query_head(self, limit: int) -> Dict: - query_started_at = datetime.utcnow() + query_started_at = datetime.utcnow().replace(tzinfo=timezone.utc) list_head = await self._client.list_head(limit=limit) for request in list_head['items']: @@ -391,10 +392,10 @@ async def _queue_query_head(self, limit: int) -> Dict: continue self._queue_head_dict[request['id']] = request['id'] self._cache_request(_unique_key_to_request_id(request['uniqueKey']), { - 'request_id': request['id'], - 'was_already_handled': False, - 'was_already_present': True, - 'unique_key': request['uniqueKey'], + 'requestId': request['id'], + 'wasAlreadyHandled': False, + 'wasAlreadyPresent': True, + 'uniqueKey': request['uniqueKey'], }) # This is needed so that the next call to _ensureHeadIsNonEmpty() will fetch the queue head again. 
@@ -440,7 +441,7 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi # If ensureConsistency=true then we must ensure that either: # - queueModifiedAt is older than queryStartedAt by at least API_PROCESSED_REQUESTS_DELAY_MILLIS # - hadMultipleClients=false and this.assumedTotalCount<=this.assumedHandledCount - is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt'] + is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt'].replace(tzinfo=timezone.utc) ).seconds >= (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) is_locally_consistent = not queue_head['hadMultipleClients'] and self._assumed_total_count <= self._assumed_handled_count # Consistent information from one source is enough to consider request queue finished. @@ -459,7 +460,8 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi # If we are repeating for consistency then wait required time. if should_repeat_for_consistency: - delay_seconds = (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) - (datetime.utcnow() - queue_head['queueModifiedAt']).seconds + delay_seconds = (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) - \ + (datetime.utcnow().replace(tzinfo=timezone.utc) - queue_head['queueModifiedAt']).seconds logging.info(f'Waiting for {delay_seconds}s before considering the queue as finished to ensure that the data is consistent.') await asyncio.sleep(delay_seconds) diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py new file mode 100644 index 00000000..6ada3ea8 --- /dev/null +++ b/tests/integration/test_request_queue.py @@ -0,0 +1,37 @@ +import pytest + +from apify import Actor + +from .conftest import ActorFactory + + +@pytest.mark.only +class TestRequestQueue: + async def test_simple(self, make_actor: ActorFactory) -> None: + async def main() -> None: + import random + async with Actor: + desired_request_count = 100 + rq = await 
Actor.open_request_queue() + # Add some requests + for i in range(desired_request_count): + await rq.add_request({ + 'url': f'https://example.com/{i}', + }) + + handled_request_count = 0 + while next_request := await rq.fetch_next_request(): + queue_operation_info = await rq.mark_request_as_handled(next_request) + assert queue_operation_info is not None + assert queue_operation_info['wasAlreadyHandled'] is False + handled_request_count += 1 + + assert handled_request_count == desired_request_count + is_finished = await rq.is_finished() + assert is_finished is True + + actor = await make_actor('rq-simple-test', main_func=main) + + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index b59185af..7f9f12d5 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -1,5 +1,5 @@ import asyncio -from datetime import datetime +from datetime import datetime, timezone import pytest @@ -55,7 +55,7 @@ async def test_add_fetch_handle_request(request_queue: RequestQueue) -> None: next = await request_queue.fetch_next_request() assert next is not None # Mark it as handled - next['handledAt'] = datetime.utcnow() + next['handledAt'] = datetime.utcnow().replace(tzinfo=timezone.utc) queue_operation_info = await request_queue.mark_request_as_handled(next) assert queue_operation_info is not None assert queue_operation_info['uniqueKey'] == url From 4cd203b36b3a2b658422323cfd96389286819252 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 8 Feb 2023 13:34:56 +0100 Subject: [PATCH 7/8] fix --- tests/integration/test_request_queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 6ada3ea8..206266d2 100644 --- a/tests/integration/test_request_queue.py +++ 
b/tests/integration/test_request_queue.py @@ -9,7 +9,6 @@ class TestRequestQueue: async def test_simple(self, make_actor: ActorFactory) -> None: async def main() -> None: - import random async with Actor: desired_request_count = 100 rq = await Actor.open_request_queue() From e7e0729df40460ee3b0523da9322e42df3bf668b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 8 Feb 2023 15:28:46 +0100 Subject: [PATCH 8/8] simplify datetime now utc --- src/apify/storages/request_queue.py | 20 ++++++++++---------- tests/integration/test_fixtures.py | 4 ++-- tests/integration/test_request_queue.py | 3 --- tests/unit/actor/test_actor_env_helpers.py | 2 +- tests/unit/storages/test_request_queue.py | 2 +- 5 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index b7526e87..d1bf05a4 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -110,7 +110,7 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, self._queue_head_dict = OrderedDict() self._query_queue_head_promise = None self._in_progress = set() - self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) + self._last_activity = datetime.now(timezone.utc) self._recently_handled = LRUCache[bool](max_length=RECENTLY_HANDLED_CACHE_SIZE) self._requests_cache = LRUCache(max_length=MAX_CACHED_REQUESTS) @@ -140,7 +140,7 @@ async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict: _budget_ow(request, { 'url': (str, True), }) - self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) + self._last_activity = datetime.now(timezone.utc) if request.get('uniqueKey') is None: request['uniqueKey'] = request['url'] # TODO: Check Request class in crawlee and replicate uniqueKey generation logic... 
@@ -215,7 +215,7 @@ async def fetch_next_request(self) -> Optional[Dict]: })}""") return None self._in_progress.add(next_request_id) - self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) + self._last_activity = datetime.now(timezone.utc) try: request = await self.get_request(next_request_id) @@ -266,12 +266,12 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: 'uniqueKey': (str, True), 'handledAt': (datetime, False), }) - self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) + self._last_activity = datetime.now(timezone.utc) if request['id'] not in self._in_progress: logging.debug(f'Cannot mark request {request["id"]} as handled, because it is not in progress!') return None - request['handledAt'] = request.get('handledAt', datetime.utcnow().replace(tzinfo=timezone.utc)) + request['handledAt'] = request.get('handledAt', datetime.now(timezone.utc)) queue_operation_info = await self._client.update_request({**request}) queue_operation_info['uniqueKey'] = request['uniqueKey'] @@ -302,7 +302,7 @@ async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optio 'id': (str, True), 'uniqueKey': (str, True), }) - self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) + self._last_activity = datetime.now(timezone.utc) if request['id'] not in self._in_progress: logging.debug(f'Cannot reclaim request {request["id"]}, because it is not in progress!') @@ -352,7 +352,7 @@ async def is_finished(self) -> bool: Returns: bool: `True` if all requests were already handled and there are no more left. `False` otherwise. 
""" - seconds_since_last_activity = (datetime.utcnow().replace(tzinfo=timezone.utc) - self._last_activity).seconds + seconds_since_last_activity = (datetime.now(timezone.utc) - self._last_activity).seconds if self._in_progress_count() > 0 and seconds_since_last_activity > self._internal_timeout_seconds: message = f'The request queue seems to be stuck for {self._internal_timeout_seconds}s, resetting internal state.' logging.warning(message) @@ -372,7 +372,7 @@ def _reset(self) -> None: self._assumed_total_count = 0 self._assumed_handled_count = 0 self._requests_cache.clear() - self._last_activity = datetime.utcnow().replace(tzinfo=timezone.utc) + self._last_activity = datetime.now(timezone.utc) def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None: self._requests_cache[cache_key] = { @@ -383,7 +383,7 @@ def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None: } async def _queue_query_head(self, limit: int) -> Dict: - query_started_at = datetime.utcnow().replace(tzinfo=timezone.utc) + query_started_at = datetime.now(timezone.utc) list_head = await self._client.list_head(limit=limit) for request in list_head['items']: @@ -461,7 +461,7 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi # If we are repeating for consistency then wait required time. 
if should_repeat_for_consistency: delay_seconds = (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) - \ - (datetime.utcnow().replace(tzinfo=timezone.utc) - queue_head['queueModifiedAt']).seconds + (datetime.now(timezone.utc) - queue_head['queueModifiedAt']).seconds logging.info(f'Waiting for {delay_seconds}s before considering the queue as finished to ensure that the data is consistent.') await asyncio.sleep(delay_seconds) diff --git a/tests/integration/test_fixtures.py b/tests/integration/test_fixtures.py index c223b603..ebb4cf75 100644 --- a/tests/integration/test_fixtures.py +++ b/tests/integration/test_fixtures.py @@ -49,7 +49,7 @@ async def main(): assert output_record['value'] == expected_output async def test_source_files(self, make_actor: ActorFactory) -> None: - test_started_at = datetime.utcnow().replace(tzinfo=timezone.utc) + test_started_at = datetime.now(timezone.utc) actor_source_files = { 'src/utils.py': """ from datetime import datetime @@ -79,7 +79,7 @@ async def main(): output_datetime = datetime.fromisoformat(output_record['value']).replace(tzinfo=timezone.utc) assert output_datetime > test_started_at - assert output_datetime < datetime.utcnow().replace(tzinfo=timezone.utc) + assert output_datetime < datetime.now(timezone.utc) class TestApifyClientAsyncFixture: diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 206266d2..3f239ce2 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -1,11 +1,8 @@ -import pytest - from apify import Actor from .conftest import ActorFactory -@pytest.mark.only class TestRequestQueue: async def test_simple(self, make_actor: ActorFactory) -> None: async def main() -> None: diff --git a/tests/unit/actor/test_actor_env_helpers.py b/tests/unit/actor/test_actor_env_helpers.py index 50139768..2df50829 100644 --- a/tests/unit/actor/test_actor_env_helpers.py +++ b/tests/unit/actor/test_actor_env_helpers.py @@ -38,7 +38,7 @@ async def 
test_get_env_use_env_vars(self, monkeypatch: pytest.MonkeyPatch) -> No for datetime_env_var in DATETIME_ENV_VARS: datetime_get_env_var = datetime_env_var.name.lower() - expected_get_env[datetime_get_env_var] = datetime.now().replace(tzinfo=timezone.utc) + expected_get_env[datetime_get_env_var] = datetime.now(timezone.utc) monkeypatch.setenv(datetime_env_var, expected_get_env[datetime_get_env_var].strftime('%Y-%m-%dT%H:%M:%S.%fZ')) for string_env_var in STRING_ENV_VARS: diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 7f9f12d5..c232846e 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -55,7 +55,7 @@ async def test_add_fetch_handle_request(request_queue: RequestQueue) -> None: next = await request_queue.fetch_next_request() assert next is not None # Mark it as handled - next['handledAt'] = datetime.utcnow().replace(tzinfo=timezone.utc) + next['handledAt'] = datetime.now(timezone.utc) queue_operation_info = await request_queue.mark_request_as_handled(next) assert queue_operation_info is not None assert queue_operation_info['uniqueKey'] == url