Load GCS Data via SyncedGcsBlob in Merino

  • Status: Accepted
  • Deciders: Bastian Gruber, Herraj Luhano, Mathijs Miermans, Nan Jiang
  • Date: 2025-07-24

Context and Problem Statement

The following Merino providers load data from Google Cloud Storage (GCS):

  • TopPicks - reloads suggestion data every 12 hours
  • Manifest - reloads domain metadata (icons, titles, categories) every hour
  • Curated Recommendations - uses SyncedGcsBlob to check every minute and load data only when the blob is updated

TopPicks and Manifest are reloaded periodically without checking whether the data has changed. Because each reload downloads the full blob regardless of changes, the reload interval must stay long, leading to stale data, unnecessary resource use, and duplicated maintenance effort.

The existing SyncedGcsBlob class, used by the curated recommendations endpoint, periodically checks the updated timestamp of GCS blobs and reloads data only when blobs are updated, calling a callback function to parse and cache the data. However, SyncedGcsBlob currently uses Google's synchronous GCS client (google-cloud-storage), offloading synchronous calls to a thread pool to avoid blocking the main event loop (as previously addressed in ADR 0005).
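
For reference, the status-quo pattern looks roughly like this (a minimal sketch with illustrative names; asyncio.to_thread stands in for Merino's thread-pool offloading, and the actual implementation differs):

import asyncio

from google.cloud import storage  # synchronous GCS client

def _get_blob_updated(bucket_name: str, blob_name: str):
    """Blocking call: read the blob's 'updated' timestamp with the sync client."""
    client = storage.Client()
    blob = client.bucket(bucket_name).get_blob(blob_name)
    return blob.updated if blob else None

async def check_blob_updated(bucket_name: str, blob_name: str):
    # Offload the blocking call to a worker thread so the event loop stays free.
    return await asyncio.to_thread(_get_blob_updated, bucket_name, blob_name)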

Should we standardize on SyncedGcsBlob for all providers, and further, should we enhance it to use the asynchronous gcloud-aio-storage client to simplify our implementation?

Decision Drivers

  1. Minimize latency from GCS updates to Merino data availability.
  2. Minimize performance overhead by frequently checking blob metadata instead of downloading the full blob (see the sketch after this list).
  3. Simplify maintenance with a consistent solution.
  4. Continue to avoid blocking Merino's async event loop (ADR 0005).
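
To illustrate driver 2: a metadata request returns only the blob's attributes, including its updated timestamp, and is far cheaper than downloading the blob body. A minimal sketch, assuming gcloud-aio-storage's Storage client (blob_changed and last_updated are illustrative names):

from gcloud.aio.storage import Storage

async def blob_changed(storage: Storage, bucket: str, blob: str,
                       last_updated: str | None) -> bool:
    """Return True if the blob's 'updated' timestamp differs from the last seen value."""
    metadata = await storage.download_metadata(bucket, blob)  # small JSON response, no blob body
    return metadata.get("updated") != last_updated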

Considered Options

| Option | Summary | Pros | Cons |
|--------|---------|------|------|
| A: SyncedGcsBlob + gcloud-aio-storage (preferred) | Adopt async GCS client within SyncedGcsBlob | Low latency, minimal memory usage, non-blocking I/O, consistent implementation | Requires refactor |
| B: SyncedGcsBlob with current sync GCS client | Use existing SyncedGcsBlob (status quo) | Low latency, existing solution | Memory overhead from threads |
| C: Provider-specific loaders | Keep per-provider loading logic | No immediate refactor, flexible per provider | Duplicate logic, higher maintenance |
| D: GCS → Pub/Sub push | Event-driven notifications | Instant updates | Increased complexity, more infrastructure |

Decision Outcome

Chosen option: Option A, adopting SyncedGcsBlob enhanced with gcloud-aio-storage.

This approach reduces latency, simplifies asynchronous handling by removing the thread-pool machinery from the current implementation of SyncedGcsBlob, and upholds the decision of ADR 0005. With gcloud-aio-storage integrated, SyncedGcsBlob will use asynchronous I/O natively and stop offloading synchronous calls to threads.
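
A minimal sketch of what the refactored fetch loop inside SyncedGcsBlob could look like (download_metadata and download are gcloud-aio-storage methods; the loop structure and names are illustrative, not Merino's actual code):

import asyncio

from gcloud.aio.storage import Storage

async def _sync_loop(storage: Storage, bucket: str, blob: str,
                     interval_seconds: float, on_update) -> None:
    last_updated: str | None = None
    while True:
        # Cheap metadata check: only the blob's attributes are fetched.
        metadata = await storage.download_metadata(bucket, blob)
        if metadata.get("updated") != last_updated:
            # Download the full blob only when the timestamp changed,
            # then hand the contents to the provider's callback.
            data = await storage.download(bucket, blob)
            on_update(data.decode("utf-8"))
            last_updated = metadata.get("updated")
        await asyncio.sleep(interval_seconds)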

Positive Consequences

  • Minimal latency: Rapid propagation of GCS updates.
  • Low memory overhead: Leveraging async avoids creating a separate thread for each job.
  • Low network overhead: Frequent metadata checks instead of full downloads.
  • Maintenance ease: Standard implementation across providers.

Negative Consequences

  • Initial refactor: Existing providers require updating.

Implementation Impact

Adopting this approach will require the following changes:

  • SyncedGcsBlob - Refactor to use the async gcloud-aio-storage client instead of the synchronous client
  • TopPicks - Replace periodic reload with SyncedGcsBlob to check for updates
  • Manifest - Replace periodic reload with SyncedGcsBlob to check for updates

Curated Recommendations already uses SyncedGcsBlob and will benefit from the async refactor without requiring significant changes.

Usage Example

Here's how providers will initialize and use the enhanced asynchronous SyncedGcsBlob:

import json
import logging

from gcloud.aio.storage import Storage  # async GCS client

logger = logging.getLogger(__name__)

async def initialize_provider_backend() -> ProviderBackend:
    try:
        storage_client = Storage(...)  # gcloud-aio-storage's async client
        synced_blob = SyncedGcsBlob(
            storage_client=storage_client,
            metrics_client=get_metrics_client(),
            metrics_namespace="provider.data",
            bucket_name=settings.provider.gcs.bucket_name,
            blob_name=settings.provider.gcs.data.blob_name,
            max_size=settings.provider.gcs.data.max_size,
            cron_interval_seconds=settings.provider.gcs.data.cron_interval_seconds,  # how often the 'updated' timestamp is checked
            cron_job_name="fetch_provider_data",
        )
        # Called with the blob contents whenever the 'updated' timestamp changes.
        synced_blob.set_fetch_callback(parse_and_cache_provider_data)
        await synced_blob.initialize()  # starts the async background task
        return GcsProviderBackend(synced_gcs_blob=synced_blob)
    except Exception as e:
        logger.error(f"Failed to initialize provider backend: {e}")
        return DefaultProviderBackend()

def parse_and_cache_provider_data(data: str) -> None:
    provider_data = json.loads(data)
    cache_provider_data(provider_data)

The callback (parse_and_cache_provider_data) is invoked automatically by SyncedGcsBlob whenever the blob is updated. Callback implementations vary; typically they decode JSON, sometimes convert it into Pydantic models, and cache the result in memory.
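
For instance, a callback that validates the payload with Pydantic might look like the following (ProviderRecord and the module-level cache are hypothetical, shown only to illustrate the pattern):

import json

from pydantic import BaseModel

class ProviderRecord(BaseModel):  # hypothetical schema for illustration
    id: str
    title: str
    url: str

_cached_records: list[ProviderRecord] = []  # simple in-memory cache

def parse_and_cache_provider_data(data: str) -> None:
    raw = json.loads(data)
    # Validate each record; a malformed payload raises a ValidationError.
    _cached_records[:] = [ProviderRecord(**item) for item in raw]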