Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add integration tests for storages, proxy configuration #49

Merged
merged 10 commits into from
Feb 8, 2023
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
package_data={'apify': ['py.typed']},
python_requires='>=3.8',
install_requires=[
'apify-client ~= 0.7.0b39',
'apify-client ~= 0.7.0b46',
'httpx ~= 0.23.0',
'psutil ~= 5.9.4',
'pydantic ~= 1.10.2',
Expand Down
4 changes: 2 additions & 2 deletions src/apify/_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def public_encrypt(value: str, *, public_key: rsa.RSAPublicKey) -> dict:
It returns the encrypted password and encrypted value in BASE64 format.

Args:
value (str): Password used to encrypt the private key encoded as base64 string.
public_key (RSAPublicKey): Private key to use for decryption.
value (str): The value which should be encrypted.
public_key (RSAPublicKey): Public key to use for encryption.

Returns:
dict: Encrypted password and value.
Expand Down
34 changes: 18 additions & 16 deletions src/apify/storages/request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
from collections import OrderedDict
from datetime import datetime
from datetime import datetime, timezone
from typing import Coroutine, Dict, Optional
from typing import OrderedDict as OrderedDictType
from typing import Set, Union
Expand Down Expand Up @@ -110,7 +110,7 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync,
self._queue_head_dict = OrderedDict()
self._query_queue_head_promise = None
self._in_progress = set()
self._last_activity = datetime.utcnow()
self._last_activity = datetime.now(timezone.utc)
self._recently_handled = LRUCache[bool](max_length=RECENTLY_HANDLED_CACHE_SIZE)
self._requests_cache = LRUCache(max_length=MAX_CACHED_REQUESTS)

Expand Down Expand Up @@ -140,7 +140,7 @@ async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict:
_budget_ow(request, {
'url': (str, True),
})
self._last_activity = datetime.utcnow()
self._last_activity = datetime.now(timezone.utc)

if request.get('uniqueKey') is None:
request['uniqueKey'] = request['url'] # TODO: Check Request class in crawlee and replicate uniqueKey generation logic...
Expand Down Expand Up @@ -215,7 +215,7 @@ async def fetch_next_request(self) -> Optional[Dict]:
})}""")
return None
self._in_progress.add(next_request_id)
self._last_activity = datetime.utcnow()
self._last_activity = datetime.now(timezone.utc)

try:
request = await self.get_request(next_request_id)
Expand Down Expand Up @@ -266,12 +266,12 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]:
'uniqueKey': (str, True),
'handledAt': (datetime, False),
})
self._last_activity = datetime.utcnow()
self._last_activity = datetime.now(timezone.utc)
if request['id'] not in self._in_progress:
logging.debug(f'Cannot mark request {request["id"]} as handled, because it is not in progress!')
return None

request['handledAt'] = request.get('handledAt', datetime.utcnow())
request['handledAt'] = request.get('handledAt', datetime.now(timezone.utc))
queue_operation_info = await self._client.update_request({**request})
queue_operation_info['uniqueKey'] = request['uniqueKey']

Expand Down Expand Up @@ -302,7 +302,7 @@ async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optio
'id': (str, True),
'uniqueKey': (str, True),
})
self._last_activity = datetime.utcnow()
self._last_activity = datetime.now(timezone.utc)

if request['id'] not in self._in_progress:
logging.debug(f'Cannot reclaim request {request["id"]}, because it is not in progress!')
Expand Down Expand Up @@ -352,7 +352,8 @@ async def is_finished(self) -> bool:
Returns:
bool: `True` if all requests were already handled and there are no more left. `False` otherwise.
"""
if self._in_progress_count() > 0 and (datetime.utcnow() - self._last_activity).seconds > self._internal_timeout_seconds:
seconds_since_last_activity = (datetime.now(timezone.utc) - self._last_activity).seconds
if self._in_progress_count() > 0 and seconds_since_last_activity > self._internal_timeout_seconds:
message = f'The request queue seems to be stuck for {self._internal_timeout_seconds}s, resetting internal state.'
logging.warning(message)
self._reset()
Expand All @@ -371,7 +372,7 @@ def _reset(self) -> None:
self._assumed_total_count = 0
self._assumed_handled_count = 0
self._requests_cache.clear()
self._last_activity = datetime.utcnow()
self._last_activity = datetime.now(timezone.utc)

