# Load GCS Data via SyncedGcsBlob in Merino
- Status: Accepted
- Deciders: Bastian Gruber, Herraj Luhano, Mathijs Miermans, Nan Jiang
- Date: 2025-07-24
## Context and Problem Statement
The following Merino providers load data from Google Cloud Storage (GCS):
- TopPicks - reloads suggestion data every 12 hours
- Manifest - reloads domain metadata (icons, titles, categories) every hour
- Curated Recommendations - uses `SyncedGcsBlob` to check every minute and load data only when the blob is updated
TopPicks and Manifest are loaded periodically without checking whether the data has changed. Because every reload downloads the full blob, the reload intervals must be kept long, leading to stale data, unnecessary resource use, and duplicated maintenance effort.
The existing `SyncedGcsBlob` class, used by the curated recommendations endpoint, periodically checks the `updated` timestamp of GCS blobs and reloads data only when blobs are updated, calling a callback function to parse and cache the data. However, `SyncedGcsBlob` currently uses Google's synchronous GCS client (`google-cloud-storage`), offloading synchronous calls to a thread pool to avoid blocking the main event loop (as previously addressed in ADR 0005).
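For context, a minimal sketch of that current pattern, assuming `google-cloud-storage`'s synchronous client; the helper function and its arguments are illustrative, not Merino's actual code:

```python
import asyncio
from datetime import datetime

from google.cloud.storage import Client


async def fetch_if_updated(
    client: Client, bucket_name: str, blob_name: str, last_updated: datetime | None
) -> tuple[str | None, datetime | None]:
    """Reload the blob only if its 'updated' timestamp changed, without blocking the event loop."""

    def _sync_check_and_fetch() -> tuple[str | None, datetime | None]:
        # get_blob() issues a metadata-only request; the body is not downloaded yet.
        blob = client.bucket(bucket_name).get_blob(blob_name)
        if blob is None or blob.updated == last_updated:
            return None, last_updated
        return blob.download_as_text(), blob.updated

    # Offload the blocking client calls to the default thread-pool executor (ADR 0005).
    return await asyncio.get_running_loop().run_in_executor(None, _sync_check_and_fetch)
```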
Should we standardize on `SyncedGcsBlob` for all providers, and further, should we enhance it to use the asynchronous `gcloud-aio-storage` client to simplify our implementation?
## Decision Drivers
- Minimize latency from GCS updates to Merino data availability.
- Minimize performance overhead by checking blob metadata frequently instead of downloading the full blob.
- Simplify maintenance with a consistent solution.
- Continue to avoid blocking Merino's async event loop (ADR 0005).
## Considered Options
| Option | Approach | Summary | Pros | Cons |
|---|---|---|---|---|
| A | SyncedGcsBlob + gcloud-aio-storage (preferred) | Adopt async GCS client within SyncedGcsBlob | Low latency, minimal memory usage, non-blocking I/O, consistent implementation | Requires refactor |
| B | SyncedGcsBlob with current sync GCS client | Use existing SyncedGcsBlob (status quo) | Low latency, existing solution | Memory overhead from threads |
| C | Provider-specific loaders | Keep per-provider loading logic | No immediate refactor, flexible per provider | Duplicate logic, higher maintenance |
| D | GCS → Pub/Sub push | Event-driven notifications | Instant updates | Increased complexity, more infrastructure |
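For comparison, a rough sketch of the event-driven Option D, assuming a GCS bucket notification configured to publish object-change events to a Pub/Sub topic; the project, subscription, and handler names are hypothetical:

```python
from google.cloud import pubsub_v1


def listen_for_blob_updates() -> None:
    """Block and react to GCS object-change notifications pushed via Pub/Sub."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription = subscriber.subscription_path("my-project", "merino-gcs-updates")

    def on_message(message: pubsub_v1.subscriber.message.Message) -> None:
        # GCS notifications carry the bucket and object name as message attributes.
        bucket = message.attributes.get("bucketId")
        blob_name = message.attributes.get("objectId")
        print(f"Blob updated: gs://{bucket}/{blob_name}")  # trigger a reload here
        message.ack()

    # Requires a notification on the bucket plus a topic and subscription: extra infrastructure.
    subscriber.subscribe(subscription, callback=on_message).result()
```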
## Recommended Option
- Option A: Adopt `SyncedGcsBlob` enhanced with `gcloud-aio-storage`.
This approach reduces latency, simplifies asynchronous handling by removing the thread-pool complexity in the current implementation of `SyncedGcsBlob`, and maintains the decision outcome of ADR 0005.
By integrating `gcloud-aio-storage` into `SyncedGcsBlob`, the class will natively use asynchronous I/O and stop offloading synchronous calls to threads.
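For illustration, a sketch of how the async check-then-fetch could look; `Storage.download_metadata` and `Storage.download` are `gcloud-aio-storage` calls, while the surrounding function and its arguments are hypothetical:

```python
from gcloud.aio.storage import Storage


async def check_and_fetch(
    storage: Storage, bucket: str, blob_name: str, last_updated: str | None
) -> tuple[str | None, str | None]:
    """Download the blob body only when its 'updated' timestamp has changed."""
    # A cheap metadata-only request; no blob body is transferred.
    metadata = await storage.download_metadata(bucket, blob_name)
    updated = metadata.get("updated")
    if updated == last_updated:
        return None, last_updated  # Nothing changed since the last check.
    # The timestamp moved, so fetch the full blob contents.
    data = await storage.download(bucket, blob_name)
    return data.decode("utf-8"), updated
```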
## Positive Consequences
- Minimal latency: Rapid propagation of GCS updates.
- Low memory overhead: Leveraging async avoids creating a separate thread for each job.
- Low network overhead: Frequent metadata checks instead of full downloads.
- Maintenance ease: Standard implementation across providers.
## Negative Consequences
- Initial refactor: Existing providers require updating.
## Implementation Impact
Adopting this approach will require the following changes:
- `SyncedGcsBlob` - Refactor to use the async `gcloud-aio-storage` client instead of the synchronous client
- TopPicks - Replace the periodic reload with `SyncedGcsBlob` to check for updates
- Manifest - Replace the periodic reload with `SyncedGcsBlob` to check for updates
Curated Recommendations already uses `SyncedGcsBlob` and will benefit from the async refactor without requiring significant changes.
## Usage Example
Here's how providers will initialize and use the enhanced asynchronous `SyncedGcsBlob`:
```python
import json
import logging

from gcloud.aio.storage import Storage as AsyncStorageClient

logger = logging.getLogger(__name__)


async def initialize_provider_backend() -> ProviderBackend:
    try:
        storage_client = AsyncStorageClient(...)  # from gcloud-aio-storage
        synced_blob = SyncedGcsBlob(
            storage_client=storage_client,
            metrics_client=get_metrics_client(),
            metrics_namespace="provider.data",
            bucket_name=settings.provider.gcs.bucket_name,
            blob_name=settings.provider.gcs.data.blob_name,
            max_size=settings.provider.gcs.data.max_size,
            # How often the 'updated' timestamp is checked
            cron_interval_seconds=settings.provider.gcs.data.cron_interval_seconds,
            cron_job_name="fetch_provider_data",
        )
        synced_blob.set_fetch_callback(parse_and_cache_provider_data)
        await synced_blob.initialize()  # starts the async background task
        return GcsProviderBackend(synced_gcs_blob=synced_blob)
    except Exception as e:
        logger.error(f"Failed to initialize provider backend: {e}")
        return DefaultProviderBackend()


def parse_and_cache_provider_data(data: str) -> None:
    """Fetch callback: parse the blob contents and cache them for serving."""
    provider_data = json.loads(data)
    cache_provider_data(provider_data)
```
The callback (`parse_and_cache_provider_data`) is called automatically by `SyncedGcsBlob` whenever the blob is updated. The callback implementation will vary by provider; typically it decodes JSON, sometimes converts the result into Pydantic models, and caches it in memory.
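As one hypothetical variant, a callback that validates the decoded JSON with a Pydantic model before caching; the model, fields, and cache names are illustrative, not Merino's actual code:

```python
import json

from pydantic import BaseModel


class DomainMetadata(BaseModel):
    """Illustrative model for one domain's metadata entry."""

    domain: str
    title: str
    icon: str


_cached_metadata: list[DomainMetadata] = []


def parse_and_cache_domain_metadata(data: str) -> None:
    """Fetch callback: decode, validate, and cache the blob contents in memory."""
    global _cached_metadata
    _cached_metadata = [DomainMetadata(**item) for item in json.loads(data)]
```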