# Load GCS Data via `SyncedGcsBlob` in Merino
- Status: Accepted
- Deciders: Bastian Gruber, Herraj Luhano, Mathijs Miermans, Nan Jiang
- Date: 2025-07-24
## Context and Problem Statement
The following Merino providers load data from Google Cloud Storage (GCS):
- TopPicks - reloads suggestion data every 12 hours
- Manifest - reloads domain metadata (icons, titles, categories) every hour
- Curated Recommendations - uses `SyncedGcsBlob` to check every minute and load data only when the blob is updated
TopPicks and Manifest reload on a fixed schedule without checking whether the data has changed. Because every reload downloads the full blob, the reload interval must stay long, leading to stale data, unnecessary resource use, and duplicated maintenance effort across the two implementations.
The existing `SyncedGcsBlob` class, used by the curated recommendations endpoint, periodically checks the `updated` timestamp of GCS blobs and reloads data only when blobs are updated, calling a callback function to parse and cache the data. However, `SyncedGcsBlob` currently uses Google's synchronous GCS client (`google-cloud-storage`), offloading synchronous calls to a thread pool to avoid blocking the main event loop (as previously addressed in ADR 0005).
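For context, the status-quo thread-pool pattern looks roughly like the sketch below, assuming the synchronous `google-cloud-storage` client; the helper names are illustrative, not `SyncedGcsBlob`'s actual internals:

```python
import asyncio

from google.cloud import storage


def _fetch_blob_sync(client: storage.Client, bucket_name: str, blob_name: str) -> str:
    # Blocking network I/O: must not run directly on the event loop.
    return client.bucket(bucket_name).blob(blob_name).download_as_text()


async def fetch_blob(client: storage.Client, bucket_name: str, blob_name: str) -> str:
    # Offload the blocking call to a worker thread (cf. ADR 0005).
    return await asyncio.to_thread(_fetch_blob_sync, client, bucket_name, blob_name)
```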
Should we standardize on `SyncedGcsBlob` for all providers, and further, should we enhance it to use the asynchronous `gcloud-aio-storage` client to simplify our implementation?
## Decision Drivers
- Minimize latency from GCS updates to Merino data availability.
- Minimize the performance overhead of frequent checks by fetching only blob metadata, not the full blob.
- Simplify maintenance with a consistent solution.
- Continue to avoid blocking Merino's async event loop (ADR 0005).
## Considered Options
| Option | Approach | Summary | Pros | Cons |
|---|---|---|---|---|
| A | `SyncedGcsBlob` + `gcloud-aio-storage` (preferred) | Adopt async GCS client within `SyncedGcsBlob` | Low latency, minimal memory usage, non-blocking I/O, consistent implementation | Requires refactor |
| B | `SyncedGcsBlob` with current sync GCS client | Use existing `SyncedGcsBlob` (status quo) | Low latency, existing solution | Memory overhead from threads |
| C | Provider-specific loaders | Keep per-provider loading logic | No immediate refactor, flexible per provider | Duplicate logic, higher maintenance |
| D | GCS → Pub/Sub push | Event-driven notifications | Instant updates | Increased complexity, more infrastructure |
## Recommended Option

- Option A: Adopt `SyncedGcsBlob` enhanced with `gcloud-aio-storage`.
This approach efficiently reduces latency, simplifies asynchronous handling by removing the thread-pool complexity in the current implementation of `SyncedGcsBlob`, and maintains the decision outcome of ADR 0005. By integrating `gcloud-aio-storage` into `SyncedGcsBlob`, it will natively use asynchronous I/O and stop offloading synchronous calls to threads.
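A minimal sketch of what the refactored check-and-fetch cycle could look like, assuming `gcloud-aio-storage`'s `Storage` client and its `download_metadata`/`download` calls; the class structure here is illustrative, not Merino's actual implementation:

```python
from typing import Callable

from gcloud.aio.storage import Storage


class SyncedGcsBlobSketch:
    """Illustrative core of the async check-and-fetch cycle."""

    def __init__(self, storage: Storage, bucket_name: str, blob_name: str) -> None:
        self._storage = storage
        self._bucket_name = bucket_name
        self._blob_name = blob_name
        self._last_updated: str | None = None
        self._fetch_callback: Callable[[str], None] | None = None

    def set_fetch_callback(self, callback: Callable[[str], None]) -> None:
        self._fetch_callback = callback

    async def maybe_fetch(self) -> None:
        # Cheap call: retrieves only the object's metadata, not its contents.
        metadata = await self._storage.download_metadata(self._bucket_name, self._blob_name)
        updated = metadata.get("updated")  # RFC 3339 timestamp on the GCS object resource
        if updated == self._last_updated:
            return  # Blob unchanged since the last check; skip the full download.
        data = await self._storage.download(self._bucket_name, self._blob_name)
        self._last_updated = updated
        if self._fetch_callback is not None:
            self._fetch_callback(data.decode("utf-8"))
```

The cron job would simply `await maybe_fetch()` every `cron_interval_seconds`, entirely on the event loop, with no worker threads involved.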
### Positive Consequences
- Minimal latency: Rapid propagation of GCS updates.
- Low memory overhead: Leveraging async avoids creating a separate thread for each job.
- Low network overhead: Frequent metadata checks instead of full downloads.
- Maintenance ease: Standard implementation across providers.
### Negative Consequences
- Initial refactor: Existing providers require updating.
## Implementation Impact
Adopting this approach will require the following changes:
- `SyncedGcsBlob` - Refactor to use the async `gcloud-aio-storage` client instead of the synchronous client
- TopPicks - Replace periodic reload with `SyncedGcsBlob` to check for updates
- Manifest - Replace periodic reload with `SyncedGcsBlob` to check for updates
Curated Recommendations already uses `SyncedGcsBlob` and will benefit from the async refactor without requiring significant changes.
## Usage Example
Here's how providers will initialize and use the enhanced asynchronous `SyncedGcsBlob`:
```python
async def initialize_provider_backend() -> ProviderBackend:
    try:
        storage_client = AsyncStorageClient(...)  # from gcloud-aio-storage
        synced_blob = SyncedGcsBlob(
            storage_client=storage_client,
            metrics_client=get_metrics_client(),
            metrics_namespace="provider.data",
            bucket_name=settings.provider.gcs.bucket_name,
            blob_name=settings.provider.gcs.data.blob_name,
            max_size=settings.provider.gcs.data.max_size,
            cron_interval_seconds=settings.provider.gcs.data.cron_interval_seconds,  # How often the 'updated' timestamp is checked
            cron_job_name="fetch_provider_data",
        )
        synced_blob.set_fetch_callback(parse_and_cache_provider_data)
        await synced_blob.initialize()  # starts async background task
        return GcsProviderBackend(synced_gcs_blob=synced_blob)
    except Exception as e:
        logger.error(f"Failed to initialize provider backend: {e}")
        return DefaultProviderBackend()


def parse_and_cache_provider_data(data: str) -> None:
    provider_data = json.loads(data)
    cache_provider_data(provider_data)
```
The callback (`parse_and_cache_provider_data`) is called automatically by `SyncedGcsBlob` whenever the blob is updated. The callback implementation will vary; typically it decodes JSON, sometimes converts it into Pydantic models, and caches the result in memory.
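For illustration, here is one hypothetical shape such a callback could take, using Pydantic validation; the `ProviderRecord` model and `_cached_records` variable are invented for this sketch and are not Merino's actual types:

```python
import json

from pydantic import BaseModel


class ProviderRecord(BaseModel):
    """Hypothetical record shape; real Merino models differ per provider."""

    domain: str
    title: str
    icon: str


_cached_records: list[ProviderRecord] = []


def parse_and_cache_provider_data(data: str) -> None:
    global _cached_records
    # Validate each item, then rebind the module-level name so the cache is swapped in one step.
    _cached_records = [ProviderRecord(**item) for item in json.loads(data)]
```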