def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None:
self._requests_cache[cache_key] = {
Expand All @@ -382,7 +383,7 @@ def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None:
}

async def _queue_query_head(self, limit: int) -> Dict:
query_started_at = datetime.utcnow()
query_started_at = datetime.now(timezone.utc)

list_head = await self._client.list_head(limit=limit)
for request in list_head['items']:
Expand All @@ -391,10 +392,10 @@ async def _queue_query_head(self, limit: int) -> Dict:
continue
self._queue_head_dict[request['id']] = request['id']
self._cache_request(_unique_key_to_request_id(request['uniqueKey']), {
'request_id': request['id'],
'was_already_handled': False,
'was_already_present': True,
'unique_key': request['uniqueKey'],
'requestId': request['id'],
'wasAlreadyHandled': False,
'wasAlreadyPresent': True,
'uniqueKey': request['uniqueKey'],
})

# This is needed so that the next call to _ensureHeadIsNonEmpty() will fetch the queue head again.
Expand Down Expand Up @@ -440,7 +441,7 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi
# If ensureConsistency=true then we must ensure that either:
# - queueModifiedAt is older than queryStartedAt by at least API_PROCESSED_REQUESTS_DELAY_MILLIS
# - hadMultipleClients=false and this.assumedTotalCount<=this.assumedHandledCount
is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt']
is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt'].replace(tzinfo=timezone.utc)
).seconds >= (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000)
is_locally_consistent = not queue_head['hadMultipleClients'] and self._assumed_total_count <= self._assumed_handled_count
# Consistent information from one source is enough to consider request queue finished.
Expand All @@ -459,7 +460,8 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi

# If we are repeating for consistency then wait required time.
if should_repeat_for_consistency:
delay_seconds = (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) - (datetime.utcnow() - queue_head['queueModifiedAt']).seconds
delay_seconds = (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) - \
(datetime.now(timezone.utc) - queue_head['queueModifiedAt']).seconds
logging.info(f'Waiting for {delay_seconds}s before considering the queue as finished to ensure that the data is consistent.')
await asyncio.sleep(delay_seconds)

Expand Down
58 changes: 58 additions & 0 deletions tests/integration/test_actor_create_proxy_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from apify import Actor

from .conftest import ActorFactory


class TestActorCreateProxyConfiguration:
    """Integration tests for `Actor.create_proxy_configuration()` running on the Apify platform."""

    async def test_create_proxy_configuration_basic(self, make_actor: ActorFactory) -> None:
        """Verify that a proxy configuration built from direct arguments carries them through."""
        async def main() -> None:
            groups = ['SHADER']
            country_code = 'US'

            async with Actor:
                proxy_configuration = await Actor.create_proxy_configuration(
                    groups=groups,
                    country_code=country_code,
                )

                assert proxy_configuration is not None
                # NOTE(review): these reach into private attributes of ProxyConfiguration —
                # brittle if its internals change, but there is no public accessor yet.
                assert proxy_configuration._groups == groups
                assert proxy_configuration._password is not None
                assert proxy_configuration._country_code == country_code

        actor = await make_actor('proxy-configuration', main_func=main)

        run_result = await actor.call()
        assert run_result is not None
        assert run_result['status'] == 'SUCCEEDED'

    async def test_create_proxy_configuration_complex(self, make_actor: ActorFactory) -> None:
        """Verify that a proxy configuration built from actor proxy input produces correct proxy URLs."""
        async def main() -> None:
            await Actor.init()

            # Expected '<password>@<hostname>:<port>' tail of every proxy URL for this run.
            proxy_url_suffix = f'{Actor.config.proxy_password}@{Actor.config.proxy_hostname}:{Actor.config.proxy_port}'

            proxy_configuration = await Actor.create_proxy_configuration(actor_proxy_input={
                'useApifyProxy': True,
            })
            assert proxy_configuration is not None
            assert await proxy_configuration.new_url() == f'http://auto:{proxy_url_suffix}'

            groups = ['SHADER', 'BUYPROXIES94952']
            country_code = 'US'
            proxy_configuration = await Actor.create_proxy_configuration(actor_proxy_input={
                'useApifyProxy': True,
                'apifyProxyGroups': groups,
                'apifyProxyCountry': country_code,
            })
            assert proxy_configuration is not None
            assert await proxy_configuration.new_url() == f'http://groups-{"+".join(groups)},country-{country_code}:{proxy_url_suffix}'

            await Actor.exit()

        actor = await make_actor('proxy-configuration', main_func=main)

        run_result = await actor.call()
        assert run_result is not None
        assert run_result['status'] == 'SUCCEEDED'
