Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/apify/_configuration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import dataclasses
import json
from datetime import datetime, timedelta
from decimal import Decimal
Expand Down Expand Up @@ -34,6 +35,29 @@ def _transform_to_list(value: Any) -> list[str] | None:
return value if isinstance(value, list) else str(value).split(',')


@dataclasses.dataclass
class ActorStorages:
    """Storage IDs for different storage types used by an Actor.

    Each attribute maps a storage alias (e.g. 'default' or 'custom') to the
    platform-assigned storage ID for that storage type.
    """

    # Mapping of key-value store alias -> key-value store ID.
    key_value_stores: dict[str, str]
    # Mapping of dataset alias -> dataset ID.
    datasets: dict[str, str]
    # Mapping of request queue alias -> request queue ID.
    request_queues: dict[str, str]


def _load_storage_keys(data: None | str | dict | ActorStorages) -> ActorStorages | None:
"""Load storage keys from environment."""
if data is None:
return None
if isinstance(data, ActorStorages):
return data
storage_mapping = data if isinstance(data, dict) else json.loads(data)
return ActorStorages(
key_value_stores=storage_mapping.get('keyValueStores', {}),
datasets=storage_mapping.get('datasets', {}),
request_queues=storage_mapping.get('requestQueues', {}),
)


@docs_group('Configuration')
class Configuration(CrawleeConfiguration):
"""A class for specifying the configuration of an Actor.
Expand Down Expand Up @@ -446,6 +470,15 @@ class Configuration(CrawleeConfiguration):
BeforeValidator(lambda data: json.loads(data) if isinstance(data, str) else data or None),
] = None

actor_storages: Annotated[
ActorStorages | None,
Field(
alias='actor_storages_json',
description='Storage IDs for the actor',
),
BeforeValidator(_load_storage_keys),
] = None
Comment on lines +473 to +480
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change closes #762, but the issue description specifies the platform-provided env var name as ACTOR_STORAGE_IDS (object with datasets, keyValueStores, requestQueues). The new field only declares alias='actor_storages_json' (env ACTOR_STORAGES_JSON). If the platform actually uses ACTOR_STORAGE_IDS, configuration loading will silently miss the mapping. Consider supporting ACTOR_STORAGE_IDS via validation_alias=AliasChoices(...) (keeping backward compatibility if ACTOR_STORAGES_JSON is intentional).

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation on the platform is ACTOR_STORAGES_JSON now; if it changes, the code should be updated to reflect that as well.


@model_validator(mode='after')
def disable_browser_sandbox_on_platform(self) -> Self:
"""Disable the browser sandbox mode when running on the Apify platform.
Expand Down
44 changes: 20 additions & 24 deletions src/apify/storage_clients/_apify/_alias_resolving.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from logging import getLogger
from typing import TYPE_CHECKING, ClassVar, Literal, overload

from propcache import cached_property

from apify_client import ApifyClientAsync

from ._utils import hash_api_base_url_and_token
Expand Down Expand Up @@ -139,7 +141,6 @@ def __init__(
self._storage_type = storage_type
self._alias = alias
self._configuration = configuration
self._additional_cache_key = hash_api_base_url_and_token(configuration)

async def __aenter__(self) -> AliasResolver:
"""Context manager to prevent race condition in alias creation."""
Expand Down Expand Up @@ -183,15 +184,7 @@ async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]:
default_kvs_client = await cls._get_default_kvs_client(configuration)

record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY)

# get_record can return {key: ..., value: ..., content_type: ...}
if isinstance(record, dict):
if 'value' in record and isinstance(record['value'], dict):
cls._alias_map = record['value']
else:
cls._alias_map = record
else:
cls._alias_map = dict[str, str]()
cls._alias_map = record.get('value', {}) if record else {}

return cls._alias_map

