Understanding CodeWords
Core Architecture
CodeWords is built on a microservices architecture: each workflow is a standalone FastAPI application that runs in its own secure, isolated sandbox.
graph TB
A[User Request] --> B[API Gateway]
B --> C[Runtime Engine]
C --> D[Sandbox Environment]
D --> E[Your Workflow]
E --> F[Response]
G[Library Services] --> E
H[AI Models] --> E
I[Third-party APIs] --> E
Key Concepts
1. Workflows
FastAPI applications that define your automation logic
Stateless - each execution is independent
Scalable - automatically handle concurrent requests
Versioned - maintain multiple versions of your workflows
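As a sketch of what this looks like in practice, a minimal workflow is simply a FastAPI app that exposes a POST endpoint at the root path, mirroring the fuller examples later in this guide (the request and response models here are illustrative placeholders):

```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Hello Workflow", description="Minimal illustrative workflow")

class GreetRequest(BaseModel):
    name: str

class GreetResponse(BaseModel):
    message: str

@app.post("/", response_model=GreetResponse)
async def greet(request: GreetRequest) -> GreetResponse:
    # Stateless: everything the handler needs arrives in the request body
    return GreetResponse(message=f"Hello, {request.name}!")
```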
2. Sandboxes
Isolated environments powered by E2B technology
Ephemeral - created on-demand, destroyed after execution
Secure - no access to other users' data or workflows
Resource-limited - automatic cleanup and timeout protection
3. Library Services
Pre-built services that handle common tasks:
AI Models - OpenAI, Anthropic, Gemini
Web Automation - Chrome extension, web agents
Data Sources - Reddit, GitHub, search engines
Communication - Slack, WhatsApp, email
Storage - File upload/download, state management
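Workflows call these services through the CodeWords client. The sketch below follows the calling pattern used in the worked examples later in this guide; the model and prompt are placeholders:

```python
from codewords_client import AsyncCodewordsClient

async def summarize(text: str) -> str:
    # Invoke the hosted OpenAI library service; credentials are injected by the runtime
    async with AsyncCodewordsClient() as client:
        result = await client.run(
            service_id="openai",
            data={
                "model": "gpt-4o-mini",  # placeholder model choice
                "prompt": f"Summarize in one sentence: {text}",
            },
        )
    return result["response"]
```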
Platform Components
1. Runtime Engine
Request routing and authentication
Sandbox provisioning and lifecycle management
Dependency installation using uv
Log streaming and error handling
Usage tracking and billing
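Each workflow declares its dependencies inline using PEP 723 script metadata, which the runtime reads and installs with uv before execution. The header below mirrors the ones used in the examples later in this section:

```python
# /// script
# requires-python = "==3.11.*"
# dependencies = [
#     "fastapi==0.115.12",
#     "uvicorn[standard]==0.34.2",
#     "pydantic==2.11.3",
#     "structlog==25.2.0",
#     "codewords-client==0.2.4"
# ]
# ///
```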
2. Library Services
AI Services
OpenAI GPT (openai) - Text generation, embeddings, vision
Anthropic Claude (anthropic) - Reasoning, analysis, tool use
Google Gemini (gemini) - Large context, multimodal
Web Automation
Chrome Extension (chrome_extension) - Browser automation
Web Agent (web_automation_agent) - AI-powered web tasks
Webpage to Markdown (webpage_to_markdown) - Content extraction
Data Sources
GitHub (github) - Repository and profile search
Reddit (reddit) - Post and comment retrieval
Search API (searchapi) - Google, Bing, YouTube search
Hunter.io (hunter) - Email finding
Communication
Slack Trigger (slack_trigger) - Slack bot integration
WhatsApp Trigger (whatsapp_trigger) - WhatsApp automation
Send Email (send_email) - Email delivery
Utilities
Schedule Runs (schedule_runs) - Workflow scheduling
User Login (user_login) - Authentication management
Firecrawl (firecrawl) - Web scraping
3. Pipedream Integrations
CodeWords provides access to more than 2,000 third-party services through Pipedream, so you can connect to virtually any tool in your tech stack.
Popular Categories:
CRM & Sales: Salesforce, HubSpot, Pipedrive, Airtable, Monday.com
Communication: Slack, Discord, Microsoft Teams, Zoom, WhatsApp
Email & Marketing: Gmail, Outlook, Mailchimp, SendGrid, Constant Contact
Data & Analytics: Google Sheets, Notion, Zapier, Snowflake, BigQuery
E-commerce: Shopify, WooCommerce, Stripe, PayPal, Square
Social Media: LinkedIn, Twitter, Facebook, Instagram, YouTube
Project Management: Asana, Trello, Jira, Linear, ClickUp
Cloud Storage: Google Drive, Dropbox, OneDrive, Box, AWS S3
Databases: MySQL, PostgreSQL, MongoDB, Redis, Supabase
Developer Tools: GitHub, GitLab, Docker, Jenkins, Vercel
What This Means for You:
No Integration Limits: Access enterprise-grade APIs without worrying about rate limits or complex authentication
Unified Interface: Use the same simple CodeWords syntax across all 2,000+ services
OAuth Handled: We manage authentication flows, tokens, and refreshes automatically
Enterprise Ready: Connect to internal tools, databases, and custom APIs seamlessly
Whether you're building a simple automation or a complex multi-system workflow, CodeWords' extensive integration library ensures you can connect to the tools your organization already uses.
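Within a workflow, a Pipedream integration is invoked through the same client interface by naming the target app and action. The sketch below follows the pattern used in the examples later in this guide; the channel value is a placeholder:

```python
from codewords_client import AsyncCodewordsClient

async def post_to_slack(channel: str, text: str) -> None:
    # Route through the Pipedream integration service; OAuth is handled by the platform
    async with AsyncCodewordsClient() as client:
        await client.run(
            service_id="pipedream",
            data={
                "app_slug": "slack",
                "action_name": "send-message",
                "props": {"channel": channel, "text": text},
            },
        )
```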
Example Use Cases
Example 1: LinkedIn Profile Enricher
Scenario: Automatically enrich a list of LinkedIn URLs with structured profile data.
Implementation:
# /// script
# requires-python = "==3.11.*"
# dependencies = [
# "fastapi==0.115.12",
# "uvicorn[standard]==0.34.2",
# "pydantic==2.11.3",
# "structlog==25.2.0",
# "codewords-client==0.2.4"
# ]
# [tool.env-checker]
# env_vars = [
# "PORT=8000",
# "LOGLEVEL=INFO",
# "CODEWORDS_API_KEY",
# "CODEWORDS_RUNTIME_URI"
# ]
# ///
import asyncio
import os
from typing import List, Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from codewords_client import AsyncCodewordsClient
import structlog
logger = structlog.get_logger()
app = FastAPI(
title="LinkedIn Profile Enricher",
description="Enrich LinkedIn profile URLs with structured data"
)
class ProfileData(BaseModel):
name: str
headline: Optional[str] = None
company: Optional[str] = None
location: Optional[str] = None
experience: List[str] = []
education: List[str] = []
class EnrichmentRequest(BaseModel):
linkedin_urls: List[HttpUrl]
max_concurrent: int = 5
class EnrichmentResponse(BaseModel):
profiles: List[ProfileData]
success_count: int
error_count: int
@app.post("/", response_model=EnrichmentResponse)
async def enrich_profiles(request: EnrichmentRequest):
"""Enrich LinkedIn profiles with structured data."""
logger.info("Starting profile enrichment",
url_count=len(request.linkedin_urls))
semaphore = asyncio.Semaphore(request.max_concurrent)
profiles = []
errors = 0
async def enrich_single_profile(url: str) -> Optional[ProfileData]:
async with semaphore:
try:
# Use Chrome extension to scrape profile
async with AsyncCodewordsClient() as client:
# Navigate to profile
result = await client.run(
service_id="chrome_extension",
path="/actions",
data={
"actions": [
{"type": "navigate", "url": str(url)},
{"type": "wait", "time": 3},
{"type": "extract_html"}
]
}
)
if not result.get("success"):
logger.error("Failed to extract profile", url=url)
return None
html_content = result["data"]["html"]
# Use AI to extract structured data
ai_result = await client.run(
service_id="openai",
data={
"model": "gpt-4o-mini",
"prompt": f"Extract profile data from this HTML: {html_content}",
"example_output": ProfileData.model_json_schema()
}
)
return ProfileData(**ai_result["response"])
except Exception as e:
logger.error("Error enriching profile", url=url, error=str(e))
return None
# Process all URLs concurrently
tasks = [enrich_single_profile(url) for url in request.linkedin_urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, ProfileData):
profiles.append(result)
else:
errors += 1
logger.info("Enrichment completed",
success_count=len(profiles),
error_count=errors)
return EnrichmentResponse(
profiles=profiles,
success_count=len(profiles),
error_count=errors
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))
Try with a generated UI here: Scrape And Enrich Linkedin Post To Google Sheets
Or call the deployed workflow directly:
curl -X POST "https://your-workflow-url/" \
-H "Content-Type: application/json" \
-d '{
"linkedin_urls": [
"https://linkedin.com/in/johndoe",
"https://linkedin.com/in/janedoe"
],
"max_concurrent": 3
}'
Example 2: Competitor Website Monitor
Scenario: Monitor competitor websites for changes and send Slack alerts.
Implementation:
# /// script
# requires-python = "==3.11.*"
# dependencies = [
# "fastapi==0.115.12",
# "uvicorn[standard]==0.34.2",
# "pydantic==2.11.3",
# "structlog==25.2.0",
# "codewords-client==0.2.4",
# "slack-sdk==3.31.0"
# ]
# [tool.env-checker]
# env_vars = [
# "PORT=8000",
# "LOGLEVEL=INFO",
# "CODEWORDS_API_KEY",
# "CODEWORDS_RUNTIME_URI",
# "SLACK_BOT_TOKEN",
# "PIPEDREAM_SLACK_ACCESS"
# ]
# ///
import os
from typing import Optional, Dict, Any
from fastapi import FastAPI
from pydantic import BaseModel, HttpUrl
from codewords_client import AsyncCodewordsClient
import structlog
logger = structlog.get_logger()
app = FastAPI(
title="Website Monitor",
description="Monitor websites for changes and send alerts"
)
class MonitorRequest(BaseModel):
url: HttpUrl
slack_channel: str
check_frequency: str = "0 9 * * *" # Daily at 9 AM
comparison_prompt: str = "Identify significant changes in content, pricing, or features"
class MonitorResponse(BaseModel):
message: str
changes_detected: bool
summary: Optional[str] = None
@app.post("/", response_model=MonitorResponse)
async def monitor_website(request: MonitorRequest):
"""Monitor a website for changes and send alerts."""
logger.info("Starting website monitoring", url=request.url)
async with AsyncCodewordsClient() as client:
# Get current website content
current_content = await client.run(
service_id="firecrawl",
data={
"url": str(request.url),
"formats": ["markdown"],
"onlyMainContent": True
}
)
if not current_content.get("success"):
logger.error("Failed to fetch website content")
return MonitorResponse(
message="Failed to fetch website content",
changes_detected=False
)
current_markdown = current_content["data"]["markdown"]
# Get previous snapshot from Redis
snapshot_key = f"website_snapshot_{hash(str(request.url))}"
redis_result = await client.run(
service_id="redis_coordinator",
data={
"action": "get",
"key": snapshot_key
}
)
previous_content = redis_result.get("value") if redis_result.get("success") else None
if not previous_content:
# First run - store snapshot
await client.run(
service_id="redis_coordinator",
data={
"action": "set",
"key": snapshot_key,
"value": current_markdown,
"ttl": 30 * 24 * 3600 # 30 days
}
)
logger.info("Stored initial snapshot")
return MonitorResponse(
message="Initial snapshot stored. Monitoring will begin on next run.",
changes_detected=False
)
# Compare content using AI
comparison_result = await client.run(
service_id="anthropic",
data={
"model": "claude-3-5-haiku-20241022",
"prompt": f"""
Compare these two versions of a website and {request.comparison_prompt}.
Previous version:
{previous_content}
Current version:
{current_markdown}
Respond with a JSON object containing:
- changes_detected: boolean
- summary: string (if changes detected)
""",
"example_output": {
"changes_detected": True,
"summary": "Pricing increased from $99 to $129 for premium plan"
}
}
)
analysis = comparison_result.get("response", {})
changes_detected = analysis.get("changes_detected", False)
if changes_detected:
summary = analysis.get("summary", "Changes detected")
# Send Slack notification
await client.run(
service_id="pipedream",
data={
"app_slug": "slack",
"action_name": "send-message",
"props": {
"channel": request.slack_channel,
"text": f"🚨 Website Change Detected!\n\n**URL**: {request.url}\n**Summary**: {summary}"
}
}
)
# Update snapshot
await client.run(
service_id="redis_coordinator",
data={
"action": "set",
"key": snapshot_key,
"value": current_markdown,
"ttl": 30 * 24 * 3600
}
)
logger.info("Changes detected and notification sent", summary=summary)
return MonitorResponse(
message="Changes detected and notification sent",
changes_detected=True,
summary=summary
)
else:
logger.info("No significant changes detected")
return MonitorResponse(
message="No significant changes detected",
changes_detected=False
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))
Example 3: AI-Powered Customer Support Ticket Analyzer
Scenario: Analyze support tickets, categorize them, and route to appropriate teams.
Implementation:
# /// script
# requires-python = "==3.11.*"
# dependencies = [
# "fastapi==0.115.12",
# "uvicorn[standard]==0.34.2",
# "pydantic==2.11.3",
# "structlog==25.2.0",
# "codewords-client==0.2.4"
# ]
# [tool.env-checker]
# env_vars = [
# "PORT=8000",
# "LOGLEVEL=INFO",
# "CODEWORDS_API_KEY",
# "CODEWORDS_RUNTIME_URI",
# "PIPEDREAM_HUBSPOT_ACCESS"
# ]
# ///
import os
from enum import Enum
from typing import List, Optional
from fastapi import FastAPI
from pydantic import BaseModel, Field
from codewords_client import AsyncCodewordsClient
import structlog
logger = structlog.get_logger()
app = FastAPI(
title="Support Ticket Analyzer",
description="AI-powered support ticket analysis and routing"
)
class Priority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class Category(str, Enum):
BILLING = "billing"
TECHNICAL = "technical"
FEATURE_REQUEST = "feature_request"
BUG_REPORT = "bug_report"
GENERAL = "general"
class Sentiment(str, Enum):
POSITIVE = "positive"
NEUTRAL = "neutral"
NEGATIVE = "negative"
class TicketAnalysis(BaseModel):
category: Category
priority: Priority
sentiment: Sentiment
summary: str = Field(description="Brief summary of the ticket")
suggested_response: str = Field(description="Suggested response template")
action_items: List[str] = Field(description="Specific action items")
escalation_needed: bool = Field(description="Whether escalation is needed")
class AnalyzeTicketRequest(BaseModel):
ticket_id: str
subject: str
content: str
customer_email: str
customer_tier: str = "standard" # standard, premium, enterprise
class AnalyzeTicketResponse(BaseModel):
ticket_id: str
analysis: TicketAnalysis
routing_team: str
estimated_resolution_time: str
@app.post("/", response_model=AnalyzeTicketResponse)
async def analyze_ticket(request: AnalyzeTicketRequest):
"""Analyze a support ticket and provide routing recommendations."""
logger.info("Analyzing ticket", ticket_id=request.ticket_id)
async with AsyncCodewordsClient() as client:
# Analyze ticket using AI
analysis_result = await client.run(
service_id="openai",
data={
"model": "gpt-4o",
"prompt": f"""
Analyze this customer support ticket and provide detailed analysis:
Customer Tier: {request.customer_tier}
Subject: {request.subject}
Content: {request.content}
Consider:
- Urgency indicators (downtime, revenue impact, security)
- Customer tier (enterprise customers get higher priority)
- Technical complexity
- Sentiment and frustration level
""",
"example_output": TicketAnalysis.model_json_schema()
}
)
analysis = TicketAnalysis(**analysis_result["response"])
# Determine routing team
routing_team = determine_routing_team(analysis.category, analysis.priority, request.customer_tier)
# Estimate resolution time
resolution_time = estimate_resolution_time(analysis.category, analysis.priority, request.customer_tier)
# Update CRM with analysis
await update_crm_ticket(
client,
request.ticket_id,
request.customer_email,
analysis,
routing_team
)
logger.info("Ticket analysis completed",
ticket_id=request.ticket_id,
category=analysis.category,
priority=analysis.priority,
team=routing_team)
return AnalyzeTicketResponse(
ticket_id=request.ticket_id,
analysis=analysis,
routing_team=routing_team,
estimated_resolution_time=resolution_time
)
def determine_routing_team(category: Category, priority: Priority, customer_tier: str) -> str:
"""Determine which team should handle the ticket."""
if category == Category.BILLING:
return "billing_team"
elif category == Category.TECHNICAL and priority in [Priority.HIGH, Priority.URGENT]:
return "senior_technical_team"
elif category == Category.TECHNICAL:
return "technical_team"
elif category == Category.BUG_REPORT:
return "engineering_team"
elif category == Category.FEATURE_REQUEST:
return "product_team"
else:
return "general_support_team"
def estimate_resolution_time(category: Category, priority: Priority, customer_tier: str) -> str:
"""Estimate resolution time based on ticket characteristics."""
base_times = {
Priority.URGENT: "2-4 hours",
Priority.HIGH: "4-8 hours",
Priority.MEDIUM: "1-2 days",
Priority.LOW: "3-5 days"
}
# Enterprise customers get faster resolution
if customer_tier == "enterprise":
enterprise_times = {
Priority.URGENT: "1-2 hours",
Priority.HIGH: "2-4 hours",
Priority.MEDIUM: "4-8 hours",
Priority.LOW: "1-2 days"
}
return enterprise_times.get(priority, base_times[priority])
return base_times.get(priority, "3-5 days")
async def update_crm_ticket(
client: AsyncCodewordsClient,
ticket_id: str,
customer_email: str,
analysis: TicketAnalysis,
routing_team: str
):
"""Update CRM system with ticket analysis."""
try:
await client.run(
service_id="pipedream",
data={
"app_slug": "hubspot",
"action_name": "update-ticket",
"props": {
"ticket_id": ticket_id,
"properties": {
"hs_ticket_category": analysis.category,
"hs_ticket_priority": analysis.priority,
"sentiment": analysis.sentiment,
"routing_team": routing_team,
"ai_summary": analysis.summary,
"escalation_needed": analysis.escalation_needed
}
}
}
)
logger.info("CRM updated successfully", ticket_id=ticket_id)
except Exception as e:
logger.error("Failed to update CRM", ticket_id=ticket_id, error=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))
Best Practices (Advanced)
1. Workflow Design
Structure Your Code Properly
# ✅ Good: Clear separation of concerns
class DataProcessor:
async def process(self, data: List[Dict]) -> List[Dict]:
cleaned = await self.clean_data(data)
enriched = await self.enrich_data(cleaned)
return await self.validate_results(enriched)
# ❌ Bad: Everything in one function
@app.post("/")
async def process_everything(request):
# 200 lines of mixed logic...
Use Proper Error Handling
# ✅ Good: Specific error handling
try:
result = await external_api_call()
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
# Handle rate limiting
await asyncio.sleep(retry_delay)
result = await external_api_call()
else:
raise HTTPException(status_code=502, detail="External API error")
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="Request timeout")
# ❌ Bad: Generic exception handling
try:
result = await external_api_call()
except Exception:
return {"error": "Something went wrong"}
Implement Proper Logging
# ✅ Good: Structured logging with context
logger.info("Processing batch",
batch_size=len(items),
user_id=user_id,
operation="data_enrichment")
# ❌ Bad: Basic print statements
print(f"Processing {len(items)} items")
2. Performance Optimization
Use Concurrency Wisely
# ✅ Good: Controlled concurrency
semaphore = asyncio.Semaphore(5) # Limit to 5 concurrent operations
async def process_item(item):
async with semaphore:
return await expensive_operation(item)
# Process items concurrently
tasks = [process_item(item) for item in items]
results = await asyncio.gather(*tasks, return_exceptions=True)
Optimize API Calls
# ✅ Good: Batch operations
batch_size = 10
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
await process_batch(batch)
# ❌ Bad: One item at a time
for item in items:
await process_single_item(item)
3. Security Best Practices
Use Environment Variables for Secrets
# ✅ Good: Environment variables
api_key = os.environ["THIRD_PARTY_API_KEY"]
# ❌ Bad: Hardcoded secrets
api_key = "sk-1234567890abcdef" # Never do this!
Validate Input Thoroughly
# ✅ Good: Comprehensive validation
class EmailRequest(BaseModel):
email: EmailStr = Field(..., description="Valid email address")
name: str = Field(..., min_length=1, max_length=100)
age: int = Field(..., ge=0, le=120)
# ❌ Bad: No validation
class EmailRequest(BaseModel):
email: str
name: str
age: int
4. Cost Optimization
Choose the Right AI Model
# ✅ Good: Model selection based on task
def get_model_for_task(task_type: str) -> str:
if task_type == "simple_extraction":
return "gpt-4o-mini" # Cheaper for simple tasks
elif task_type == "complex_reasoning":
return "gpt-4o" # More capable for complex tasks
elif task_type == "large_context":
return "gemini-1.5-pro" # Best for large inputs
Cache Expensive Operations
# ✅ Good: Caching with Redis
async def get_cached_result(key: str):
cached = await redis_client.get(key)
if cached:
return json.loads(cached)
result = await expensive_computation()
await redis_client.setex(key, 3600, json.dumps(result)) # 1 hour cache
return result