59 changes: 59 additions & 0 deletions tests/integration/test_actor_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from apify import Actor

from ._utils import generate_unique_resource_name
from .conftest import ActorFactory


class TestActorPushData:
    """Integration test for `Actor.push_data()` running on the Apify platform."""

    async def test_push_data(self, make_actor: ActorFactory) -> None:
        desired_item_count = 100  # Also change inside main() if you're changing this

        async def main() -> None:
            desired_item_count = 100
            async with Actor:
                await Actor.push_data([{'id': i} for i in range(desired_item_count)])

        pushed_actor = await make_actor('push-data', main_func=main)

        call_result = await pushed_actor.call()

        assert call_result is not None
        assert call_result['status'] == 'SUCCEEDED'

        # Inspect the run's default dataset from the outside and check every item landed.
        dataset_page = await pushed_actor.last_run().dataset().list_items()
        assert dataset_page.items[0]['id'] == 0
        assert dataset_page.items[-1]['id'] == desired_item_count - 1
        assert len(dataset_page.items) == dataset_page.count == desired_item_count


class TestActorOpenDataset:
    """Integration tests for `Actor.open_dataset()` running on the Apify platform."""

    async def test_same_references_default(self, make_actor: ActorFactory) -> None:
        """Opening the default dataset twice must return the same cached instance."""
        async def main() -> None:
            async with Actor:
                dataset1 = await Actor.open_dataset()
                dataset2 = await Actor.open_dataset()
                assert dataset1 is dataset2

        actor = await make_actor('dataset-same-ref-default', main_func=main)

        run_result = await actor.call()
        assert run_result is not None
        assert run_result['status'] == 'SUCCEEDED'

    async def test_same_references_named(self, make_actor: ActorFactory) -> None:
        """Opening a named dataset twice must return the same cached instance."""
        dataset_name = generate_unique_resource_name('dataset')

        async def main() -> None:
            async with Actor:
                input_object = await Actor.get_input()
                dataset_name = input_object['datasetName']
                dataset1 = await Actor.open_dataset(dataset_name)
                dataset2 = await Actor.open_dataset(dataset_name)
                assert dataset1 is dataset2
                # Clean up the named dataset so repeated test runs don't accumulate resources.
                await dataset1.drop()

        actor = await make_actor('dataset-same-ref-named', main_func=main)

        run_result = await actor.call(run_input={'datasetName': dataset_name})
        assert run_result is not None
        assert run_result['status'] == 'SUCCEEDED'
136 changes: 136 additions & 0 deletions tests/integration/test_actor_key_value_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from apify import Actor

from ._utils import generate_unique_resource_name
from .conftest import ActorFactory


class TestActorOpenKeyValueStore:
    """Integration tests for `Actor.open_key_value_store()` running on the Apify platform."""

    async def test_same_references_default(self, make_actor: ActorFactory) -> None:
        """Opening the default key-value store twice must return the same cached instance."""
        async def main() -> None:
            async with Actor:
                kvs1 = await Actor.open_key_value_store()
                kvs2 = await Actor.open_key_value_store()
                assert kvs1 is kvs2

        actor = await make_actor('kvs-same-ref-default', main_func=main)

        run_result = await actor.call()
        assert run_result is not None
        assert run_result['status'] == 'SUCCEEDED'

    async def test_same_references_named(self, make_actor: ActorFactory) -> None:
        """Opening a named key-value store twice must return the same cached instance."""
        kvs_name = generate_unique_resource_name('key-value-store')

        async def main() -> None:
            async with Actor:
                input_object = await Actor.get_input()
                kvs_name = input_object['kvsName']
                kvs1 = await Actor.open_key_value_store(kvs_name)
                kvs2 = await Actor.open_key_value_store(kvs_name)
                assert kvs1 is kvs2
                # Clean up the named store so repeated test runs don't accumulate resources.
                await kvs1.drop()

        actor = await make_actor('kvs-same-ref-named', main_func=main)

        run_result = await actor.call(run_input={'kvsName': kvs_name})
        assert run_result is not None
        assert run_result['status'] == 'SUCCEEDED'