Expand All @@ -201,6 +194,18 @@ async def resolve_id(self) -> str | None:
Returns:
Storage id if it exists, None otherwise.
"""
# First try to find the alias in the configuration mapping to avoid any API calls.
# This mapping is maintained by the Apify platform and does not have to be maintained in the default KVS.
if self._configuration.actor_storages and self._alias != 'default':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: I don't understand why we're treating 'default' differently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way to access the default storage is without an alias. Loading up default aliases from the configuration mapping would allow an alternative way to access the default storage through "default" alias. In Python that is generally frowned upon:
There should be one-- and preferably only one --obvious way to do it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sometimes I wish Javascript was like this :D

storage_maps = {
'Dataset': self._configuration.actor_storages.datasets,
'KeyValueStore': self._configuration.actor_storages.key_value_stores,
'RequestQueue': self._configuration.actor_storages.request_queues,
}
if storage_id := storage_maps.get(self._storage_type, {}).get(self._alias):
return storage_id

# Fallback to the mapping saved in the default KVS
return (await self._get_alias_map(self._configuration)).get(self._storage_key, None)

async def store_mapping(self, storage_id: str) -> None:
Expand All @@ -220,30 +225,21 @@ async def store_mapping(self, storage_id: str) -> None:

try:
record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY)

# get_record can return {key: ..., value: ..., content_type: ...}
if isinstance(record, dict) and 'value' in record:
record = record['value']

# Update or create the record with the new alias mapping
if isinstance(record, dict):
record[self._storage_key] = storage_id
else:
record = {self._storage_key: storage_id}
value = record.get('value', {}) if record else {}
value[self._storage_key] = storage_id

# Store the mapping back in the KVS.
await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record)
await default_kvs_client.set_record(key=self._ALIAS_MAPPING_KEY, value=value)
except Exception as exc:
logger.warning(f'Error storing alias mapping for {self._alias}: {exc}')

@property
@cached_property
def _storage_key(self) -> str:
"""Get a unique storage key used for storing the alias in the mapping."""
return self._ALIAS_STORAGE_KEY_SEPARATOR.join(
[
self._storage_type,
self._alias,
self._additional_cache_key,
hash_api_base_url_and_token(self._configuration),
]
)

Expand Down
Empty file.
24 changes: 24 additions & 0 deletions tests/e2e/test_schema_storages/actor_source/actor.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"actorSpecification": 1,
"version": "0.0",
"storages": {
"datasets": {
"default": {
"actorSpecification": 1,
"fields": {
"properties": {
"id": { "type": "string" }
}
}
},
"custom": {
"actorSpecification": 1,
"fields": {
"properties": {
"id": { "type": "string" }
}
}
}
}
}
}
7 changes: 7 additions & 0 deletions tests/e2e/test_schema_storages/actor_source/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from apify import Actor


async def main() -> None:
    """E2E Actor entrypoint: check that configuration storage IDs match opened storages."""
    async with Actor:
        storages = Actor.configuration.actor_storages
        # The platform must have populated the storage mapping in the configuration.
        assert storages
        custom_dataset = await Actor.open_dataset(alias='custom')
        # Opening by alias must resolve to the ID registered in the configuration mapping.
        assert custom_dataset.id == storages.datasets['custom']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Would it also make sense to check that default is default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should probably be tested on the platform level. If the env vars are inconsistent. (default id is different in mapping env and in dedicated env var, then our code can't do much about that)

26 changes: 26 additions & 0 deletions tests/e2e/test_schema_storages/test_schema_storages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from ..conftest import MakeActorFunction, RunActorFunction

_ACTOR_SOURCE_DIR = Path(__file__).parent / 'actor_source'


def read_actor_source(filename: str) -> str:
    """Return the text contents of *filename* from the actor source directory."""
    source_path = _ACTOR_SOURCE_DIR / filename
    return source_path.read_text()


async def test_configuration_storages(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None:
    """Run an Actor whose own source asserts that configuration storage IDs resolve correctly."""
    # The Actor's in-run assertions (in main.py) do the real checking;
    # this test only verifies the run finishes successfully.
    source_files = {
        'src/main.py': read_actor_source('main.py'),
        '.actor/actor.json': read_actor_source('actor.json'),
    }
    actor = await make_actor(label='schema_storages', source_files=source_files)

    run_result = await run_actor(actor)

    assert run_result.status == 'SUCCEEDED'
23 changes: 23 additions & 0 deletions tests/unit/actor/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,26 @@ def test_actor_pricing_info_from_json_env_var(monkeypatch: pytest.MonkeyPatch) -
config = ApifyConfiguration()
assert config.actor_pricing_info is not None
assert config.actor_pricing_info.pricing_model == 'PAY_PER_EVENT'


def test_actor_storage_json_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
    """Test that `actor_storages` is parsed from the `ACTOR_STORAGES_JSON` env var.

    Uses distinct ID values per storage type so that a cross-wired mapping
    (e.g. the `datasets` payload being parsed into `request_queues`) would
    be caught — the original test reused '*_dataset_id' values for all three
    storage types, which made the three equality checks interchangeable.
    """
    import json

    datasets = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}
    request_queues = {'default': 'default_request_queue_id', 'custom': 'custom_request_queue_id'}
    key_value_stores = {'default': 'default_key_value_store_id', 'custom': 'custom_key_value_store_id'}

    actor_storages_json = json.dumps(
        {
            'datasets': datasets,
            'requestQueues': request_queues,
            'keyValueStores': key_value_stores,
        }
    )
    monkeypatch.setenv('ACTOR_STORAGES_JSON', actor_storages_json)
    config = ApifyConfiguration()
    assert config.actor_storages
    # Each storage type must round-trip to its own mapping, not a sibling's.
    assert config.actor_storages.datasets == datasets
    assert config.actor_storages.request_queues == request_queues
    assert config.actor_storages.key_value_stores == key_value_stores
25 changes: 24 additions & 1 deletion tests/unit/storage_clients/test_alias_resolver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from apify._configuration import Configuration
from apify._configuration import ActorStorages, Configuration
from apify.storage_clients._apify._alias_resolving import AliasResolver


Expand Down Expand Up @@ -76,3 +76,26 @@ async def test_get_alias_map_returns_in_memory_map() -> None:
AliasResolver._alias_map = {}
result = await AliasResolver._get_alias_map(config)
assert result == {}


async def test_configuration_storages_alias_resolving() -> None:
    """Verify that `AliasResolver` resolves ids of storages registered in the Configuration mapping."""
    # Each custom id embeds its storage type so the loop below can derive the expected value.
    configuration = Configuration(
        actor_storages=ActorStorages(
            datasets={'default': 'default_dataset_id', 'custom': 'custom_Dataset_id'},
            request_queues={'default': 'default_request_queue_id', 'custom': 'custom_RequestQueue_id'},
            key_value_stores={'default': 'default_key_value_store_id', 'custom': 'custom_KeyValueStore_id'},
        ),
    )

    # Every non-default alias must resolve straight from the configuration mapping.
    for storage_type in ('Dataset', 'KeyValueStore', 'RequestQueue'):
        resolver = AliasResolver(storage_type=storage_type, alias='custom', configuration=configuration)
        assert await resolver.resolve_id() == f'custom_{storage_type}_id'