BigQuery is Google Cloud's fully managed, serverless data warehouse: petabytes of data are queryable in seconds with no infrastructure to manage, which makes it a strong analytics backend for AI agents. Agents connected to BigQuery can answer business questions from your data warehouse in natural language, generate automated reports, and proactively surface anomalies before stakeholders notice them.
For data teams running GA4, Firebase, or custom event data through BigQuery, AI agent integration creates a direct path from raw event data to business insight.
What AI Agents Can Do With BigQuery Access#
Natural Language Analytics
- Answer business questions by generating and executing SQL against your BigQuery datasets
- Enable non-technical users to query product, sales, and marketing data without SQL
- Explain query results in context with comparisons to prior periods
- Generate data narratives from raw query results for executive consumption
Automated Pipeline and Reporting
- Generate daily/weekly business metric reports from scheduled queries
- Monitor GA4 event data for unusual traffic patterns or conversion drops
- Create cohort retention analyses from user activity tables
- Build funnel conversion reports across multi-step user journeys
Data Quality Monitoring
- Detect tables not updated within expected refresh windows
- Identify schema changes that could break downstream BI tools
- Flag unexpected null rates or value distributions in key columns
- Alert when metric values deviate more than N% from the rolling average
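The last check in that list, alerting when a metric deviates more than N% from its rolling average, reduces to a few lines of arithmetic. A minimal sketch, assuming the metric arrives as an ordered daily series (the function name and default threshold are illustrative, not from any library):

```python
def deviates_from_rolling_avg(values: list[float], window: int = 7,
                              threshold_pct: float = 20.0) -> bool:
    """Return True if the latest value deviates more than threshold_pct
    from the rolling average of the preceding `window` values."""
    if len(values) < window + 1:
        return False  # not enough history to judge
    baseline = sum(values[-window - 1:-1]) / window
    if baseline == 0:
        return False  # avoid division by zero on a flat-zero baseline
    deviation = abs(values[-1] - baseline) / baseline * 100
    return deviation > threshold_pct
```

An agent can run this over the result of a daily `GROUP BY` query and alert only when the flag trips.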
Setting Up BigQuery API Access#
pip install google-cloud-bigquery langchain langchain-openai db-dtypes python-dotenv
Service Account Authentication#
# Download service account key from Google Cloud Console → IAM → Service Accounts
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
export BIGQUERY_PROJECT_ID="your-gcp-project-id"
export BIGQUERY_DATASET="your_dataset"
Test your connection:
from google.cloud import bigquery
import os
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT_ID"))
dataset = os.getenv("BIGQUERY_DATASET")
# INFORMATION_SCHEMA views must be qualified with a dataset (or region)
query = f"""
    SELECT COUNT(*) AS table_count
    FROM `{client.project}.{dataset}`.INFORMATION_SCHEMA.TABLES
"""
result = client.query(query).result()
for row in result:
    print(f"Tables in dataset {dataset}: {row.table_count}")
Option 1: No-Code with n8n#
Automated Weekly Analytics Report#
- Schedule Trigger: Monday 8am
- HTTP Request: Call the BigQuery REST API with a weekly metrics query (POST to /bigquery/v2/projects/{projectId}/queries)
- Code node: Parse the response and calculate week-over-week changes
- OpenAI: "Write a 5-bullet weekly business summary from these metrics. Highlight any significant changes."
- Slack/Email: Send report to leadership channel
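The Code-node step (parsing the response and computing week-over-week changes) is plain arithmetic. A sketch of that logic in Python, assuming each weekly snapshot is a metric-name-to-value mapping (the function name and data shape are illustrative):

```python
def week_over_week(current: dict[str, float],
                   previous: dict[str, float]) -> dict[str, float]:
    """Percentage change per metric between two weekly snapshots.
    Metrics missing a prior value (or with a zero baseline) are skipped."""
    changes = {}
    for metric, value in current.items():
        prior = previous.get(metric)
        if prior:  # skips missing keys and zero baselines
            changes[metric] = round((value - prior) / prior * 100, 1)
    return changes
```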
For the BigQuery REST API, include Authorization: Bearer {access_token} and query body:
{
  "query": "SELECT metric, value FROM `project.dataset.weekly_metrics` WHERE week = DATE_TRUNC(CURRENT_DATE(), WEEK) LIMIT 50",
  "useLegacySql": false
}
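Assembling that request in Python might look like the sketch below. The access token would come from `google.auth` or `gcloud auth print-access-token`; the helper name itself is illustrative:

```python
def build_query_request(project_id: str, sql: str, access_token: str):
    """Build the URL, headers, and JSON body for a BigQuery jobs.query call."""
    url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/queries"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    body = {"query": sql, "useLegacySql": False}
    return url, headers, body

# Then send it, e.g.: requests.post(url, headers=headers, json=body)
```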
Option 2: LangChain with Python#
Build BigQuery Tools#
import os
import pandas as pd
from google.cloud import bigquery
from langchain.tools import tool
from dotenv import load_dotenv
load_dotenv()
PROJECT_ID = os.getenv("BIGQUERY_PROJECT_ID")
DATASET = os.getenv("BIGQUERY_DATASET")
client = bigquery.Client(project=PROJECT_ID)
def run_bq_query(sql: str, max_rows: int = 200) -> pd.DataFrame:
    """Execute a BigQuery SQL query and return results as a DataFrame."""
    job_config = bigquery.QueryJobConfig(maximum_bytes_billed=10 * 1024**3)  # 10 GB scan cap
    query_job = client.query(sql, job_config=job_config)
    result = query_job.result()
    return result.to_dataframe().head(max_rows)

@tool
def query_bigquery(sql: str) -> str:
    """
    Execute a BigQuery SQL query and return results. Only SELECT queries allowed.
    Always include WHERE clauses filtering on partitioned columns for cost efficiency.
    """
    sql_upper = sql.strip().upper()
    if not sql_upper.startswith("SELECT") and not sql_upper.startswith("WITH"):
        return "Error: Only SELECT queries are permitted"
    if "LIMIT" not in sql_upper:
        sql = sql.rstrip(";") + "\nLIMIT 100"
    df = run_bq_query(sql)
    if df.empty:
        return "Query returned no results"
    return f"Results ({len(df)} rows):\n{df.to_string(index=False, max_rows=25)}"
@tool
def list_datasets_and_tables() -> str:
    """List all datasets and tables in the BigQuery project."""
    datasets = list(client.list_datasets())
    if not datasets:
        return f"No datasets found in project {PROJECT_ID}"
    result = [f"BigQuery datasets in {PROJECT_ID}:"]
    for dataset in datasets[:10]:
        tables = list(client.list_tables(dataset.reference))
        result.append(f"\n  Dataset: {dataset.dataset_id}")
        for table in tables[:10]:
            result.append(f"    - {table.table_id} (type: {table.table_type})")
    return "\n".join(result)
@tool
def get_table_info(table_ref: str) -> str:
    """
    Get schema and metadata for a BigQuery table.
    table_ref format: 'dataset.table_name' or 'project.dataset.table_name'
    """
    if table_ref.count(".") == 1:
        full_ref = f"{PROJECT_ID}.{table_ref}"
    else:
        full_ref = table_ref
    table = client.get_table(full_ref)
    schema_lines = [
        f"Table: {full_ref}",
        f"Rows: {table.num_rows:,}",
        f"Size: {table.num_bytes / (1024**3):.2f} GB",
        f"Partitioned: {bool(table.time_partitioning)}",
        "\nSchema:",
    ]
    for field in table.schema[:20]:
        schema_lines.append(f"  {field.name}: {field.field_type} ({'nullable' if field.mode == 'NULLABLE' else 'required'})")
    return "\n".join(schema_lines)
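The dataset-qualification branch (prefixing `dataset.table` refs with the default project) recurs in several of these tools; if you prefer, it can be factored into a small helper. The helper name is my own, not from any library:

```python
def qualify(table_ref: str, project_id: str) -> str:
    """Prefix 'dataset.table' refs with the default project;
    leave fully qualified 'project.dataset.table' refs untouched."""
    if table_ref.count(".") == 1:
        return f"{project_id}.{table_ref}"
    return table_ref
```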
@tool
def get_metric_trend(table_ref: str, metric_col: str, date_col: str,
                     days: int = 30) -> str:
    """Get a time-series trend for a metric over the past N days."""
    if table_ref.count(".") == 1:
        full_ref = f"`{PROJECT_ID}.{table_ref}`"
    else:
        full_ref = f"`{table_ref}`"
    sql = f"""
    SELECT
        DATE({date_col}) AS date,
        SUM({metric_col}) AS daily_value,
        AVG(SUM({metric_col})) OVER (ORDER BY DATE({date_col}) ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS moving_avg_7d
    FROM {full_ref}
    WHERE DATE({date_col}) >= DATE_SUB(CURRENT_DATE(), INTERVAL {days} DAY)
    GROUP BY 1
    ORDER BY 1 DESC
    LIMIT {days}"""
    df = run_bq_query(sql)
    if df.empty:
        return f"No data found for {metric_col} in {table_ref}"
    latest = df.iloc[0]
    # Column names in the DataFrame match the SQL aliases exactly (lowercase)
    avg_value = df["daily_value"].mean()
    trend_pct = ((latest["daily_value"] - avg_value) / avg_value * 100) if avg_value else 0
    return (f"Metric trend for {metric_col} (last {days} days):\n{df.to_string(index=False)}\n\n"
            f"Latest: {latest['daily_value']:,.0f} | {days}-day avg: {avg_value:,.0f} | Trend: {trend_pct:+.1f}%")
@tool
def check_table_freshness(table_ref: str) -> str:
    """Check when a BigQuery table was last modified — useful for pipeline monitoring."""
    from datetime import datetime, timezone
    if table_ref.count(".") == 1:
        full_ref = f"{PROJECT_ID}.{table_ref}"
    else:
        full_ref = table_ref
    table = client.get_table(full_ref)
    last_modified = table.modified  # timezone-aware UTC datetime
    now = datetime.now(timezone.utc)
    hours_stale = (now - last_modified).total_seconds() / 3600
    status = "FRESH" if hours_stale < 25 else f"STALE ({hours_stale:.0f} hours)"
    return f"Table {table_ref}: Last modified {last_modified.strftime('%Y-%m-%d %H:%M UTC')} — {status}"
BigQuery Analytics Agent#
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [query_bigquery, list_datasets_and_tables, get_table_info,
get_metric_trend, check_table_freshness]
prompt = ChatPromptTemplate.from_messages([
    ("system", f"""You are a data analyst with access to Google BigQuery (project: {PROJECT_ID}).

When answering data questions:
1. Explore available datasets and tables if you're not sure where data lives
2. Get table schema before writing queries to use correct column names
3. Always filter on partitioned date columns to minimize costs
4. Return both raw numbers and percentage changes for business context
5. Interpret results in business terms, not just database terminology

Cost awareness:
- Always include a WHERE clause filtering on date partitions
- Never SELECT * on large tables — specify needed columns
- Include LIMIT clauses for exploratory queries"""),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])
agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True, max_iterations=8)
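With the executor assembled, a thin wrapper keeps call sites tidy. The wrapper and the sample question below are illustrative, not part of the LangChain API:

```python
def ask(executor, question: str) -> str:
    """Run the agent on one question and return just the final answer text.
    AgentExecutor.invoke returns a dict; the answer lives under 'output'."""
    result = executor.invoke({"input": question})
    return result["output"]

# Example (requires valid OpenAI and GCP credentials):
# print(ask(executor, "How did daily signups trend over the last 30 days?"))
```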
Rate Limits and Best Practices#
| BigQuery consideration | Guidance |
|---|---|
| On-demand cost | ~$6.25/TB scanned (first 1 TB/month free) — verify current pricing |
| Partition pruning | Always filter on _PARTITIONTIME or date columns |
| Column pruning | SELECT specific columns, never SELECT * |
| Concurrent queries | 100 concurrent on-demand, 2000 for flat-rate |
Best practices:
- Partition-first filtering: Always filter on the partition column in your WHERE clause — this is the single highest-impact BigQuery cost optimization
- Limit scanned data: For agent exploration, use TABLESAMPLE SYSTEM (10 PERCENT) to query a sample before running against the full table
- Monitor with INFORMATION_SCHEMA: Query `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT (substituting your dataset's region) to audit agent query costs and patterns
- Use authorized views: Create authorized views in BigQuery that restrict agent access to specific columns or row ranges — more fine-grained than project-level IAM
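Cost auditing can also happen before a query runs: BigQuery supports dry runs (`QueryJobConfig(dry_run=True)`), which report `total_bytes_processed` without billing anything. Converting bytes to dollars is simple arithmetic; the helper below assumes on-demand pricing of $6.25/TB, which you should check against current pricing:

```python
def estimated_cost_usd(bytes_processed: int, price_per_tb: float = 6.25) -> float:
    """Estimated on-demand cost for a query scanning `bytes_processed` bytes.
    price_per_tb is an assumption; verify against current BigQuery pricing."""
    return round(bytes_processed / 1024**4 * price_per_tb, 4)

# Typical use with a dry run (client as defined earlier in this guide):
# job = client.query(sql, job_config=bigquery.QueryJobConfig(dry_run=True))
# print(estimated_cost_usd(job.total_bytes_processed))
```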
Next Steps#
- AI Agents Snowflake Integration — Multi-cloud data warehouse alternative
- AI Agents Google Analytics Integration — Web analytics data that flows natively into BigQuery
- Agentic RAG Glossary — How structured data retrieval extends RAG patterns
- Build an AI Agent with LangChain — Complete framework tutorial