class TestActorGetSetValue:
    """Integration tests for `Actor.set_value()` / `Actor.get_value()` on the Apify platform."""

    async def test_actor_get_set_value_simple(self, make_actor: ActorFactory) -> None:
        async def main() -> None:
            async with Actor:
                await Actor.set_value('test', {'number': 123, 'string': 'a string', 'nested': {'test': 1}})
                value = await Actor.get_value('test')
                assert value['number'] == 123
                assert value['string'] == 'a string'
                assert value['nested']['test'] == 1

        built_actor = await make_actor('actor-get-set-value', main_func=main)

        call_result = await built_actor.call()
        assert call_result is not None
        assert call_result['status'] == 'SUCCEEDED'

    async def test_actor_get_set_value_complex(self, make_actor: ActorFactory) -> None:
        # Phase 1: one actor writes a record into its default key-value store.
        async def main_set() -> None:
            async with Actor:
                await Actor.set_value('test', {'number': 123, 'string': 'a string', 'nested': {'test': 1}})

        setter_actor = await make_actor('actor-set-value', main_func=main_set)

        set_call_result = await setter_actor.call()
        assert set_call_result is not None
        assert set_call_result['status'] == 'SUCCEEDED'

        # Externally check if the value is present in key-value store
        stored_record = await setter_actor.last_run().key_value_store().get_record('test')
        assert stored_record is not None
        stored_value = stored_record['value']
        assert stored_value['number'] == 123
        assert stored_value['string'] == 'a string'
        assert stored_value['nested']['test'] == 1

        # Phase 2: a second actor reads the record back from the first actor's store.
        async def main_get() -> None:
            async with Actor:
                input_object = await Actor.get_input()
                # Access KVS of the previous 'set' run
                kvs = await Actor.open_key_value_store(input_object['kvs-id'])
                value = await kvs.get_value('test')
                assert value['number'] == 123
                assert value['string'] == 'a string'
                assert value['nested']['test'] == 1

        getter_actor = await make_actor('actor-get-value', main_func=main_get)

        setter_kvs_info = await setter_actor.last_run().key_value_store().get()
        assert setter_kvs_info is not None

        get_call_result = await getter_actor.call(run_input={'kvs-id': setter_kvs_info['id']})
        assert get_call_result is not None
        assert get_call_result['status'] == 'SUCCEEDED'


class TestActorGetInput:
    """Integration test for `Actor.get_input()`, including decryption of secret input fields."""

    async def test_actor_get_input(self, make_actor: ActorFactory) -> None:
        # Built from explicit source files (rather than main_func) because the secret-field
        # behavior requires an INPUT_SCHEMA.json declaring `isSecret: true`.
        actor_source_files = {
            'INPUT_SCHEMA.json': """
                {
                    "title": "Actor get input test",
                    "type": "object",
                    "schemaVersion": 1,
                    "properties": {
                        "password": {
                            "title": "Password",
                            "type": "string",
                            "description": "A secret, encrypted input field",
                            "editor": "textfield",
                            "isSecret": true
                        }
                    },
                    "required": ["password"]
                }
            """,
            'src/main.py': """
                import asyncio
                from apify import Actor

                async def main():
                    async with Actor:
                        input_object = await Actor.get_input()
                        assert input_object is not None
                        assert input_object['number'] == 123
                        assert input_object['string'] == 'a string'
                        assert input_object['nested']['test'] == 1
                        # The platform must have transparently decrypted the secret field.
                        assert input_object['password'] == 'very secret'
            """,
        }
        actor = await make_actor('actor-get-input', source_files=actor_source_files)

        run_result = await actor.call(run_input={
            'number': 123,
            'string': 'a string',
            'nested': {'test': 1},
            'password': 'very secret',
        })
        assert run_result is not None
        assert run_result['status'] == 'SUCCEEDED'
Loading