Back to Skills
Agent CRM
by Tyrell
A complete CRM with no UIβjust natural language. Track contacts, deals, interactions, and tasks through conversation. Use when: (1) adding/finding contacts, (2) creating or updating deals and pipeline, (3) logging calls/emails/meetings, (4) managing follow-up tasks, (5) generating pipeline reports or charts, (6) parsing emails/notes into CRM data. Supports: pipeline tracking, deal stages, activity logging, task management, alerts for stale contacts, visual charts, CSV/JSON export, database backup/restore.
1.0.1
$ npx skills add https://github.com/ianpcook/agent-crmFiles
SKILL.mdMain
11.7 KB
---
name: Agent CRM
description: >
A complete CRM with no UIβjust natural language. Track contacts, deals, interactions,
and tasks through conversation. Use when: (1) adding/finding contacts, (2) creating or
updating deals and pipeline, (3) logging calls/emails/meetings, (4) managing follow-up
tasks, (5) generating pipeline reports or charts, (6) parsing emails/notes into CRM data.
Supports: pipeline tracking, deal stages, activity logging, task management, alerts for
stale contacts, visual charts, CSV/JSON export, database backup/restore.
version: 1.0.1
author: Tyrell
category: productivity
agents:
- claude-code
- openclaw
---
# Agent CRM
A CRM with no UI. Just you, a database, and natural language.
## Overview
The Agent CRM replaces traditional CRM software with a conversational interface. Data lives in SQLite; you are the interface. Every action is audited with conversation context.
## Scripts
| Script | Purpose |
|--------|---------|
| `crm.py` | Core CRUD operations |
| `crm-ingest.py` | Parse unstructured text β structured CRM actions |
| `crm-digest.py` | Generate daily digest / pipeline summary |
| `crm-notify.py` | Check for alerts (overdue tasks, stale contacts, closing deals) |
| `crm-webhook.py` | HTTP server for form/lead ingestion |
| `crm-report.py` | Pipeline analytics, activity reports, win/loss analysis |
| `crm-chart.py` | Generate visual charts (auto-bootstraps matplotlib) |
| `crm-export.py` | Export data to CSV/JSON |
| `crm-backup.py` | Backup/restore database |
---
## CLI: `crm`
All scripts are in the `scripts/` directory. The database auto-initializes on first use.
### Contacts
```bash
# Add a contact
crm add-contact "Sarah Chen" --email sarah@replicate.com --company Replicate --role CTO --source "AI meetup"
# Find contacts
crm find-contact "sarah"
crm find-contact "replicate"
# List contacts
crm list-contacts
crm list-contacts --recent
# Update contact
crm update-contact "Sarah Chen" --phone "415-555-1234" --notes "Interested in API integration"
# Delete (use with caution)
crm delete-contact <id> --reason "Duplicate entry"
```
### Deals
```bash
# Add a deal
crm add-deal "Replicate API License" --value 50000 --contact "Sarah Chen" --stage qualified --expected-close "next month"
# List deals
crm list-deals
crm list-deals --stage proposal
# Update deal
crm update-deal "Replicate" --stage negotiation --probability 70
# Pipeline summary
crm pipeline
```
### Interactions
```bash
# Log an interaction
crm log call "Discussed pricing, she'll review with team" --contact "Sarah Chen" --direction outbound
crm log email "Sent proposal PDF" --contact "Sarah" --direction outbound
crm log meeting "Demo of API features, very positive" --contact "Sarah" --date yesterday
```
### Tasks
```bash
# Add task
crm add-task "Follow up on proposal" --contact "Sarah Chen" --due "next tuesday" --priority high
# List tasks
crm list-tasks --pending
crm list-tasks --overdue
# Complete task
crm complete-task "Follow up on proposal"
```
### Queries
```bash
# Pipeline stats
crm stats
# Raw SQL (SELECT only)
crm query "SELECT name, company FROM contacts WHERE company LIKE '%tech%'"
```
## Confirmation Rules
**Always confirm before:**
- Creating/updating deals > $10,000
- Changing deal stage to `won` or `lost`
- Deleting any record
- Bulk updates (future)
**Example flow:**
```
User: "Mark the Replicate deal as won"
You: "Confirm: Mark 'Replicate API License' ($50,000) as WON? (yes/no)"
User: "yes"
You: [execute] "Done. Deal closed at $50K. π"
```
## Audit Trail
Every write operation is logged to `audit_log` table with:
- What changed (old β new values)
- Why (use `--reason` flag)
- When
View audit history:
```bash
crm query "SELECT * FROM audit_log ORDER BY created_at DESC LIMIT 10"
```
## Data Location
- **Database:** `~/.local/share/agent-crm/crm.db`
- **Schema:** `skills/agent-crm/schema.sql`
## Stages
Valid deal stages (in order):
1. `lead` β Initial contact
2. `qualified` β Confirmed interest/budget
3. `proposal` β Sent proposal/quote
4. `negotiation` β Active negotiation
5. `won` β Closed won β
6. `lost` β Closed lost β
## Interaction Types
- `email` β Email correspondence
- `call` β Phone/video call
- `meeting` β In-person or scheduled meeting
- `note` β Internal note (no contact involved)
- `linkedin` β LinkedIn message/interaction
- `text` β SMS/iMessage/WhatsApp
---
## CLI: `crm-ingest`
Parses unstructured text (emails, meeting notes, call summaries) and extracts structured data.
```bash
# From stdin
echo "Met Sarah Chen at the AI meetup. She's CTO at Replicate, interested in API." | crm-ingest
# From file
crm-ingest --file meeting-notes.txt
# Force type detection
crm-ingest --type email --file forwarded-email.txt
```
**Output:** JSON with extracted entities and suggested actions:
- Contact names, emails, phones, companies
- Interaction type and direction
- Deal signals (stage hints, positive/negative indicators)
- Monetary amounts
- Potential tasks
- Suggested CRM actions for your review
**Workflow:**
1. User pastes text or forwards email
2. Run `crm-ingest` to extract entities
3. Review the suggested actions
4. Execute the ones that make sense via `crm` commands
---
## CLI: `crm-digest`
Generates a daily summary of CRM activity.
```bash
# Human-readable digest
crm-digest
# JSON output
crm-digest --json
# Custom time range
crm-digest --lookback 7 --lookahead 14
```
**Includes:**
- Recent activity (new contacts, deals, interactions)
- Pipeline summary by stage
- Tasks due today / overdue
- Deals closing soon
- Contacts needing follow-up (14+ days inactive)
- Won deals this month
**For daily briefings:** Schedule via cron or include in morning heartbeat.
---
## Confirmation Flow
**ALWAYS confirm before:**
| Action | Threshold |
|--------|-----------|
| Create deal | value > $10,000 |
| Update deal | value > $10,000 |
| Change stage | β `won` or `lost` |
| Delete any record | Always |
**Flow pattern:**
```
User: "Mark the Replicate deal as won"
You: "β οΈ Confirm: Mark 'Replicate API License' ($50,000) as WON?
This will close the deal and log the win.
Reply 'yes' to confirm."
User: "yes"
You: [run: crm update-deal "Replicate" --stage won --reason "User confirmed close"]
"Done. Deal closed at $50K. π
Want me to create a follow-up task for invoicing?"
```
**Never auto-execute high-stakes actions.** Even if the user sounds certain, confirm first.
---
## Tips
1. **Be conversational.** User says "I just talked to Sarah" β you log the interaction
2. **Infer intelligently.** "Add Mike from Acme" β create contact with company=Acme
3. **Create follow-ups.** After logging a call, offer to create a task
4. **Summarize.** "What's my pipeline?" β run `crm pipeline` and present nicely
5. **Link things.** Deals to contacts, tasks to deals, interactions to everything
6. **Use ingest for bulk.** User pastes meeting notes β run through `crm-ingest` β execute sensible actions
7. **Daily digest.** Run `crm-digest` during morning heartbeat if CRM has data
8. **Check alerts.** Run `crm-notify` during heartbeat to catch overdue items
9. **Proactive follow-ups.** When `crm-notify` shows stale contacts, suggest reaching out
---
## CLI: `crm-notify`
Checks for items needing attention. Run from heartbeat or on-demand.
```bash
# All alerts
crm-notify
# JSON output
crm-notify --json
# Specific alert types
crm-notify --type overdue_task
crm-notify --type stale_contact
# Custom thresholds
crm-notify --stale-days 7 --closing-days 14 --stuck-days 30
```
**Alert types:**
- `overdue_task` β Tasks past due date
- `task_due_today` β Tasks due today
- `deal_closing_soon` β Deals with expected close within N days
- `stale_contact` β Contacts with open deals but no interaction in N days
- `deal_stuck` β Deals unchanged for N days
**Heartbeat integration:** Add to HEARTBEAT.md check cycle.
---
## CLI: `crm-webhook`
HTTP server for ingesting leads from external forms (Typeform, Tally, etc).
```bash
# Start server
crm-webhook --port 8901
# Endpoints:
# POST /lead β Create contact from form submission
# POST /contact β Alias for /lead
# GET /health β Health check
```
**Supported formats:**
- Typeform webhooks
- Tally webhooks
- Generic JSON with standard field names (name, email, phone, company)
**Example curl:**
```bash
curl -X POST http://localhost:8901/lead \
-H "Content-Type: application/json" \
-d '{"name": "Alex Rivera", "email": "alex@datastack.io", "company": "DataStack"}'
```
**Log file:** `~/.local/share/agent-crm/webhook.log`
---
## CLI: `crm-report`
Analytics and pipeline reports.
```bash
# Pipeline summary with forecast
crm-report pipeline
# Activity report (last 30 days)
crm-report activity --days 30
# Win/loss analysis
crm-report winloss --days 90
# JSON output
crm-report pipeline --json
```
**Pipeline report includes:**
- Deals by stage with weighted values
- Forecast by expected close month
- Top 10 deals by value
**Activity report includes:**
- Interactions by type
- Tasks created vs completed
- Deal stage movements
**Win/loss report includes:**
- Win rate percentage
- Average deal value (won vs lost)
- Average sales cycle length
---
---
## CLI: `crm-chart`
Generate visual charts from CRM data. Auto-bootstraps its own venv with matplotlib on first run.
```bash
crm-chart pipeline # Deal value by stage
crm-chart forecast # Expected closes by month
crm-chart activity # Interactions over time
crm-chart winloss # Won vs lost by month
crm-chart summary # Full dashboard
```
**Options:**
```bash
crm-chart forecast --months 6 # Forecast range
crm-chart activity --days 30 # Activity lookback
crm-chart pipeline --output /tmp/chart.png # Custom output
```
**Output:** JSON with path to PNG:
```json
{"status": "success", "chart": "pipeline", "path": "/Users/.../.local/share/agent-crm/charts/pipeline_20260208.png"}
```
**Sending to user:** Run the chart, then use `message` tool with `filePath` to send the PNG.
**Example flow:**
```
User: "Show me the pipeline"
You: [run crm-chart pipeline]
[send image via message tool with filePath]
```
---
## CLI: `crm-export`
Export CRM data to CSV or JSON.
```bash
crm-export contacts # Export contacts (JSON)
crm-export deals --format csv # Export deals (CSV)
crm-export all # Full database export
crm-export tasks --output /tmp # Custom output dir
```
**Export types:** `contacts`, `deals`, `interactions`, `tasks`, `all`
**Output:** Files saved to `~/.local/share/agent-crm/exports/`
---
## CLI: `crm-backup`
Database backup and restore.
```bash
# Create backup
crm-backup backup
crm-backup backup --note "Before big import"
# List backups
crm-backup list
# Restore (requires --confirm)
crm-backup restore # Restore latest
crm-backup restore /path/to/backup.db --confirm
# Prune old backups
crm-backup prune --keep 5
```
**Safety:** Restore always creates a safety backup first.
**Output:** Backups saved to `~/.local/share/agent-crm/backups/`
---
## Heartbeat Integration
For proactive CRM monitoring, add to `HEARTBEAT.md`:
```markdown
## CRM Check (every 4 hours)
If 4+ hours since last CRM check:
1. Run `crm-notify` to check for alerts
2. If high-priority alerts exist, message Tyrell
3. Update `lastCrmCheck` in `memory/heartbeat-state.json`
```
---
## Example Conversation
```
User: Met Alex Rivera at the startup mixer. She's founder of DataStack,
looking for AI consulting. Could be a $30K engagement.
You: Created:
β’ Contact: Alex Rivera (Founder @ DataStack)
Source: startup mixer
β’ Deal: "DataStack AI Consulting" β $30,000 (lead stage)
β’ Interaction: Met at startup mixer, interested in AI consulting
Want me to set a follow-up task?
User: Yeah, email her next week
You: Task created: "Email Alex Rivera" due Feb 15 (next Saturday)
```
schema.sql
3.6 KB
-- Agent CRM Schema
-- SQLite version (single-player)
-- Enable foreign keys
PRAGMA foreign_keys = ON;
-- Contacts
CREATE TABLE IF NOT EXISTS contacts (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
name TEXT NOT NULL,
email TEXT,
phone TEXT,
company TEXT,
role TEXT,
source TEXT,
tags TEXT, -- JSON array
notes TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_contacts_name ON contacts(name);
CREATE INDEX IF NOT EXISTS idx_contacts_company ON contacts(company);
CREATE INDEX IF NOT EXISTS idx_contacts_email ON contacts(email);
-- Companies (denormalized from contacts, but useful for rollups)
CREATE TABLE IF NOT EXISTS companies (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
name TEXT NOT NULL UNIQUE,
domain TEXT,
industry TEXT,
size TEXT,
notes TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
-- Deals
CREATE TABLE IF NOT EXISTS deals (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
contact_id TEXT REFERENCES contacts(id),
company_id TEXT REFERENCES companies(id),
title TEXT NOT NULL,
value REAL,
currency TEXT DEFAULT 'USD',
stage TEXT NOT NULL DEFAULT 'lead',
probability INTEGER,
expected_close TEXT,
notes TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
closed_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_deals_stage ON deals(stage);
CREATE INDEX IF NOT EXISTS idx_deals_contact ON deals(contact_id);
-- Interactions
CREATE TABLE IF NOT EXISTS interactions (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
contact_id TEXT REFERENCES contacts(id),
deal_id TEXT REFERENCES deals(id),
type TEXT NOT NULL, -- email, call, meeting, note, linkedin, text
direction TEXT, -- inbound, outbound
summary TEXT NOT NULL,
raw_content TEXT,
occurred_at TEXT NOT NULL,
logged_at TEXT DEFAULT (datetime('now')),
logged_by TEXT DEFAULT 'agent'
);
CREATE INDEX IF NOT EXISTS idx_interactions_contact ON interactions(contact_id);
CREATE INDEX IF NOT EXISTS idx_interactions_occurred ON interactions(occurred_at);
-- Tasks
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
contact_id TEXT REFERENCES contacts(id),
deal_id TEXT REFERENCES deals(id),
title TEXT NOT NULL,
due_at TEXT,
completed_at TEXT,
priority TEXT DEFAULT 'normal',
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_tasks_due ON tasks(due_at);
CREATE INDEX IF NOT EXISTS idx_tasks_completed ON tasks(completed_at);
-- Audit Log
CREATE TABLE IF NOT EXISTS audit_log (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
table_name TEXT NOT NULL,
record_id TEXT NOT NULL,
action TEXT NOT NULL, -- INSERT, UPDATE, DELETE
old_values TEXT, -- JSON
new_values TEXT, -- JSON
reason TEXT,
conversation_ref TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_audit_record ON audit_log(table_name, record_id);
CREATE INDEX IF NOT EXISTS idx_audit_created ON audit_log(created_at);
crm-backup.py
6.7 KB
#!/usr/bin/env python3
"""
CRM Backup/Restore - Database backup and restore operations
Commands:
- backup: Create a timestamped backup of the database
- restore: Restore from a backup file
- list: List available backups
- prune: Remove old backups (keep N most recent)
"""
import argparse
import json
import os
import shutil
import sqlite3
from datetime import datetime
from pathlib import Path
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
BACKUP_DIR = os.environ.get('CRM_BACKUP_DIR', os.path.expanduser('~/.local/share/agent-crm/backups'))
def ensure_backup_dir():
"""Create backup directory if needed."""
Path(BACKUP_DIR).mkdir(parents=True, exist_ok=True)
def get_backup_files() -> list[dict]:
"""Get list of backup files with metadata."""
ensure_backup_dir()
backups = []
for f in Path(BACKUP_DIR).glob('crm_backup_*.db'):
stat = f.stat()
# Parse timestamp from filename
try:
ts_str = f.stem.replace('crm_backup_', '')
ts = datetime.strptime(ts_str, '%Y%m%d_%H%M%S')
except:
ts = datetime.fromtimestamp(stat.st_mtime)
backups.append({
'path': str(f),
'filename': f.name,
'size_bytes': stat.st_size,
'size_human': format_size(stat.st_size),
'created_at': ts.isoformat(),
'age_days': (datetime.now() - ts).days
})
return sorted(backups, key=lambda x: x['created_at'], reverse=True)
def format_size(bytes: int) -> str:
"""Format bytes as human-readable size."""
for unit in ['B', 'KB', 'MB', 'GB']:
if bytes < 1024:
return f'{bytes:.1f} {unit}'
bytes /= 1024
return f'{bytes:.1f} TB'
def backup_database(note: str = None) -> dict:
"""Create a backup of the database."""
if not Path(DB_PATH).exists():
return {'error': 'Database not found', 'path': DB_PATH}
ensure_backup_dir()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = os.path.join(BACKUP_DIR, f'crm_backup_{timestamp}.db')
# Use SQLite backup API for consistency
source = sqlite3.connect(DB_PATH)
dest = sqlite3.connect(backup_path)
source.backup(dest)
source.close()
dest.close()
# Get stats
stat = Path(backup_path).stat()
result = {
'status': 'success',
'path': backup_path,
'size': format_size(stat.st_size),
'timestamp': timestamp
}
if note:
# Save note alongside backup
note_path = backup_path + '.note'
with open(note_path, 'w') as f:
f.write(note)
result['note'] = note
return result
def restore_database(backup_path: str, confirm: bool = False) -> dict:
"""Restore database from backup."""
if not Path(backup_path).exists():
return {'error': 'Backup file not found', 'path': backup_path}
if not confirm:
return {
'status': 'confirmation_required',
'message': 'This will overwrite the current database. Pass --confirm to proceed.',
'backup_path': backup_path,
'current_db': DB_PATH
}
# Create safety backup first
if Path(DB_PATH).exists():
safety_backup = backup_database('Pre-restore safety backup')
if 'error' in safety_backup:
return {'error': 'Failed to create safety backup', 'details': safety_backup}
# Restore
shutil.copy2(backup_path, DB_PATH)
# Verify
try:
conn = sqlite3.connect(DB_PATH)
conn.execute("SELECT COUNT(*) FROM contacts")
conn.close()
except Exception as e:
return {'error': 'Restored database appears corrupt', 'details': str(e)}
return {
'status': 'success',
'message': 'Database restored successfully',
'restored_from': backup_path,
'safety_backup': safety_backup.get('path') if Path(DB_PATH).exists() else None
}
def list_backups() -> dict:
"""List all available backups."""
backups = get_backup_files()
# Load notes
for b in backups:
note_path = b['path'] + '.note'
if Path(note_path).exists():
b['note'] = Path(note_path).read_text().strip()
return {
'count': len(backups),
'backups': backups,
'backup_dir': BACKUP_DIR
}
def prune_backups(keep: int = 10) -> dict:
"""Remove old backups, keeping N most recent."""
backups = get_backup_files()
if len(backups) <= keep:
return {
'status': 'success',
'message': f'No pruning needed. {len(backups)} backups exist, keeping {keep}.',
'removed': 0
}
to_remove = backups[keep:]
removed = []
for b in to_remove:
try:
Path(b['path']).unlink()
# Also remove note if exists
note_path = b['path'] + '.note'
if Path(note_path).exists():
Path(note_path).unlink()
removed.append(b['filename'])
except Exception as e:
pass
return {
'status': 'success',
'kept': keep,
'removed': len(removed),
'removed_files': removed
}
def main():
parser = argparse.ArgumentParser(description='CRM backup and restore')
subparsers = parser.add_subparsers(dest='command', required=True)
# backup
p = subparsers.add_parser('backup', help='Create a backup')
p.add_argument('--note', '-n', help='Note to attach to backup')
# restore
p = subparsers.add_parser('restore', help='Restore from backup')
p.add_argument('path', nargs='?', help='Backup file path (uses latest if not specified)')
p.add_argument('--confirm', action='store_true', help='Confirm overwrite')
# list
p = subparsers.add_parser('list', help='List backups')
# prune
p = subparsers.add_parser('prune', help='Remove old backups')
p.add_argument('--keep', '-k', type=int, default=10, help='Number of backups to keep')
args = parser.parse_args()
if args.command == 'backup':
result = backup_database(args.note)
elif args.command == 'restore':
path = args.path
if not path:
# Use latest backup
backups = get_backup_files()
if not backups:
result = {'error': 'No backups found'}
else:
path = backups[0]['path']
result = restore_database(path, args.confirm)
else:
result = restore_database(path, args.confirm)
elif args.command == 'list':
result = list_backups()
elif args.command == 'prune':
result = prune_backups(args.keep)
print(json.dumps(result, indent=2))
if __name__ == '__main__':
main()
crm-chart.py
18.6 KB
#!/usr/bin/env python3
"""
CRM Charts - Generate visual reports from CRM data
Charts:
- pipeline: Deal value by stage (horizontal bar)
- forecast: Expected closes by month (bar)
- activity: Interactions over time (line)
- funnel: Stage conversion funnel (funnel chart)
Output: PNG image suitable for sending via chat
"""
import argparse
import json
import os
import sqlite3
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.parent
VENV_DIR = SCRIPT_DIR / '.venv'
VENV_PYTHON = VENV_DIR / 'bin' / 'python'
def ensure_venv():
"""Ensure venv exists with matplotlib, re-exec if needed."""
# If we're already in the venv, continue
if sys.prefix != sys.base_prefix:
return True
# Check if venv exists
if not VENV_PYTHON.exists():
print(json.dumps({'status': 'installing', 'message': 'Setting up chart dependencies...'}))
subprocess.run([sys.executable, '-m', 'venv', str(VENV_DIR)], check=True)
subprocess.run([str(VENV_PYTHON), '-m', 'pip', 'install', '--quiet', 'matplotlib'], check=True)
# Re-exec with venv python
os.execv(str(VENV_PYTHON), [str(VENV_PYTHON)] + sys.argv)
# Ensure we're in venv before importing matplotlib
ensure_venv()
import matplotlib
matplotlib.use('Agg') # Non-interactive backend
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
HAS_MATPLOTLIB = True
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
OUTPUT_DIR = os.environ.get('CRM_CHARTS_DIR', os.path.expanduser('~/.local/share/agent-crm/charts'))
# Color scheme
COLORS = {
'lead': '#94a3b8', # slate
'qualified': '#60a5fa', # blue
'proposal': '#a78bfa', # purple
'negotiation': '#fbbf24', # amber
'won': '#34d399', # green
'lost': '#f87171', # red
'primary': '#3b82f6', # blue
'secondary': '#8b5cf6', # violet
'accent': '#06b6d4', # cyan
}
STAGE_ORDER = ['lead', 'qualified', 'proposal', 'negotiation', 'won', 'lost']
def get_db() -> sqlite3.Connection:
"""Get database connection."""
if not Path(DB_PATH).exists():
return None
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def ensure_output_dir():
"""Create output directory if needed."""
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
def format_currency(value):
"""Format as currency."""
if value >= 1_000_000:
return f'${value/1_000_000:.1f}M'
elif value >= 1_000:
return f'${value/1_000:.0f}K'
else:
return f'${value:.0f}'
def chart_pipeline(output_path: str = None) -> str:
"""Generate pipeline chart - deals by stage."""
conn = get_db()
if not conn:
return None
rows = conn.execute("""
SELECT stage, COUNT(*) as count, COALESCE(SUM(value), 0) as total_value
FROM deals
WHERE stage NOT IN ('won', 'lost')
GROUP BY stage
""").fetchall()
conn.close()
if not rows:
return None
# Organize by stage order
data = {r['stage']: {'count': r['count'], 'value': r['total_value']} for r in rows}
stages = [s for s in STAGE_ORDER if s in data]
values = [data[s]['value'] for s in stages]
counts = [data[s]['count'] for s in stages]
colors = [COLORS.get(s, COLORS['primary']) for s in stages]
# Create chart
fig, ax = plt.subplots(figsize=(10, 5))
y_pos = range(len(stages))
bars = ax.barh(y_pos, values, color=colors, height=0.6)
# Labels
ax.set_yticks(y_pos)
ax.set_yticklabels([s.title() for s in stages], fontsize=12)
ax.invert_yaxis()
# Value labels on bars
for i, (bar, value, count) in enumerate(zip(bars, values, counts)):
width = bar.get_width()
label = f'{format_currency(value)} ({count} deal{"s" if count != 1 else ""})'
if width > max(values) * 0.3:
ax.text(width - max(values) * 0.02, bar.get_y() + bar.get_height()/2,
label, ha='right', va='center', color='white', fontweight='bold', fontsize=11)
else:
ax.text(width + max(values) * 0.02, bar.get_y() + bar.get_height()/2,
label, ha='left', va='center', color='#1f2937', fontsize=11)
# Styling
ax.set_xlabel('Deal Value', fontsize=12)
ax.set_title('Pipeline by Stage', fontsize=16, fontweight='bold', pad=20)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.set_xlim(0, max(values) * 1.3 if values else 1)
# Total annotation
total = sum(values)
ax.annotate(f'Total Pipeline: {format_currency(total)}',
xy=(0.98, 0.02), xycoords='axes fraction',
ha='right', va='bottom', fontsize=12, fontweight='bold',
color=COLORS['primary'])
plt.tight_layout()
# Save
ensure_output_dir()
if not output_path:
output_path = os.path.join(OUTPUT_DIR, f'pipeline_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close()
return output_path
def chart_forecast(months: int = 6, output_path: str = None) -> str:
"""Generate forecast chart - expected closes by month."""
conn = get_db()
if not conn:
return None
rows = conn.execute("""
SELECT
strftime('%Y-%m', expected_close) as month,
COUNT(*) as count,
COALESCE(SUM(value), 0) as total_value,
COALESCE(SUM(value * COALESCE(probability, 50) / 100.0), 0) as weighted_value
FROM deals
WHERE stage NOT IN ('won', 'lost')
AND expected_close IS NOT NULL
AND expected_close >= date('now')
GROUP BY month
ORDER BY month
LIMIT ?
""", (months,)).fetchall()
conn.close()
if not rows:
return None
months_list = [r['month'] for r in rows]
values = [r['total_value'] for r in rows]
weighted = [r['weighted_value'] for r in rows]
counts = [r['count'] for r in rows]
# Create chart
fig, ax = plt.subplots(figsize=(10, 5))
x = range(len(months_list))
width = 0.35
bars1 = ax.bar([i - width/2 for i in x], values, width, label='Total Value', color=COLORS['primary'], alpha=0.8)
bars2 = ax.bar([i + width/2 for i in x], weighted, width, label='Weighted Value', color=COLORS['secondary'], alpha=0.8)
# Labels
ax.set_xticks(x)
ax.set_xticklabels([datetime.strptime(m, '%Y-%m').strftime('%b %Y') for m in months_list], fontsize=11)
# Value labels
for bar, val, count in zip(bars1, values, counts):
ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(values) * 0.02,
f'{count}', ha='center', va='bottom', fontsize=10, color='#6b7280')
# Styling
ax.set_ylabel('Deal Value', fontsize=12)
ax.set_title('Forecast: Expected Closes by Month', fontsize=16, fontweight='bold', pad=20)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.legend(loc='upper right')
# Format y-axis as currency
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: format_currency(x)))
plt.tight_layout()
# Save
ensure_output_dir()
if not output_path:
output_path = os.path.join(OUTPUT_DIR, f'forecast_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close()
return output_path
def chart_activity(days: int = 30, output_path: str = None) -> str:
"""Generate activity chart - interactions over time."""
conn = get_db()
if not conn:
return None
since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
rows = conn.execute("""
SELECT
date(occurred_at) as day,
type,
COUNT(*) as count
FROM interactions
WHERE occurred_at >= ?
GROUP BY day, type
ORDER BY day
""", (since,)).fetchall()
conn.close()
if not rows:
return None
# Organize data by day and type
from collections import defaultdict
daily = defaultdict(lambda: defaultdict(int))
types = set()
for r in rows:
daily[r['day']][r['type']] = r['count']
types.add(r['type'])
days_list = sorted(daily.keys())
types = sorted(types)
# Create chart
fig, ax = plt.subplots(figsize=(12, 5))
type_colors = {
'email': COLORS['primary'],
'call': COLORS['secondary'],
'meeting': COLORS['accent'],
'note': '#94a3b8',
'linkedin': '#0077b5',
'text': '#25d366',
}
bottom = [0] * len(days_list)
for t in types:
values = [daily[d][t] for d in days_list]
color = type_colors.get(t, '#6b7280')
ax.bar(range(len(days_list)), values, bottom=bottom, label=t.title(), color=color, alpha=0.8)
bottom = [b + v for b, v in zip(bottom, values)]
# Labels
# Show every Nth label to avoid crowding
step = max(1, len(days_list) // 10)
ax.set_xticks(range(0, len(days_list), step))
ax.set_xticklabels([datetime.strptime(days_list[i], '%Y-%m-%d').strftime('%m/%d')
for i in range(0, len(days_list), step)], fontsize=10)
# Styling
ax.set_ylabel('Interactions', fontsize=12)
ax.set_title(f'Activity: Last {days} Days', fontsize=16, fontweight='bold', pad=20)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.legend(loc='upper left', ncol=len(types))
plt.tight_layout()
# Save
ensure_output_dir()
if not output_path:
output_path = os.path.join(OUTPUT_DIR, f'activity_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close()
return output_path
def chart_winloss(days: int = 90, output_path: str = None) -> str:
"""Generate win/loss chart."""
conn = get_db()
if not conn:
return None
since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
rows = conn.execute("""
SELECT
strftime('%Y-%m', closed_at) as month,
stage,
COUNT(*) as count,
COALESCE(SUM(value), 0) as total_value
FROM deals
WHERE stage IN ('won', 'lost')
AND closed_at >= ?
GROUP BY month, stage
ORDER BY month
""", (since,)).fetchall()
conn.close()
if not rows:
return None
# Organize data
from collections import defaultdict
monthly = defaultdict(lambda: {'won': 0, 'lost': 0, 'won_count': 0, 'lost_count': 0})
for r in rows:
if r['stage'] == 'won':
monthly[r['month']]['won'] = r['total_value']
monthly[r['month']]['won_count'] = r['count']
else:
monthly[r['month']]['lost'] = r['total_value']
monthly[r['month']]['lost_count'] = r['count']
months_list = sorted(monthly.keys())
won_values = [monthly[m]['won'] for m in months_list]
lost_values = [monthly[m]['lost'] for m in months_list]
# Create chart
fig, ax = plt.subplots(figsize=(10, 5))
x = range(len(months_list))
width = 0.35
bars1 = ax.bar([i - width/2 for i in x], won_values, width, label='Won', color=COLORS['won'])
bars2 = ax.bar([i + width/2 for i in x], lost_values, width, label='Lost', color=COLORS['lost'])
# Labels
ax.set_xticks(x)
ax.set_xticklabels([datetime.strptime(m, '%Y-%m').strftime('%b %Y') for m in months_list], fontsize=11)
# Styling
ax.set_ylabel('Deal Value', fontsize=12)
ax.set_title('Win/Loss Analysis', fontsize=16, fontweight='bold', pad=20)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.legend(loc='upper right')
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: format_currency(x)))
# Win rate annotation
total_won = sum(won_values)
total_lost = sum(lost_values)
if total_won + total_lost > 0:
win_rate = total_won / (total_won + total_lost) * 100
ax.annotate(f'Win Rate: {win_rate:.0f}%',
xy=(0.02, 0.98), xycoords='axes fraction',
ha='left', va='top', fontsize=12, fontweight='bold',
color=COLORS['won'])
plt.tight_layout()
# Save
ensure_output_dir()
if not output_path:
output_path = os.path.join(OUTPUT_DIR, f'winloss_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close()
return output_path
def chart_summary(output_path: str = None) -> str:
"""Generate summary dashboard with multiple metrics."""
conn = get_db()
if not conn:
return None
# Get all metrics
stats = {}
# Pipeline by stage
rows = conn.execute("""
SELECT stage, COUNT(*) as count, COALESCE(SUM(value), 0) as value
FROM deals WHERE stage NOT IN ('won', 'lost')
GROUP BY stage
""").fetchall()
stats['pipeline'] = {r['stage']: {'count': r['count'], 'value': r['value']} for r in rows}
# Won this month
month_start = datetime.now().replace(day=1).strftime('%Y-%m-%d')
row = conn.execute("""
SELECT COUNT(*) as count, COALESCE(SUM(value), 0) as value
FROM deals WHERE stage = 'won' AND closed_at >= ?
""", (month_start,)).fetchone()
stats['won_month'] = {'count': row['count'], 'value': row['value']}
# Tasks
row = conn.execute("""
SELECT
COUNT(*) FILTER (WHERE completed_at IS NULL) as pending,
COUNT(*) FILTER (WHERE completed_at IS NULL AND due_at < datetime('now')) as overdue
FROM tasks
""").fetchone()
stats['tasks'] = {'pending': row['pending'], 'overdue': row['overdue']}
# Contacts
row = conn.execute("SELECT COUNT(*) as count FROM contacts").fetchone()
stats['contacts'] = row['count']
conn.close()
# Create dashboard
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Pipeline pie chart
ax = axes[0, 0]
if stats['pipeline']:
stages = [s for s in STAGE_ORDER if s in stats['pipeline']]
values = [stats['pipeline'][s]['value'] for s in stages]
colors = [COLORS.get(s, '#6b7280') for s in stages]
wedges, texts, autotexts = ax.pie(values, labels=[s.title() for s in stages],
colors=colors, autopct='%1.0f%%',
pctdistance=0.75)
ax.set_title('Pipeline Distribution', fontsize=14, fontweight='bold')
else:
ax.text(0.5, 0.5, 'No pipeline data', ha='center', va='center', fontsize=14)
ax.set_title('Pipeline Distribution', fontsize=14, fontweight='bold')
# Key metrics
ax = axes[0, 1]
ax.axis('off')
total_pipeline = sum(d['value'] for d in stats['pipeline'].values())
metrics_text = f"""
π Key Metrics
Pipeline Value: {format_currency(total_pipeline)}
Open Deals: {sum(d['count'] for d in stats['pipeline'].values())}
Won This Month: {format_currency(stats['won_month']['value'])}
({stats['won_month']['count']} deals)
Contacts: {stats['contacts']}
Tasks Pending: {stats['tasks']['pending']}
Tasks Overdue: {stats['tasks']['overdue']}
"""
ax.text(0.1, 0.9, metrics_text, transform=ax.transAxes, fontsize=13,
verticalalignment='top', fontfamily='monospace',
bbox=dict(boxstyle='round', facecolor='#f1f5f9', alpha=0.8))
# Stage breakdown bar
ax = axes[1, 0]
if stats['pipeline']:
stages = [s for s in STAGE_ORDER if s in stats['pipeline']]
values = [stats['pipeline'][s]['value'] for s in stages]
counts = [stats['pipeline'][s]['count'] for s in stages]
colors = [COLORS.get(s, '#6b7280') for s in stages]
bars = ax.barh(range(len(stages)), values, color=colors)
ax.set_yticks(range(len(stages)))
ax.set_yticklabels([s.title() for s in stages])
ax.invert_yaxis()
ax.set_title('Value by Stage', fontsize=14, fontweight='bold')
ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: format_currency(x)))
for bar, count in zip(bars, counts):
ax.text(bar.get_width() + max(values) * 0.02, bar.get_y() + bar.get_height()/2,
f'{count}', va='center', fontsize=10)
# Tasks status
ax = axes[1, 1]
task_labels = ['Pending', 'Overdue']
task_values = [stats['tasks']['pending'] - stats['tasks']['overdue'], stats['tasks']['overdue']]
task_colors = [COLORS['primary'], COLORS['lost']]
if sum(task_values) > 0:
ax.pie(task_values, labels=task_labels, colors=task_colors, autopct='%1.0f%%',
startangle=90)
else:
ax.text(0.5, 0.5, 'No tasks', ha='center', va='center', fontsize=14)
ax.set_title('Task Status', fontsize=14, fontweight='bold')
plt.suptitle(f'CRM Dashboard β {datetime.now().strftime("%B %d, %Y")}',
fontsize=18, fontweight='bold', y=1.02)
plt.tight_layout()
# Save
ensure_output_dir()
if not output_path:
output_path = os.path.join(OUTPUT_DIR, f'summary_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close()
return output_path
def main():
parser = argparse.ArgumentParser(description='Generate CRM charts')
parser.add_argument('chart', choices=['pipeline', 'forecast', 'activity', 'winloss', 'summary'],
help='Chart type to generate')
parser.add_argument('--output', '-o', help='Output file path')
parser.add_argument('--days', '-d', type=int, default=30, help='Days to analyze (activity/winloss)')
parser.add_argument('--months', '-m', type=int, default=6, help='Months to forecast')
args = parser.parse_args()
chart_funcs = {
'pipeline': lambda: chart_pipeline(args.output),
'forecast': lambda: chart_forecast(args.months, args.output),
'activity': lambda: chart_activity(args.days, args.output),
'winloss': lambda: chart_winloss(args.days, args.output),
'summary': lambda: chart_summary(args.output),
}
output_path = chart_funcs[args.chart]()
if output_path:
print(json.dumps({'status': 'success', 'chart': args.chart, 'path': output_path}))
else:
print(json.dumps({'status': 'error', 'message': 'No data to chart or database not found'}))
sys.exit(1)
if __name__ == '__main__':
main()
crm-digest.py
10.0 KB
#!/usr/bin/env python3
"""
CRM Daily Digest - Summary of CRM activity and upcoming items
Generates a daily briefing with:
- Yesterday's activity
- Pipeline summary
- Tasks due today/overdue
- Contacts needing follow-up
- Deals closing soon
"""
import argparse
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
def get_db() -> sqlite3.Connection:
"""Get database connection."""
if not Path(DB_PATH).exists():
return None
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def generate_digest(lookback_days: int = 1, lookahead_days: int = 7) -> dict:
"""Generate the daily digest."""
conn = get_db()
if not conn:
return {'error': 'Database not found', 'path': DB_PATH}
now = datetime.now()
yesterday = (now - timedelta(days=lookback_days)).isoformat()
week_ahead = (now + timedelta(days=lookahead_days)).isoformat()
today_start = now.replace(hour=0, minute=0, second=0).isoformat()
today_end = now.replace(hour=23, minute=59, second=59).isoformat()
digest = {
'generated_at': now.isoformat(),
'period': {
'lookback_days': lookback_days,
'lookahead_days': lookahead_days
}
}
# Recent activity
activity = {}
# New contacts
rows = conn.execute("""
SELECT COUNT(*) as count FROM contacts WHERE created_at >= ?
""", (yesterday,)).fetchone()
activity['new_contacts'] = rows['count']
# New deals
rows = conn.execute("""
SELECT COUNT(*) as count, SUM(value) as total_value
FROM deals WHERE created_at >= ?
""", (yesterday,)).fetchone()
activity['new_deals'] = rows['count']
activity['new_deal_value'] = rows['total_value'] or 0
# Interactions logged
rows = conn.execute("""
SELECT type, COUNT(*) as count FROM interactions
WHERE logged_at >= ?
GROUP BY type
""", (yesterday,)).fetchall()
activity['interactions'] = {r['type']: r['count'] for r in rows}
activity['total_interactions'] = sum(r['count'] for r in rows)
# Tasks completed
rows = conn.execute("""
SELECT COUNT(*) as count FROM tasks WHERE completed_at >= ?
""", (yesterday,)).fetchone()
activity['tasks_completed'] = rows['count']
# Deal stage changes
rows = conn.execute("""
SELECT new_values, old_values FROM audit_log
WHERE table_name = 'deals' AND action = 'UPDATE' AND created_at >= ?
""", (yesterday,)).fetchall()
stage_changes = []
for r in rows:
try:
old = json.loads(r['old_values']) if r['old_values'] else {}
new = json.loads(r['new_values']) if r['new_values'] else {}
if 'stage' in new and old.get('stage') != new.get('stage'):
stage_changes.append({'from': old.get('stage'), 'to': new['stage']})
except:
pass
activity['deal_stage_changes'] = stage_changes
digest['recent_activity'] = activity
# Pipeline summary
rows = conn.execute("""
SELECT stage, COUNT(*) as count, SUM(value) as total_value
FROM deals WHERE stage NOT IN ('won', 'lost')
GROUP BY stage
ORDER BY CASE stage
WHEN 'lead' THEN 1
WHEN 'qualified' THEN 2
WHEN 'proposal' THEN 3
WHEN 'negotiation' THEN 4
END
""").fetchall()
pipeline = {
'stages': [dict(r) for r in rows],
'total_deals': sum(r['count'] for r in rows),
'total_value': sum(r['total_value'] or 0 for r in rows)
}
# Weighted pipeline
weighted = conn.execute("""
SELECT SUM(value * COALESCE(probability, 50) / 100.0) as weighted
FROM deals WHERE stage NOT IN ('won', 'lost')
""").fetchone()
pipeline['weighted_value'] = weighted['weighted'] or 0
digest['pipeline'] = pipeline
# Tasks due today
rows = conn.execute("""
SELECT t.*, c.name as contact_name FROM tasks t
LEFT JOIN contacts c ON t.contact_id = c.id
WHERE t.completed_at IS NULL
AND t.due_at >= ? AND t.due_at <= ?
ORDER BY t.priority DESC, t.due_at ASC
""", (today_start, today_end)).fetchall()
digest['tasks_due_today'] = [dict(r) for r in rows]
# Overdue tasks
rows = conn.execute("""
SELECT t.*, c.name as contact_name FROM tasks t
LEFT JOIN contacts c ON t.contact_id = c.id
WHERE t.completed_at IS NULL AND t.due_at < ?
ORDER BY t.due_at ASC
LIMIT 10
""", (today_start,)).fetchall()
digest['overdue_tasks'] = [dict(r) for r in rows]
# Deals closing soon
rows = conn.execute("""
SELECT d.*, c.name as contact_name FROM deals d
LEFT JOIN contacts c ON d.contact_id = c.id
WHERE d.stage NOT IN ('won', 'lost')
AND d.expected_close <= ?
ORDER BY d.expected_close ASC
LIMIT 5
""", (week_ahead,)).fetchall()
digest['deals_closing_soon'] = [dict(r) for r in rows]
# Contacts needing follow-up (no interaction in 14+ days)
stale_date = (now - timedelta(days=14)).isoformat()
rows = conn.execute("""
SELECT c.*, MAX(i.occurred_at) as last_interaction
FROM contacts c
LEFT JOIN interactions i ON c.id = i.contact_id
GROUP BY c.id
HAVING last_interaction < ? OR last_interaction IS NULL
ORDER BY last_interaction ASC
LIMIT 10
""", (stale_date,)).fetchall()
digest['needs_followup'] = [dict(r) for r in rows]
# Won deals this month
month_start = now.replace(day=1, hour=0, minute=0, second=0).isoformat()
rows = conn.execute("""
SELECT SUM(value) as total, COUNT(*) as count FROM deals
WHERE stage = 'won' AND closed_at >= ?
""", (month_start,)).fetchone()
digest['won_this_month'] = {
'count': rows['count'] or 0,
'value': rows['total'] or 0
}
conn.close()
return digest
def format_digest_text(digest: dict) -> str:
"""Format digest as human-readable text."""
if 'error' in digest:
return f"β {digest['error']}"
lines = []
lines.append(f"π **CRM Digest** β {datetime.now().strftime('%B %d, %Y')}")
lines.append("")
# Recent activity
act = digest['recent_activity']
if any([act['new_contacts'], act['total_interactions'], act['tasks_completed']]):
lines.append("**Recent Activity:**")
if act['new_contacts']:
lines.append(f"β’ {act['new_contacts']} new contact(s)")
if act['new_deals']:
lines.append(f"β’ {act['new_deals']} new deal(s) (${act['new_deal_value']:,.0f})")
if act['total_interactions']:
types = ', '.join(f"{v} {k}" for k, v in act['interactions'].items())
lines.append(f"β’ {act['total_interactions']} interaction(s) logged ({types})")
if act['tasks_completed']:
lines.append(f"β’ {act['tasks_completed']} task(s) completed")
if act['deal_stage_changes']:
for change in act['deal_stage_changes']:
lines.append(f"β’ Deal moved: {change['from']} β {change['to']}")
lines.append("")
# Pipeline
pipe = digest['pipeline']
if pipe['total_deals']:
lines.append("**Pipeline:**")
for stage in pipe['stages']:
val = f"${stage['total_value']:,.0f}" if stage['total_value'] else "$0"
lines.append(f"β’ {stage['stage'].title()}: {stage['count']} deal(s) ({val})")
lines.append(f"β’ **Total:** {pipe['total_deals']} deals, ${pipe['total_value']:,.0f}")
lines.append(f"β’ **Weighted:** ${pipe['weighted_value']:,.0f}")
lines.append("")
# Tasks
if digest['overdue_tasks']:
lines.append("**β οΈ Overdue Tasks:**")
for task in digest['overdue_tasks'][:5]:
contact = f" ({task['contact_name']})" if task['contact_name'] else ""
lines.append(f"β’ {task['title']}{contact}")
lines.append("")
if digest['tasks_due_today']:
lines.append("**Today's Tasks:**")
for task in digest['tasks_due_today']:
contact = f" ({task['contact_name']})" if task['contact_name'] else ""
lines.append(f"β’ {task['title']}{contact}")
lines.append("")
# Deals closing soon
if digest['deals_closing_soon']:
lines.append("**Deals Closing Soon:**")
for deal in digest['deals_closing_soon']:
val = f"${deal['value']:,.0f}" if deal['value'] else "TBD"
contact = f" - {deal['contact_name']}" if deal['contact_name'] else ""
lines.append(f"β’ {deal['title']} ({val}){contact} β {deal['expected_close']}")
lines.append("")
# Follow-ups needed
if digest['needs_followup']:
lines.append("**Needs Follow-up (14+ days):**")
for contact in digest['needs_followup'][:5]:
company = f" @ {contact['company']}" if contact['company'] else ""
lines.append(f"β’ {contact['name']}{company}")
lines.append("")
# Won this month
won = digest['won_this_month']
if won['count']:
lines.append(f"**Won This Month:** {won['count']} deal(s) β ${won['value']:,.0f} π")
if len(lines) <= 2:
lines.append("No activity to report. Database may be empty.")
return '\n'.join(lines)
def main():
parser = argparse.ArgumentParser(description='Generate CRM daily digest')
parser.add_argument('--json', action='store_true', help='Output as JSON')
parser.add_argument('--lookback', '-l', type=int, default=1, help='Days to look back')
parser.add_argument('--lookahead', '-a', type=int, default=7, help='Days to look ahead')
args = parser.parse_args()
digest = generate_digest(args.lookback, args.lookahead)
if args.json:
print(json.dumps(digest, indent=2))
else:
print(format_digest_text(digest))
if __name__ == '__main__':
main()
crm-export.py
6.8 KB
#!/usr/bin/env python3
"""
CRM Export - Export data to CSV/JSON formats
Exports:
- contacts: All contacts
- deals: All deals with contact info
- interactions: All interactions
- tasks: All tasks
- all: Complete database dump
"""
import argparse
import csv
import json
import os
import sqlite3
from datetime import datetime
from pathlib import Path
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
EXPORT_DIR = os.environ.get('CRM_EXPORT_DIR', os.path.expanduser('~/.local/share/agent-crm/exports'))
def get_db() -> sqlite3.Connection:
"""Get database connection."""
if not Path(DB_PATH).exists():
return None
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def ensure_export_dir():
"""Create export directory if needed."""
Path(EXPORT_DIR).mkdir(parents=True, exist_ok=True)
def export_table(conn, table: str, query: str = None) -> list[dict]:
"""Export a table to list of dicts."""
if query is None:
query = f"SELECT * FROM {table}"
rows = conn.execute(query).fetchall()
return [dict(r) for r in rows]
def to_csv(data: list[dict], filepath: str):
"""Write data to CSV."""
if not data:
return
with open(filepath, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
def to_json(data: list[dict], filepath: str):
"""Write data to JSON."""
with open(filepath, 'w') as f:
json.dump(data, f, indent=2, default=str)
def export_contacts(conn, format: str, output_dir: str) -> str:
"""Export contacts."""
data = export_table(conn, 'contacts')
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if format == 'csv':
filepath = os.path.join(output_dir, f'contacts_{timestamp}.csv')
to_csv(data, filepath)
else:
filepath = os.path.join(output_dir, f'contacts_{timestamp}.json')
to_json(data, filepath)
return filepath
def export_deals(conn, format: str, output_dir: str) -> str:
"""Export deals with contact info."""
query = """
SELECT d.*, c.name as contact_name, c.email as contact_email, c.company as contact_company
FROM deals d
LEFT JOIN contacts c ON d.contact_id = c.id
ORDER BY d.created_at DESC
"""
data = export_table(conn, 'deals', query)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if format == 'csv':
filepath = os.path.join(output_dir, f'deals_{timestamp}.csv')
to_csv(data, filepath)
else:
filepath = os.path.join(output_dir, f'deals_{timestamp}.json')
to_json(data, filepath)
return filepath
def export_interactions(conn, format: str, output_dir: str) -> str:
"""Export interactions with contact info."""
query = """
SELECT i.*, c.name as contact_name, c.company as contact_company
FROM interactions i
LEFT JOIN contacts c ON i.contact_id = c.id
ORDER BY i.occurred_at DESC
"""
data = export_table(conn, 'interactions', query)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if format == 'csv':
filepath = os.path.join(output_dir, f'interactions_{timestamp}.csv')
to_csv(data, filepath)
else:
filepath = os.path.join(output_dir, f'interactions_{timestamp}.json')
to_json(data, filepath)
return filepath
def export_tasks(conn, format: str, output_dir: str) -> str:
"""Export tasks with contact info."""
query = """
SELECT t.*, c.name as contact_name, d.title as deal_title
FROM tasks t
LEFT JOIN contacts c ON t.contact_id = c.id
LEFT JOIN deals d ON t.deal_id = d.id
ORDER BY t.due_at ASC
"""
data = export_table(conn, 'tasks', query)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if format == 'csv':
filepath = os.path.join(output_dir, f'tasks_{timestamp}.csv')
to_csv(data, filepath)
else:
filepath = os.path.join(output_dir, f'tasks_{timestamp}.json')
to_json(data, filepath)
return filepath
def export_all(conn, format: str, output_dir: str) -> dict:
"""Export everything."""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if format == 'json':
# Single JSON file with all data
data = {
'exported_at': datetime.now().isoformat(),
'contacts': export_table(conn, 'contacts'),
'deals': export_table(conn, 'deals', """
SELECT d.*, c.name as contact_name
FROM deals d LEFT JOIN contacts c ON d.contact_id = c.id
"""),
'interactions': export_table(conn, 'interactions', """
SELECT i.*, c.name as contact_name
FROM interactions i LEFT JOIN contacts c ON i.contact_id = c.id
"""),
'tasks': export_table(conn, 'tasks', """
SELECT t.*, c.name as contact_name
FROM tasks t LEFT JOIN contacts c ON t.contact_id = c.id
"""),
'audit_log': export_table(conn, 'audit_log')
}
filepath = os.path.join(output_dir, f'crm_export_{timestamp}.json')
to_json(data, filepath)
return {'format': 'json', 'path': filepath}
else:
# Multiple CSV files
files = []
files.append(export_contacts(conn, 'csv', output_dir))
files.append(export_deals(conn, 'csv', output_dir))
files.append(export_interactions(conn, 'csv', output_dir))
files.append(export_tasks(conn, 'csv', output_dir))
return {'format': 'csv', 'files': files}
def main():
parser = argparse.ArgumentParser(description='Export CRM data')
parser.add_argument('what', choices=['contacts', 'deals', 'interactions', 'tasks', 'all'],
help='What to export')
parser.add_argument('--format', '-f', choices=['csv', 'json'], default='json',
help='Export format')
parser.add_argument('--output', '-o', help='Output directory')
args = parser.parse_args()
conn = get_db()
if not conn:
print(json.dumps({'error': 'Database not found', 'path': DB_PATH}))
return
output_dir = args.output or EXPORT_DIR
ensure_export_dir()
Path(output_dir).mkdir(parents=True, exist_ok=True)
exporters = {
'contacts': export_contacts,
'deals': export_deals,
'interactions': export_interactions,
'tasks': export_tasks,
'all': export_all,
}
result = exporters[args.what](conn, args.format, output_dir)
conn.close()
if isinstance(result, dict):
print(json.dumps({'status': 'success', **result}))
else:
print(json.dumps({'status': 'success', 'path': result}))
if __name__ == '__main__':
main()
crm-ingest.py
16.4 KB
#!/usr/bin/env python3
"""
CRM Ingest - Parse unstructured text into CRM actions
Takes emails, meeting notes, voice transcripts and extracts:
- New contacts
- Interactions to log
- Deal updates
- Tasks to create
Outputs a JSON plan for agent review before execution.
"""
import argparse
import json
import re
import sys
from datetime import datetime
from pathlib import Path
def extract_emails(text: str) -> list[str]:
"""Extract email addresses from text."""
pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
return list(set(re.findall(pattern, text)))
def extract_phones(text: str) -> list[str]:
"""Extract phone numbers from text."""
patterns = [
r'\+?1?[-.\s]?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}',
r'\+[0-9]{1,3}[-.\s]?[0-9]{6,14}'
]
phones = []
for pattern in patterns:
phones.extend(re.findall(pattern, text))
return list(set(phones))
def extract_money(text: str) -> list[dict]:
"""Extract monetary amounts."""
patterns = [
(r'\$([0-9,]+(?:\.[0-9]{2})?)\s*(?:k|K)', lambda m: float(m.group(1).replace(',', '')) * 1000),
(r'\$([0-9,]+(?:\.[0-9]{2})?)\s*(?:m|M)', lambda m: float(m.group(1).replace(',', '')) * 1000000),
(r'\$([0-9,]+(?:\.[0-9]{2})?)', lambda m: float(m.group(1).replace(',', ''))),
(r'([0-9,]+(?:\.[0-9]{2})?)\s*(?:dollars|USD)', lambda m: float(m.group(1).replace(',', ''))),
]
amounts = []
for pattern, converter in patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
amounts.append({
'raw': match.group(0),
'value': converter(match),
'currency': 'USD'
})
return amounts
def extract_dates(text: str) -> list[str]:
"""Extract date references."""
patterns = [
r'(?:next|this)\s+(?:monday|tuesday|wednesday|thursday|friday|saturday|sunday)',
r'(?:next|this)\s+week',
r'(?:next|this)\s+month',
r'(?:in\s+)?[0-9]+\s+(?:day|week|month)s?(?:\s+from\s+now)?',
r'(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+[0-9]{1,2}(?:st|nd|rd|th)?(?:\s*,?\s*[0-9]{4})?',
r'[0-9]{1,2}/[0-9]{1,2}(?:/[0-9]{2,4})?',
r'tomorrow',
r'today',
r'end of (?:week|month|quarter|year)',
]
dates = []
for pattern in patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
dates.append(match.group(0).strip())
return list(set(dates))
def extract_names(text: str) -> list[dict]:
"""Extract potential person names with context."""
names = []
seen = set()
# Common false positives to filter out
stop_words = {
'I', 'We', 'They', 'He', 'She', 'The', 'This', 'That', 'What', 'When',
'Where', 'How', 'If', 'But', 'And', 'For', 'Just', 'Also', 'Very',
'Really', 'Thanks', 'Thank', 'Hello', 'Hi', 'Hey', 'Best', 'Regards',
'Sincerely', 'Cheers', 'Dear', 'To', 'From', 'Subject', 'Re', 'Fwd'
}
def add_name(name: str, role: str = None, company: str = None):
name = name.strip()
if not name or name in stop_words or len(name) <= 2:
return
# Filter out single words that are likely not names
if ' ' not in name and len(name) < 4:
return
# If we've seen this name, update with new info (role/company)
if name in seen:
for entry in names:
if entry['name'] == name:
if role and 'role' not in entry:
entry['role'] = role
if company and 'company' not in entry:
entry['company'] = company
return
entry = {'name': name}
if role:
entry['role'] = role
if company:
entry['company'] = company
names.append(entry)
seen.add(name)
# Pattern 1: Name followed by "Title, Company" on next line (most specific first)
# "Alex Rivera\nFounder, DataStack"
title_company_pattern = r'^([A-Z][a-z]+\s+[A-Z][a-z]+)\s*\n\s*(CEO|CTO|COO|CFO|VP|Director|Manager|Founder|Co-Founder|President|Head of [A-Za-z]+|[A-Z][a-z]+ Engineer|[A-Z][a-z]+ Manager)[,\s]+([A-Z][A-Za-z]+(?:\s+[A-Z][A-Za-z]+)?)'
for match in re.finditer(title_company_pattern, text, re.MULTILINE):
name = match.group(1)
role = match.group(2).strip()
company = match.group(3).strip()
add_name(name, role=role, company=company)
# Pattern 1b: Name followed by just title on next line (no company)
title_pattern = r'^([A-Z][a-z]+\s+[A-Z][a-z]+)\s*\n\s*(CEO|CTO|COO|CFO|VP|Director|Manager|Founder|Co-Founder|President|Head of [A-Za-z]+)\s*$'
for match in re.finditer(title_pattern, text, re.MULTILINE):
name = match.group(1)
role = match.group(2).strip()
add_name(name, role=role)
# Pattern 2: Email signature - name on line after closing
# "Best,\nAlex Rivera" or "Thanks,\nJohn Smith"
sig_pattern = r'(?:Best|Thanks|Regards|Cheers|Sincerely|Warmly|Yours)[,.]?\s*\n+([A-Z][a-z]+\s+[A-Z][a-z]+)'
for match in re.finditer(sig_pattern, text, re.MULTILINE):
add_name(match.group(1))
# Pattern 3: "Name, Title at Company" or "Name, Title"
inline_title = r'([A-Z][a-z]+\s+[A-Z][a-z]+)\s*,\s*(CEO|CTO|COO|CFO|VP|Director|Manager|Founder|Co-Founder|President|Head of [A-Za-z]+|[A-Z][a-z]+ Engineer|[A-Z][a-z]+ Manager)(?:\s+(?:at|@)\s+([A-Z][A-Za-z]+))?'
for match in re.finditer(inline_title, text):
name = match.group(1)
role = match.group(2)
company = match.group(3) if len(match.groups()) > 2 else None
add_name(name, role=role, company=company)
# Pattern 4: "talked to/met with/spoke with Name"
action_pattern = r'(?:talked to|met with|spoke with|call with|meeting with|heard from|email from|message from)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)'
for match in re.finditer(action_pattern, text, re.IGNORECASE):
add_name(match.group(1))
# Pattern 5: "Name from/at Company" - full two-word name
company_pattern = r'([A-Z][a-z]+\s+[A-Z][a-z]+)\s+(?:from|at|with|@)\s+([A-Z][A-Za-z]+(?:\s+[A-Z][A-Za-z]+)?)'
for match in re.finditer(company_pattern, text):
name = match.group(1)
company = match.group(2)
add_name(name, company=company)
# Pattern 5b: "FirstName from/at Company" - single name with company context
company_pattern_single = r'\b([A-Z][a-z]+)\s+(?:from|at|with|@)\s+([A-Z][A-Za-z]+(?:\s+[A-Z][A-Za-z]+)?)'
for match in re.finditer(company_pattern_single, text):
name = match.group(1)
company = match.group(2)
if name not in stop_words and len(name) >= 4:
add_name(name, company=company)
# Pattern 6: Email "From:" header with name
from_pattern = r'From:\s*(?:"?([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)"?\s*<|([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s+[<\[])'
for match in re.finditer(from_pattern, text):
name = match.group(1) or match.group(2)
if name:
add_name(name)
# Pattern 7: Standalone two-word capitalized name on its own line (likely signature)
standalone = r'^([A-Z][a-z]+\s+[A-Z][a-z]+)\s*$'
lines = text.split('\n')
for i, line in enumerate(lines):
match = re.match(standalone, line.strip())
if match:
name = match.group(1)
# Check if next line looks like a title or company
if i + 1 < len(lines):
next_line = lines[i + 1].strip()
if re.match(r'^(CEO|CTO|COO|CFO|VP|Director|Founder|Manager|Engineer|Head|President)', next_line):
add_name(name, role=next_line.split(',')[0].strip())
elif re.match(r'^[A-Z]', next_line) and len(next_line) < 50:
add_name(name, company=next_line.split(',')[0].strip())
else:
add_name(name)
else:
add_name(name)
return names
def extract_companies(text: str) -> list[str]:
"""Extract company names."""
# Look for patterns like "at Acme Corp" or "from TechCo"
patterns = [
r'(?:at|from|with|for)\s+([A-Z][a-zA-Z]+(?:\s+(?:Corp|Inc|LLC|Ltd|Co|Labs?|AI|Tech|Software|Systems|Solutions))?)',
r'([A-Z][a-zA-Z]+(?:\.(?:com|io|ai|co)))',
]
companies = []
seen = set()
# Common false positives
stop_words = {'I', 'We', 'They', 'He', 'She', 'The', 'This', 'That', 'What', 'When', 'Where', 'How', 'If', 'But', 'And', 'For', 'Just', 'Also', 'Very', 'Really', 'Thanks', 'Thank', 'Hello', 'Hi', 'Hey'}
for pattern in patterns:
for match in re.finditer(pattern, text):
company = match.group(1).strip()
if company not in seen and company not in stop_words and len(company) > 2:
companies.append(company)
seen.add(company)
return companies
def detect_interaction_type(text: str) -> str:
"""Detect the type of interaction from text."""
text_lower = text.lower()
if any(x in text_lower for x in ['subject:', 'from:', 'to:', 're:', 'fwd:']):
return 'email'
if any(x in text_lower for x in ['called', 'call with', 'phone call', 'spoke with', 'talked to']):
return 'call'
if any(x in text_lower for x in ['meeting', 'met with', 'conference', 'zoom', 'teams']):
return 'meeting'
if any(x in text_lower for x in ['linkedin', 'connected on']):
return 'linkedin'
if any(x in text_lower for x in ['texted', 'sms', 'imessage', 'whatsapp']):
return 'text'
return 'note'
def detect_deal_signals(text: str) -> dict:
"""Detect signals about deals."""
text_lower = text.lower()
signals = {
'stage_hints': [],
'positive': [],
'negative': [],
'actions': []
}
# Stage hints
if any(x in text_lower for x in ['interested', 'wants to learn more', 'requested demo']):
signals['stage_hints'].append('qualified')
if any(x in text_lower for x in ['sent proposal', 'sending quote', 'pricing']):
signals['stage_hints'].append('proposal')
if any(x in text_lower for x in ['negotiating', 'reviewing contract', 'legal review', 'redlines']):
signals['stage_hints'].append('negotiation')
if any(x in text_lower for x in ['signed', 'closed', 'won', 'agreed', 'confirmed']):
signals['stage_hints'].append('won')
if any(x in text_lower for x in ['passed', 'declined', 'lost', 'went with competitor', 'no budget']):
signals['stage_hints'].append('lost')
# Positive signals
if any(x in text_lower for x in ['excited', 'very interested', 'love it', 'great fit', 'impressed']):
signals['positive'].append('high_interest')
if any(x in text_lower for x in ['decision maker', 'can approve', 'has budget']):
signals['positive'].append('authority')
# Negative signals
if any(x in text_lower for x in ['not a priority', 'maybe later', 'next quarter', 'no budget']):
signals['negative'].append('timing_issue')
if any(x in text_lower for x in ['need to check', 'run it by', 'get approval']):
signals['negative'].append('no_authority')
# Follow-up actions
if any(x in text_lower for x in ['will send', 'sending', 'follow up', 'get back to']):
signals['actions'].append('follow_up_needed')
if any(x in text_lower for x in ['schedule', 'set up a', 'book a']):
signals['actions'].append('meeting_to_schedule')
return signals
def extract_tasks(text: str) -> list[dict]:
"""Extract potential tasks/action items."""
tasks = []
# Common action patterns
patterns = [
r'(?:need to|should|will|must|have to)\s+([^.!?\n]+)',
r'(?:follow up|send|schedule|call|email|review|check)\s+([^.!?\n]+)',
r'(?:action item|todo|task):\s*([^.!?\n]+)',
r'(?:by|before|due)\s+((?:monday|tuesday|wednesday|thursday|friday|next week|tomorrow|[a-z]+ [0-9]+)[^.!?\n]*)',
]
for pattern in patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
task_text = match.group(1).strip()
if len(task_text) > 10 and len(task_text) < 200:
tasks.append({
'title': task_text[:100],
'source': match.group(0)
})
return tasks[:5] # Limit to 5 tasks
def parse_email(text: str) -> dict:
"""Parse email-specific structure."""
result = {
'from': None,
'to': None,
'subject': None,
'date': None,
'body': text
}
lines = text.split('\n')
body_start = 0
for i, line in enumerate(lines):
if line.lower().startswith('from:'):
result['from'] = line[5:].strip()
body_start = i + 1
elif line.lower().startswith('to:'):
result['to'] = line[3:].strip()
body_start = i + 1
elif line.lower().startswith('subject:'):
result['subject'] = line[8:].strip()
body_start = i + 1
elif line.lower().startswith('date:'):
result['date'] = line[5:].strip()
body_start = i + 1
elif line.strip() == '' and body_start > 0:
result['body'] = '\n'.join(lines[i+1:])
break
return result
def ingest(text: str, source_type: str = 'auto') -> dict:
"""Main ingestion function - extract all structured data from text."""
if source_type == 'auto':
source_type = detect_interaction_type(text)
# Parse email structure if applicable
email_data = None
if source_type == 'email':
email_data = parse_email(text)
# Extract all entities
plan = {
'source_type': source_type,
'extracted_at': datetime.now().isoformat(),
'raw_length': len(text),
'contacts': {
'names': extract_names(text),
'emails': extract_emails(text),
'phones': extract_phones(text),
'companies': extract_companies(text)
},
'interaction': {
'type': source_type,
'direction': 'inbound' if email_data and email_data.get('to') else 'outbound',
'summary': None, # Agent should generate this
'occurred_at': email_data.get('date') if email_data else 'today'
},
'deal_signals': detect_deal_signals(text),
'money': extract_money(text),
'dates': extract_dates(text),
'potential_tasks': extract_tasks(text),
'email_metadata': email_data,
'suggested_actions': []
}
# Generate suggested actions
if plan['contacts']['names']:
for name_info in plan['contacts']['names']:
plan['suggested_actions'].append({
'action': 'create_or_update_contact',
'data': name_info
})
if plan['money']:
plan['suggested_actions'].append({
'action': 'create_or_update_deal',
'data': {
'value': plan['money'][0]['value'],
'signals': plan['deal_signals']
}
})
if plan['deal_signals']['stage_hints']:
plan['suggested_actions'].append({
'action': 'update_deal_stage',
'data': {
'suggested_stage': plan['deal_signals']['stage_hints'][0]
}
})
plan['suggested_actions'].append({
'action': 'log_interaction',
'data': {
'type': source_type,
'needs_summary': True
}
})
for task in plan['potential_tasks'][:3]:
plan['suggested_actions'].append({
'action': 'create_task',
'data': task
})
return plan
def main():
parser = argparse.ArgumentParser(description='Parse unstructured text into CRM actions')
parser.add_argument('--type', '-t', choices=['auto', 'email', 'call', 'meeting', 'note'],
default='auto', help='Source type')
parser.add_argument('--file', '-f', help='Read from file instead of stdin')
parser.add_argument('--text', help='Text to parse (alternative to stdin/file)')
args = parser.parse_args()
# Get input text
if args.text:
text = args.text
elif args.file:
text = Path(args.file).read_text()
else:
text = sys.stdin.read()
if not text.strip():
print(json.dumps({'error': 'No input text provided'}))
sys.exit(1)
result = ingest(text, args.type)
print(json.dumps(result, indent=2))
if __name__ == '__main__':
main()
crm-notify.py
9.3 KB
#!/usr/bin/env python3
"""
CRM Notify - Check for items needing attention
Returns actionable alerts:
- Overdue tasks
- Tasks due today
- Deals closing soon (within N days)
- Stale contacts (no interaction in N days)
- Deals stuck in stage too long
Designed to be run from heartbeat/cron and output alerts for the agent to act on.
"""
import argparse
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
def get_db() -> sqlite3.Connection:
"""Get database connection."""
if not Path(DB_PATH).exists():
return None
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def check_alerts(
stale_days: int = 14,
closing_days: int = 7,
stuck_days: int = 21
) -> dict:
"""Check for all alert conditions."""
conn = get_db()
if not conn:
return {'error': 'Database not found', 'alerts': []}
now = datetime.now()
today_start = now.replace(hour=0, minute=0, second=0).isoformat()
today_end = now.replace(hour=23, minute=59, second=59).isoformat()
alerts = []
# Overdue tasks
rows = conn.execute("""
SELECT t.*, c.name as contact_name FROM tasks t
LEFT JOIN contacts c ON t.contact_id = c.id
WHERE t.completed_at IS NULL AND t.due_at < ?
ORDER BY t.due_at ASC
""", (today_start,)).fetchall()
for row in rows:
due = datetime.fromisoformat(row['due_at']) if row['due_at'] else None
days_overdue = (now - due).days if due else 0
alerts.append({
'type': 'overdue_task',
'priority': 'high' if days_overdue > 3 else 'medium',
'task_id': row['id'],
'title': row['title'],
'contact': row['contact_name'],
'due_at': row['due_at'],
'days_overdue': days_overdue,
'message': f"β οΈ Task overdue ({days_overdue}d): {row['title']}" +
(f" ({row['contact_name']})" if row['contact_name'] else "")
})
# Tasks due today
rows = conn.execute("""
SELECT t.*, c.name as contact_name FROM tasks t
LEFT JOIN contacts c ON t.contact_id = c.id
WHERE t.completed_at IS NULL
AND t.due_at >= ? AND t.due_at <= ?
ORDER BY t.priority DESC, t.due_at ASC
""", (today_start, today_end)).fetchall()
for row in rows:
alerts.append({
'type': 'task_due_today',
'priority': row['priority'] or 'normal',
'task_id': row['id'],
'title': row['title'],
'contact': row['contact_name'],
'due_at': row['due_at'],
'message': f"π Due today: {row['title']}" +
(f" ({row['contact_name']})" if row['contact_name'] else "")
})
# Deals closing soon
closing_threshold = (now + timedelta(days=closing_days)).isoformat()
rows = conn.execute("""
SELECT d.*, c.name as contact_name FROM deals d
LEFT JOIN contacts c ON d.contact_id = c.id
WHERE d.stage NOT IN ('won', 'lost')
AND d.expected_close IS NOT NULL
AND d.expected_close <= ?
ORDER BY d.expected_close ASC
""", (closing_threshold,)).fetchall()
for row in rows:
close_date = datetime.fromisoformat(row['expected_close']) if row['expected_close'] else None
days_until = (close_date - now).days if close_date else 0
value_str = f"${row['value']:,.0f}" if row['value'] else "TBD"
alerts.append({
'type': 'deal_closing_soon',
'priority': 'high' if days_until <= 3 else 'medium',
'deal_id': row['id'],
'title': row['title'],
'value': row['value'],
'contact': row['contact_name'],
'expected_close': row['expected_close'],
'days_until_close': days_until,
'message': f"π° Deal closing in {days_until}d: {row['title']} ({value_str})"
})
# Stale contacts (no interaction in N days, but have a deal)
stale_threshold = (now - timedelta(days=stale_days)).isoformat()
rows = conn.execute("""
SELECT c.*, d.title as deal_title, d.value as deal_value, d.stage as deal_stage,
MAX(i.occurred_at) as last_interaction
FROM contacts c
JOIN deals d ON c.id = d.contact_id AND d.stage NOT IN ('won', 'lost')
LEFT JOIN interactions i ON c.id = i.contact_id
GROUP BY c.id
HAVING last_interaction < ? OR last_interaction IS NULL
ORDER BY d.value DESC NULLS LAST
""", (stale_threshold,)).fetchall()
for row in rows:
last = datetime.fromisoformat(row['last_interaction']) if row['last_interaction'] else None
days_stale = (now - last).days if last else 999
alerts.append({
'type': 'stale_contact',
'priority': 'medium',
'contact_id': row['id'],
'name': row['name'],
'company': row['company'],
'deal': row['deal_title'],
'deal_value': row['deal_value'],
'last_interaction': row['last_interaction'],
'days_since_contact': days_stale,
'message': f"π No contact in {days_stale}d: {row['name']}" +
(f" ({row['deal_title']})" if row['deal_title'] else "")
})
# Deals stuck in stage
stuck_threshold = (now - timedelta(days=stuck_days)).isoformat()
rows = conn.execute("""
SELECT d.*, c.name as contact_name FROM deals d
LEFT JOIN contacts c ON d.contact_id = c.id
WHERE d.stage NOT IN ('won', 'lost', 'lead')
AND d.updated_at < ?
ORDER BY d.value DESC NULLS LAST
""", (stuck_threshold,)).fetchall()
for row in rows:
updated = datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None
days_stuck = (now - updated).days if updated else 0
value_str = f"${row['value']:,.0f}" if row['value'] else "TBD"
alerts.append({
'type': 'deal_stuck',
'priority': 'low',
'deal_id': row['id'],
'title': row['title'],
'value': row['value'],
'stage': row['stage'],
'days_in_stage': days_stuck,
'message': f"π Deal stuck {days_stuck}d in {row['stage']}: {row['title']} ({value_str})"
})
conn.close()
# Sort by priority
priority_order = {'high': 0, 'medium': 1, 'normal': 2, 'low': 3}
alerts.sort(key=lambda x: priority_order.get(x.get('priority', 'normal'), 2))
return {
'checked_at': now.isoformat(),
'total_alerts': len(alerts),
'by_type': {
'overdue_tasks': len([a for a in alerts if a['type'] == 'overdue_task']),
'tasks_due_today': len([a for a in alerts if a['type'] == 'task_due_today']),
'deals_closing_soon': len([a for a in alerts if a['type'] == 'deal_closing_soon']),
'stale_contacts': len([a for a in alerts if a['type'] == 'stale_contact']),
'deals_stuck': len([a for a in alerts if a['type'] == 'deal_stuck'])
},
'alerts': alerts
}
def format_alerts_text(result: dict) -> str:
"""Format alerts as human-readable text."""
if 'error' in result:
return f"β {result['error']}"
if not result['alerts']:
return "β
No CRM alerts. All clear!"
lines = []
lines.append(f"π **CRM Alerts** ({result['total_alerts']} items)")
lines.append("")
# Group by type
current_type = None
type_labels = {
'overdue_task': 'β οΈ Overdue Tasks',
'task_due_today': 'π Due Today',
'deal_closing_soon': 'π° Deals Closing Soon',
'stale_contact': 'π Needs Follow-up',
'deal_stuck': 'π Stuck Deals'
}
for alert in result['alerts']:
if alert['type'] != current_type:
if current_type is not None:
lines.append("")
current_type = alert['type']
lines.append(f"**{type_labels.get(current_type, current_type)}:**")
lines.append(f"β’ {alert['message']}")
return '\n'.join(lines)
def main():
parser = argparse.ArgumentParser(description='Check CRM for items needing attention')
parser.add_argument('--json', action='store_true', help='Output as JSON')
parser.add_argument('--stale-days', type=int, default=14, help='Days before contact is stale')
parser.add_argument('--closing-days', type=int, default=7, help='Days to look ahead for closing deals')
parser.add_argument('--stuck-days', type=int, default=21, help='Days before deal is considered stuck')
parser.add_argument('--type', '-t', choices=['overdue_task', 'task_due_today', 'deal_closing_soon', 'stale_contact', 'deal_stuck'],
help='Filter to specific alert type')
args = parser.parse_args()
result = check_alerts(args.stale_days, args.closing_days, args.stuck_days)
# Filter by type if specified
if args.type and 'alerts' in result:
result['alerts'] = [a for a in result['alerts'] if a['type'] == args.type]
result['total_alerts'] = len(result['alerts'])
if args.json:
print(json.dumps(result, indent=2))
else:
print(format_alerts_text(result))
if __name__ == '__main__':
main()
crm-report.py
8.8 KB
#!/usr/bin/env python3
"""
CRM Reports - Pipeline and activity analytics
Generates reports:
- Pipeline by stage (with trends)
- Activity summary (interactions, tasks)
- Win/loss analysis
- Forecast based on expected close dates
"""
import argparse
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
def get_db() -> sqlite3.Connection:
"""Get database connection."""
if not Path(DB_PATH).exists():
return None
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def pipeline_report() -> dict:
"""Generate pipeline report."""
conn = get_db()
if not conn:
return {'error': 'Database not found'}
now = datetime.now()
# Current pipeline by stage
stages = conn.execute("""
SELECT
stage,
COUNT(*) as count,
SUM(value) as total_value,
AVG(value) as avg_value,
SUM(value * COALESCE(probability, 50) / 100.0) as weighted_value
FROM deals
WHERE stage NOT IN ('won', 'lost')
GROUP BY stage
ORDER BY
CASE stage
WHEN 'lead' THEN 1
WHEN 'qualified' THEN 2
WHEN 'proposal' THEN 3
WHEN 'negotiation' THEN 4
END
""").fetchall()
pipeline = {
'stages': [dict(s) for s in stages],
'totals': {
'deals': sum(s['count'] for s in stages),
'value': sum(s['total_value'] or 0 for s in stages),
'weighted': sum(s['weighted_value'] or 0 for s in stages)
}
}
# Deals by expected close month
forecast = conn.execute("""
SELECT
strftime('%Y-%m', expected_close) as month,
COUNT(*) as count,
SUM(value) as total_value,
SUM(value * COALESCE(probability, 50) / 100.0) as weighted_value
FROM deals
WHERE stage NOT IN ('won', 'lost')
AND expected_close IS NOT NULL
AND expected_close >= date('now')
GROUP BY month
ORDER BY month
LIMIT 6
""").fetchall()
pipeline['forecast'] = [dict(f) for f in forecast]
# Top deals
top_deals = conn.execute("""
SELECT d.*, c.name as contact_name
FROM deals d
LEFT JOIN contacts c ON d.contact_id = c.id
WHERE d.stage NOT IN ('won', 'lost')
ORDER BY d.value DESC NULLS LAST
LIMIT 10
""").fetchall()
pipeline['top_deals'] = [dict(d) for d in top_deals]
conn.close()
return pipeline
def activity_report(days: int = 30) -> dict:
"""Generate activity report for past N days."""
conn = get_db()
if not conn:
return {'error': 'Database not found'}
since = (datetime.now() - timedelta(days=days)).isoformat()
# Interactions by type
interactions = conn.execute("""
SELECT type, direction, COUNT(*) as count
FROM interactions
WHERE logged_at >= ?
GROUP BY type, direction
ORDER BY count DESC
""", (since,)).fetchall()
# Tasks created vs completed
tasks_created = conn.execute("""
SELECT COUNT(*) as count FROM tasks WHERE created_at >= ?
""", (since,)).fetchone()['count']
tasks_completed = conn.execute("""
SELECT COUNT(*) as count FROM tasks WHERE completed_at >= ?
""", (since,)).fetchone()['count']
# Contacts added
contacts_added = conn.execute("""
SELECT COUNT(*) as count FROM contacts WHERE created_at >= ?
""", (since,)).fetchone()['count']
# Deals created
deals_created = conn.execute("""
SELECT COUNT(*) as count, SUM(value) as total_value
FROM deals WHERE created_at >= ?
""", (since,)).fetchone()
# Deal stage movements
stage_changes = conn.execute("""
SELECT
json_extract(old_values, '$.stage') as from_stage,
json_extract(new_values, '$.stage') as to_stage,
COUNT(*) as count
FROM audit_log
WHERE table_name = 'deals'
AND action = 'UPDATE'
AND created_at >= ?
AND json_extract(new_values, '$.stage') IS NOT NULL
GROUP BY from_stage, to_stage
""", (since,)).fetchall()
conn.close()
return {
'period_days': days,
'since': since,
'interactions': [dict(i) for i in interactions],
'total_interactions': sum(i['count'] for i in interactions),
'tasks': {
'created': tasks_created,
'completed': tasks_completed,
'completion_rate': tasks_completed / tasks_created if tasks_created else 0
},
'contacts_added': contacts_added,
'deals': {
'created': deals_created['count'],
'total_value': deals_created['total_value'] or 0
},
'stage_movements': [dict(s) for s in stage_changes]
}
def win_loss_report(days: int = 90) -> dict:
"""Analyze won vs lost deals."""
conn = get_db()
if not conn:
return {'error': 'Database not found'}
since = (datetime.now() - timedelta(days=days)).isoformat()
# Won deals
won = conn.execute("""
SELECT COUNT(*) as count, SUM(value) as total_value, AVG(value) as avg_value
FROM deals
WHERE stage = 'won' AND closed_at >= ?
""", (since,)).fetchone()
# Lost deals
lost = conn.execute("""
SELECT COUNT(*) as count, SUM(value) as total_value, AVG(value) as avg_value
FROM deals
WHERE stage = 'lost' AND closed_at >= ?
""", (since,)).fetchone()
# Win rate
total_closed = (won['count'] or 0) + (lost['count'] or 0)
win_rate = (won['count'] or 0) / total_closed if total_closed else 0
# Average deal cycle (created to closed)
cycle = conn.execute("""
SELECT AVG(julianday(closed_at) - julianday(created_at)) as avg_days
FROM deals
WHERE stage = 'won' AND closed_at >= ?
""", (since,)).fetchone()
conn.close()
return {
'period_days': days,
'won': {
'count': won['count'] or 0,
'total_value': won['total_value'] or 0,
'avg_value': won['avg_value'] or 0
},
'lost': {
'count': lost['count'] or 0,
'total_value': lost['total_value'] or 0,
'avg_value': lost['avg_value'] or 0
},
'win_rate': win_rate,
'avg_cycle_days': cycle['avg_days'] or 0
}
def format_pipeline_text(report: dict) -> str:
"""Format pipeline report as text."""
if 'error' in report:
return f"β {report['error']}"
lines = []
lines.append("π **Pipeline Report**")
lines.append("")
# By stage
lines.append("**By Stage:**")
for stage in report['stages']:
val = f"${stage['total_value']:,.0f}" if stage['total_value'] else "$0"
weighted = f"${stage['weighted_value']:,.0f}" if stage['weighted_value'] else "$0"
lines.append(f"β’ {stage['stage'].title()}: {stage['count']} deals β {val} (weighted: {weighted})")
totals = report['totals']
lines.append(f"β’ **Total:** {totals['deals']} deals β ${totals['value']:,.0f} (weighted: ${totals['weighted']:,.0f})")
lines.append("")
# Forecast
if report['forecast']:
lines.append("**Forecast by Month:**")
for month in report['forecast']:
lines.append(f"β’ {month['month']}: {month['count']} deals β ${month['total_value']:,.0f}")
lines.append("")
# Top deals
if report['top_deals']:
lines.append("**Top Deals:**")
for deal in report['top_deals'][:5]:
val = f"${deal['value']:,.0f}" if deal['value'] else "TBD"
contact = f" ({deal['contact_name']})" if deal['contact_name'] else ""
lines.append(f"β’ {deal['title']}{contact} β {val} [{deal['stage']}]")
return '\n'.join(lines)
def main():
parser = argparse.ArgumentParser(description='CRM analytics and reports')
parser.add_argument('report', choices=['pipeline', 'activity', 'winloss'],
help='Report type')
parser.add_argument('--json', action='store_true', help='Output as JSON')
parser.add_argument('--days', '-d', type=int, default=30, help='Days to analyze')
args = parser.parse_args()
if args.report == 'pipeline':
result = pipeline_report()
if not args.json:
print(format_pipeline_text(result))
return
elif args.report == 'activity':
result = activity_report(args.days)
elif args.report == 'winloss':
result = win_loss_report(args.days)
if args.json or args.report != 'pipeline':
print(json.dumps(result, indent=2))
if __name__ == '__main__':
main()
crm-webhook.py
7.7 KB
#!/usr/bin/env python3
"""
CRM Webhook Server - Ingest leads from external forms
Accepts POST requests with contact/lead data and creates CRM entries.
Supports common form formats: Typeform, Tally, raw JSON.
Run: crm-webhook --port 8901
Then configure form webhooks to POST to http://localhost:8901/lead
"""
import argparse
import json
import os
import sqlite3
import subprocess
import sys
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from urllib.parse import parse_qs, urlparse
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
CRM_SCRIPT = Path(__file__).parent / 'crm'
LOG_FILE = os.path.expanduser('~/.local/share/agent-crm/webhook.log')
def log(message: str):
"""Append to log file."""
Path(LOG_FILE).parent.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, 'a') as f:
f.write(f"{datetime.now().isoformat()} {message}\n")
def parse_typeform(data: dict) -> dict:
"""Parse Typeform webhook payload."""
answers = data.get('form_response', {}).get('answers', [])
result = {'source': 'typeform'}
field_map = {
'email': ['email'],
'name': ['short_text', 'long_text'],
'phone': ['phone_number'],
}
for answer in answers:
field_type = answer.get('type')
field_title = answer.get('field', {}).get('title', '').lower()
# Try to map by field title
if 'email' in field_title:
result['email'] = answer.get('email')
elif 'name' in field_title:
result['name'] = answer.get('text')
elif 'phone' in field_title:
result['phone'] = answer.get('phone_number')
elif 'company' in field_title:
result['company'] = answer.get('text')
elif 'message' in field_title or 'note' in field_title:
result['notes'] = answer.get('text')
# Fallback: first short_text is name, first email is email
if 'name' not in result:
for answer in answers:
if answer.get('type') == 'short_text' and answer.get('text'):
result['name'] = answer.get('text')
break
return result
def parse_tally(data: dict) -> dict:
"""Parse Tally webhook payload."""
fields = data.get('data', {}).get('fields', [])
result = {'source': 'tally'}
for field in fields:
label = field.get('label', '').lower()
value = field.get('value')
if not value:
continue
if 'email' in label:
result['email'] = value
elif 'name' in label:
result['name'] = value
elif 'phone' in label:
result['phone'] = value
elif 'company' in label:
result['company'] = value
elif 'message' in label or 'note' in label:
result['notes'] = value
return result
def parse_generic(data: dict) -> dict:
"""Parse generic JSON payload."""
result = {'source': 'webhook'}
# Common field names
field_map = {
'name': ['name', 'full_name', 'fullName', 'contact_name', 'contactName'],
'email': ['email', 'email_address', 'emailAddress', 'mail'],
'phone': ['phone', 'phone_number', 'phoneNumber', 'tel', 'telephone'],
'company': ['company', 'company_name', 'companyName', 'organization', 'org'],
'notes': ['notes', 'message', 'comment', 'description', 'body'],
'role': ['role', 'title', 'job_title', 'jobTitle', 'position'],
}
for target, sources in field_map.items():
for source in sources:
if source in data and data[source]:
result[target] = data[source]
break
return result
def create_contact(data: dict) -> dict:
"""Create contact via CRM CLI."""
if not data.get('name'):
return {'error': 'Name is required'}
cmd = [str(CRM_SCRIPT), 'add-contact', data['name']]
if data.get('email'):
cmd.extend(['--email', data['email']])
if data.get('phone'):
cmd.extend(['--phone', data['phone']])
if data.get('company'):
cmd.extend(['--company', data['company']])
if data.get('role'):
cmd.extend(['--role', data['role']])
if data.get('source'):
cmd.extend(['--source', data['source']])
if data.get('notes'):
cmd.extend(['--notes', data['notes']])
cmd.extend(['--reason', 'Webhook ingest'])
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
return json.loads(result.stdout)
else:
return {'error': result.stderr or 'CLI error'}
except Exception as e:
return {'error': str(e)}
class WebhookHandler(BaseHTTPRequestHandler):
"""HTTP handler for webhook requests."""
def log_message(self, format, *args):
log(f"HTTP {args[0]}")
def _send_json(self, status: int, data: dict):
self.send_response(status)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def do_GET(self):
"""Health check endpoint."""
if self.path == '/health':
self._send_json(200, {'status': 'ok', 'service': 'crm-webhook'})
else:
self._send_json(404, {'error': 'Not found'})
def do_POST(self):
"""Handle incoming webhooks."""
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
try:
data = json.loads(body)
except json.JSONDecodeError:
# Try URL-encoded
try:
data = {k: v[0] for k, v in parse_qs(body.decode()).items()}
except:
self._send_json(400, {'error': 'Invalid JSON'})
return
log(f"Received webhook: {json.dumps(data)[:500]}")
# Parse based on path or payload structure
if self.path == '/lead' or self.path == '/contact':
# Detect format
if 'form_response' in data:
parsed = parse_typeform(data)
elif 'data' in data and 'fields' in data.get('data', {}):
parsed = parse_tally(data)
else:
parsed = parse_generic(data)
log(f"Parsed: {json.dumps(parsed)}")
result = create_contact(parsed)
if 'error' in result:
log(f"Error: {result['error']}")
self._send_json(400, result)
else:
log(f"Created: {result}")
self._send_json(201, result)
else:
self._send_json(404, {'error': f'Unknown endpoint: {self.path}'})
def main():
parser = argparse.ArgumentParser(description='CRM webhook server for form ingestion')
parser.add_argument('--port', '-p', type=int, default=8901, help='Port to listen on')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
args = parser.parse_args()
server = HTTPServer((args.host, args.port), WebhookHandler)
print(f"CRM Webhook server listening on {args.host}:{args.port}")
print(f"Endpoints:")
print(f" POST /lead - Create contact from form submission")
print(f" POST /contact - Create contact from form submission")
print(f" GET /health - Health check")
print(f"")
print(f"Log: {LOG_FILE}")
print(f"Database: {DB_PATH}")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nShutting down...")
server.shutdown()
if __name__ == '__main__':
main()
crm.py
24.3 KB
#!/usr/bin/env python3
"""
Agent CRM CLI - Core CRUD operations
"""
import argparse
import json
import os
import sqlite3
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import re
# Database location
DB_PATH = os.environ.get('CRM_DB', os.path.expanduser('~/.local/share/agent-crm/crm.db'))
SCHEMA_PATH = Path(__file__).parent.parent / 'schema.sql'
def get_db() -> sqlite3.Connection:
"""Get database connection, initializing if needed."""
db_path = Path(DB_PATH)
db_path.parent.mkdir(parents=True, exist_ok=True)
needs_init = not db_path.exists()
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
if needs_init and SCHEMA_PATH.exists():
conn.executescript(SCHEMA_PATH.read_text())
conn.commit()
return conn
def parse_date(s: str) -> Optional[str]:
"""Parse flexible date strings into ISO format."""
if not s:
return None
s = s.lower().strip()
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
# Relative dates
if s == 'today':
return today.isoformat()
if s == 'tomorrow':
return (today + timedelta(days=1)).isoformat()
if s == 'yesterday':
return (today - timedelta(days=1)).isoformat()
# "next week", "next month"
if s.startswith('next '):
unit = s[5:]
if unit == 'week':
return (today + timedelta(weeks=1)).isoformat()
if unit == 'month':
return (today + timedelta(days=30)).isoformat()
# "in N days/weeks"
match = re.match(r'in (\d+) (day|week|month)s?', s)
if match:
n, unit = int(match.group(1)), match.group(2)
if unit == 'day':
return (today + timedelta(days=n)).isoformat()
if unit == 'week':
return (today + timedelta(weeks=n)).isoformat()
if unit == 'month':
return (today + timedelta(days=n*30)).isoformat()
# Day names
days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
if s in days or s.startswith('next ') and s[5:] in days:
target_day = days.index(s.replace('next ', ''))
current_day = today.weekday()
delta = target_day - current_day
if delta <= 0:
delta += 7
return (today + timedelta(days=delta)).isoformat()
# Try parsing as date
for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%m/%d', '%B %d', '%b %d']:
try:
dt = datetime.strptime(s, fmt)
if dt.year == 1900: # No year specified
dt = dt.replace(year=today.year)
if dt < today:
dt = dt.replace(year=today.year + 1)
return dt.isoformat()
except ValueError:
continue
return s # Return as-is if unparseable
def audit_log(conn: sqlite3.Connection, table: str, record_id: str, action: str,
old: dict = None, new: dict = None, reason: str = None, conv_ref: str = None):
"""Log an action to the audit table."""
conn.execute("""
INSERT INTO audit_log (table_name, record_id, action, old_values, new_values, reason, conversation_ref)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (table, record_id, action,
json.dumps(old) if old else None,
json.dumps(new) if new else None,
reason, conv_ref))
# ============ CONTACTS ============
def add_contact(args):
"""Add a new contact."""
conn = get_db()
tags = json.dumps(args.tags.split(',')) if args.tags else None
cursor = conn.execute("""
INSERT INTO contacts (name, email, phone, company, role, source, tags, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (args.name, args.email, args.phone, args.company, args.role, args.source, tags, args.notes))
record_id = cursor.lastrowid
# Get the actual ID
row = conn.execute("SELECT id FROM contacts WHERE rowid = ?", (record_id,)).fetchone()
audit_log(conn, 'contacts', row['id'], 'INSERT', new={
'name': args.name, 'email': args.email, 'company': args.company
}, reason=args.reason)
conn.commit()
print(json.dumps({
'status': 'created',
'id': row['id'],
'name': args.name,
'company': args.company,
'email': args.email
}, indent=2))
def find_contact(args):
"""Find contacts by name, email, or company."""
conn = get_db()
query = args.query.lower()
rows = conn.execute("""
SELECT * FROM contacts
WHERE lower(name) LIKE ? OR lower(email) LIKE ? OR lower(company) LIKE ?
ORDER BY updated_at DESC
LIMIT ?
""", (f'%{query}%', f'%{query}%', f'%{query}%', args.limit)).fetchall()
results = [dict(r) for r in rows]
print(json.dumps(results, indent=2))
def list_contacts(args):
"""List all contacts."""
conn = get_db()
order = 'updated_at DESC' if args.recent else 'name ASC'
rows = conn.execute(f"SELECT * FROM contacts ORDER BY {order} LIMIT ?", (args.limit,)).fetchall()
results = [dict(r) for r in rows]
print(json.dumps(results, indent=2))
def update_contact(args):
"""Update a contact."""
conn = get_db()
# Find the contact
row = conn.execute("""
SELECT * FROM contacts WHERE id = ? OR lower(name) LIKE ?
""", (args.id, f'%{args.id.lower()}%')).fetchone()
if not row:
print(json.dumps({'error': f'Contact not found: {args.id}'}))
sys.exit(1)
old = dict(row)
updates = {}
for field in ['name', 'email', 'phone', 'company', 'role', 'source', 'notes']:
val = getattr(args, field, None)
if val is not None:
updates[field] = val
if args.tags:
updates['tags'] = json.dumps(args.tags.split(','))
if not updates:
print(json.dumps({'error': 'No updates provided'}))
sys.exit(1)
set_clause = ', '.join(f"{k} = ?" for k in updates.keys())
updates['updated_at'] = datetime.now().isoformat()
conn.execute(f"""
UPDATE contacts SET {set_clause}, updated_at = ? WHERE id = ?
""", (*updates.values(), row['id']))
audit_log(conn, 'contacts', row['id'], 'UPDATE', old=old, new=updates, reason=args.reason)
conn.commit()
print(json.dumps({'status': 'updated', 'id': row['id'], 'changes': updates}, indent=2))
def delete_contact(args):
"""Delete a contact."""
conn = get_db()
row = conn.execute("SELECT * FROM contacts WHERE id = ?", (args.id,)).fetchone()
if not row:
print(json.dumps({'error': f'Contact not found: {args.id}'}))
sys.exit(1)
old = dict(row)
conn.execute("DELETE FROM contacts WHERE id = ?", (args.id,))
audit_log(conn, 'contacts', args.id, 'DELETE', old=old, reason=args.reason)
conn.commit()
print(json.dumps({'status': 'deleted', 'id': args.id, 'name': old['name']}, indent=2))
# ============ DEALS ============
def add_deal(args):
"""Add a new deal."""
conn = get_db()
# Find contact if specified
contact_id = None
if args.contact:
row = conn.execute("""
SELECT id FROM contacts WHERE id = ? OR lower(name) LIKE ?
""", (args.contact, f'%{args.contact.lower()}%')).fetchone()
if row:
contact_id = row['id']
expected_close = parse_date(args.expected_close) if args.expected_close else None
cursor = conn.execute("""
INSERT INTO deals (contact_id, title, value, currency, stage, probability, expected_close, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (contact_id, args.title, args.value, args.currency or 'USD',
args.stage or 'lead', args.probability, expected_close, args.notes))
row = conn.execute("SELECT id FROM deals WHERE rowid = ?", (cursor.lastrowid,)).fetchone()
audit_log(conn, 'deals', row['id'], 'INSERT', new={
'title': args.title, 'value': args.value, 'stage': args.stage or 'lead'
}, reason=args.reason)
conn.commit()
print(json.dumps({
'status': 'created',
'id': row['id'],
'title': args.title,
'value': args.value,
'stage': args.stage or 'lead'
}, indent=2))
def list_deals(args):
"""List deals, optionally filtered by stage."""
conn = get_db()
if args.stage:
rows = conn.execute("""
SELECT d.*, c.name as contact_name
FROM deals d LEFT JOIN contacts c ON d.contact_id = c.id
WHERE d.stage = ?
ORDER BY d.value DESC
LIMIT ?
""", (args.stage, args.limit)).fetchall()
else:
rows = conn.execute("""
SELECT d.*, c.name as contact_name
FROM deals d LEFT JOIN contacts c ON d.contact_id = c.id
ORDER BY d.updated_at DESC
LIMIT ?
""", (args.limit,)).fetchall()
results = [dict(r) for r in rows]
print(json.dumps(results, indent=2))
def update_deal(args):
"""Update a deal."""
conn = get_db()
row = conn.execute("""
SELECT * FROM deals WHERE id = ? OR lower(title) LIKE ?
""", (args.id, f'%{args.id.lower()}%')).fetchone()
if not row:
print(json.dumps({'error': f'Deal not found: {args.id}'}))
sys.exit(1)
old = dict(row)
updates = {}
for field in ['title', 'value', 'currency', 'stage', 'probability', 'notes']:
val = getattr(args, field, None)
if val is not None:
updates[field] = val
if args.expected_close:
updates['expected_close'] = parse_date(args.expected_close)
if args.stage in ('won', 'lost'):
updates['closed_at'] = datetime.now().isoformat()
if not updates:
print(json.dumps({'error': 'No updates provided'}))
sys.exit(1)
set_clause = ', '.join(f"{k} = ?" for k in updates.keys())
conn.execute(f"""
UPDATE deals SET {set_clause}, updated_at = ? WHERE id = ?
""", (*updates.values(), datetime.now().isoformat(), row['id']))
audit_log(conn, 'deals', row['id'], 'UPDATE', old=old, new=updates, reason=args.reason)
conn.commit()
print(json.dumps({'status': 'updated', 'id': row['id'], 'changes': updates}, indent=2))
def pipeline(args):
"""Show pipeline summary."""
conn = get_db()
rows = conn.execute("""
SELECT stage, COUNT(*) as count, SUM(value) as total_value
FROM deals
WHERE stage NOT IN ('won', 'lost')
GROUP BY stage
ORDER BY
CASE stage
WHEN 'lead' THEN 1
WHEN 'qualified' THEN 2
WHEN 'proposal' THEN 3
WHEN 'negotiation' THEN 4
END
""").fetchall()
result = {
'stages': [dict(r) for r in rows],
'total_deals': sum(r['count'] for r in rows),
'total_value': sum(r['total_value'] or 0 for r in rows)
}
# Weighted value (probability-adjusted)
weighted = conn.execute("""
SELECT SUM(value * COALESCE(probability, 50) / 100.0) as weighted
FROM deals WHERE stage NOT IN ('won', 'lost')
""").fetchone()
result['weighted_value'] = weighted['weighted'] or 0
print(json.dumps(result, indent=2))
# ============ INTERACTIONS ============
def log_interaction(args):
"""Log an interaction."""
conn = get_db()
# Find contact
contact_id = None
if args.contact:
row = conn.execute("""
SELECT id FROM contacts WHERE id = ? OR lower(name) LIKE ?
""", (args.contact, f'%{args.contact.lower()}%')).fetchone()
if row:
contact_id = row['id']
occurred = parse_date(args.date) if args.date else datetime.now().isoformat()
cursor = conn.execute("""
INSERT INTO interactions (contact_id, deal_id, type, direction, summary, raw_content, occurred_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (contact_id, args.deal, args.type, args.direction, args.summary, args.raw, occurred))
row = conn.execute("SELECT id FROM interactions WHERE rowid = ?", (cursor.lastrowid,)).fetchone()
audit_log(conn, 'interactions', row['id'], 'INSERT', new={
'type': args.type, 'summary': args.summary[:100]
}, reason=args.reason)
conn.commit()
print(json.dumps({
'status': 'logged',
'id': row['id'],
'type': args.type,
'contact_id': contact_id
}, indent=2))
def list_interactions(args):
"""List interactions for a contact or recent."""
conn = get_db()
if args.contact:
rows = conn.execute("""
SELECT i.*, c.name as contact_name
FROM interactions i
LEFT JOIN contacts c ON i.contact_id = c.id
WHERE i.contact_id = ? OR lower(c.name) LIKE ?
ORDER BY i.occurred_at DESC
LIMIT ?
""", (args.contact, f'%{args.contact.lower()}%', args.limit)).fetchall()
else:
rows = conn.execute("""
SELECT i.*, c.name as contact_name
FROM interactions i
LEFT JOIN contacts c ON i.contact_id = c.id
ORDER BY i.occurred_at DESC
LIMIT ?
""", (args.limit,)).fetchall()
results = [dict(r) for r in rows]
print(json.dumps(results, indent=2))
# ============ TASKS ============
def add_task(args):
"""Add a task."""
conn = get_db()
contact_id = None
if args.contact:
row = conn.execute("""
SELECT id FROM contacts WHERE id = ? OR lower(name) LIKE ?
""", (args.contact, f'%{args.contact.lower()}%')).fetchone()
if row:
contact_id = row['id']
due = parse_date(args.due) if args.due else None
cursor = conn.execute("""
INSERT INTO tasks (contact_id, deal_id, title, due_at, priority)
VALUES (?, ?, ?, ?, ?)
""", (contact_id, args.deal, args.title, due, args.priority or 'normal'))
row = conn.execute("SELECT id FROM tasks WHERE rowid = ?", (cursor.lastrowid,)).fetchone()
audit_log(conn, 'tasks', row['id'], 'INSERT', new={'title': args.title, 'due': due}, reason=args.reason)
conn.commit()
print(json.dumps({
'status': 'created',
'id': row['id'],
'title': args.title,
'due_at': due
}, indent=2))
def list_tasks(args):
"""List tasks."""
conn = get_db()
if args.pending:
rows = conn.execute("""
SELECT t.*, c.name as contact_name
FROM tasks t
LEFT JOIN contacts c ON t.contact_id = c.id
WHERE t.completed_at IS NULL
ORDER BY t.due_at ASC NULLS LAST
LIMIT ?
""", (args.limit,)).fetchall()
elif args.overdue:
rows = conn.execute("""
SELECT t.*, c.name as contact_name
FROM tasks t
LEFT JOIN contacts c ON t.contact_id = c.id
WHERE t.completed_at IS NULL AND t.due_at < datetime('now')
ORDER BY t.due_at ASC
LIMIT ?
""", (args.limit,)).fetchall()
else:
rows = conn.execute("""
SELECT t.*, c.name as contact_name
FROM tasks t
LEFT JOIN contacts c ON t.contact_id = c.id
ORDER BY t.created_at DESC
LIMIT ?
""", (args.limit,)).fetchall()
results = [dict(r) for r in rows]
print(json.dumps(results, indent=2))
def complete_task(args):
"""Complete a task."""
conn = get_db()
row = conn.execute("""
SELECT * FROM tasks WHERE id = ? OR lower(title) LIKE ?
""", (args.id, f'%{args.id.lower()}%')).fetchone()
if not row:
print(json.dumps({'error': f'Task not found: {args.id}'}))
sys.exit(1)
old = dict(row)
now = datetime.now().isoformat()
conn.execute("UPDATE tasks SET completed_at = ? WHERE id = ?", (now, row['id']))
audit_log(conn, 'tasks', row['id'], 'UPDATE', old=old, new={'completed_at': now}, reason=args.reason)
conn.commit()
print(json.dumps({'status': 'completed', 'id': row['id'], 'title': row['title']}, indent=2))
# ============ QUERY ============
def query(args):
"""Run a raw SQL query (SELECT only)."""
conn = get_db()
sql = args.sql.strip()
if not sql.lower().startswith('select'):
print(json.dumps({'error': 'Only SELECT queries allowed'}))
sys.exit(1)
try:
rows = conn.execute(sql).fetchall()
results = [dict(r) for r in rows]
print(json.dumps(results, indent=2))
except sqlite3.Error as e:
print(json.dumps({'error': str(e)}))
sys.exit(1)
def stats(args):
"""Show CRM statistics."""
conn = get_db()
result = {}
result['contacts'] = conn.execute("SELECT COUNT(*) as count FROM contacts").fetchone()['count']
result['deals'] = conn.execute("SELECT COUNT(*) as count FROM deals").fetchone()['count']
result['open_deals'] = conn.execute(
"SELECT COUNT(*) as count FROM deals WHERE stage NOT IN ('won', 'lost')"
).fetchone()['count']
result['interactions'] = conn.execute("SELECT COUNT(*) as count FROM interactions").fetchone()['count']
result['pending_tasks'] = conn.execute(
"SELECT COUNT(*) as count FROM tasks WHERE completed_at IS NULL"
).fetchone()['count']
result['overdue_tasks'] = conn.execute(
"SELECT COUNT(*) as count FROM tasks WHERE completed_at IS NULL AND due_at < datetime('now')"
).fetchone()['count']
# Pipeline value
pipeline_row = conn.execute("""
SELECT SUM(value) as total FROM deals WHERE stage NOT IN ('won', 'lost')
""").fetchone()
result['pipeline_value'] = pipeline_row['total'] or 0
# Won this month
won_row = conn.execute("""
SELECT SUM(value) as total FROM deals
WHERE stage = 'won' AND closed_at >= date('now', 'start of month')
""").fetchone()
result['won_this_month'] = won_row['total'] or 0
print(json.dumps(result, indent=2))
def init_db(args):
"""Initialize the database and show what was created."""
from pathlib import Path
db_path = Path(DB_PATH)
already_exists = db_path.exists()
# Get or create database
conn = get_db()
# Get table info
tables = conn.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
""").fetchall()
table_names = [t['name'] for t in tables]
# Get counts
counts = {}
for table in table_names:
count = conn.execute(f"SELECT COUNT(*) as c FROM {table}").fetchone()['c']
counts[table] = count
# Get database size
db_size = db_path.stat().st_size if db_path.exists() else 0
result = {
'status': 'existing' if already_exists else 'created',
'database': str(db_path),
'size_bytes': db_size,
'tables': table_names,
'record_counts': counts,
'paths': {
'database': str(db_path),
'backups': str(db_path.parent / 'backups'),
'charts': str(db_path.parent / 'charts'),
'exports': str(db_path.parent / 'exports'),
}
}
if not already_exists:
result['message'] = 'Database created and ready to use'
else:
total_records = sum(counts.values())
result['message'] = f'Database exists with {total_records} total records'
print(json.dumps(result, indent=2))
# ============ MAIN ============
def main():
parser = argparse.ArgumentParser(description='Agent CRM CLI')
subparsers = parser.add_subparsers(dest='command', required=True)
# Common args
def add_common(p):
p.add_argument('--reason', help='Reason for this action (audit)')
# Contact commands
p = subparsers.add_parser('add-contact', help='Add a contact')
p.add_argument('name', help='Contact name')
p.add_argument('--email', '-e')
p.add_argument('--phone', '-p')
p.add_argument('--company', '-c')
p.add_argument('--role', '-r')
p.add_argument('--source', '-s')
p.add_argument('--tags', '-t', help='Comma-separated tags')
p.add_argument('--notes', '-n')
add_common(p)
p.set_defaults(func=add_contact)
p = subparsers.add_parser('find-contact', help='Find contacts')
p.add_argument('query', help='Search query')
p.add_argument('--limit', '-l', type=int, default=10)
p.set_defaults(func=find_contact)
p = subparsers.add_parser('list-contacts', help='List contacts')
p.add_argument('--limit', '-l', type=int, default=20)
p.add_argument('--recent', '-r', action='store_true')
p.set_defaults(func=list_contacts)
p = subparsers.add_parser('update-contact', help='Update a contact')
p.add_argument('id', help='Contact ID or name')
p.add_argument('--name')
p.add_argument('--email', '-e')
p.add_argument('--phone', '-p')
p.add_argument('--company', '-c')
p.add_argument('--role', '-r')
p.add_argument('--source', '-s')
p.add_argument('--tags', '-t')
p.add_argument('--notes', '-n')
add_common(p)
p.set_defaults(func=update_contact)
p = subparsers.add_parser('delete-contact', help='Delete a contact')
p.add_argument('id', help='Contact ID')
add_common(p)
p.set_defaults(func=delete_contact)
# Deal commands
p = subparsers.add_parser('add-deal', help='Add a deal')
p.add_argument('title', help='Deal title')
p.add_argument('--value', '-v', type=float)
p.add_argument('--contact', '-c', help='Contact name or ID')
p.add_argument('--stage', '-s', default='lead')
p.add_argument('--probability', '-p', type=int)
p.add_argument('--currency', default='USD')
p.add_argument('--expected-close', '-e')
p.add_argument('--notes', '-n')
add_common(p)
p.set_defaults(func=add_deal)
p = subparsers.add_parser('list-deals', help='List deals')
p.add_argument('--stage', '-s')
p.add_argument('--limit', '-l', type=int, default=20)
p.set_defaults(func=list_deals)
p = subparsers.add_parser('update-deal', help='Update a deal')
p.add_argument('id', help='Deal ID or title')
p.add_argument('--title')
p.add_argument('--value', '-v', type=float)
p.add_argument('--stage', '-s')
p.add_argument('--probability', '-p', type=int)
p.add_argument('--currency')
p.add_argument('--expected-close', '-e')
p.add_argument('--notes', '-n')
add_common(p)
p.set_defaults(func=update_deal)
p = subparsers.add_parser('pipeline', help='Show pipeline summary')
p.set_defaults(func=pipeline)
# Interaction commands
p = subparsers.add_parser('log', help='Log an interaction')
p.add_argument('type', choices=['email', 'call', 'meeting', 'note', 'linkedin', 'text'])
p.add_argument('summary', help='What happened')
p.add_argument('--contact', '-c')
p.add_argument('--deal', '-d')
p.add_argument('--direction', choices=['inbound', 'outbound'])
p.add_argument('--date')
p.add_argument('--raw', help='Raw content')
add_common(p)
p.set_defaults(func=log_interaction)
p = subparsers.add_parser('list-interactions', help='List interactions')
p.add_argument('--contact', '-c')
p.add_argument('--limit', '-l', type=int, default=20)
p.set_defaults(func=list_interactions)
# Task commands
p = subparsers.add_parser('add-task', help='Add a task')
p.add_argument('title', help='Task title')
p.add_argument('--contact', '-c')
p.add_argument('--deal', '-d')
p.add_argument('--due')
p.add_argument('--priority', choices=['low', 'normal', 'high', 'urgent'])
add_common(p)
p.set_defaults(func=add_task)
p = subparsers.add_parser('list-tasks', help='List tasks')
p.add_argument('--pending', action='store_true')
p.add_argument('--overdue', action='store_true')
p.add_argument('--limit', '-l', type=int, default=20)
p.set_defaults(func=list_tasks)
p = subparsers.add_parser('complete-task', help='Complete a task')
p.add_argument('id', help='Task ID or title')
add_common(p)
p.set_defaults(func=complete_task)
# Query commands
p = subparsers.add_parser('query', help='Run SQL query')
p.add_argument('sql', help='SQL query (SELECT only)')
p.set_defaults(func=query)
p = subparsers.add_parser('stats', help='Show CRM statistics')
p.set_defaults(func=stats)
p = subparsers.add_parser('init', help='Initialize database')
p.set_defaults(func=init_db)
args = parser.parse_args()
args.func(args)
if __name__ == '__main__':
main()
skill.json
961 B
{
"name": "agent-crm",
"version": "1.0.1",
"description": "A complete CRM with no UIβjust natural language. Track contacts, deals, interactions, and tasks through conversation.",
"author": "Tyrell",
"license": "MIT",
"repository": "",
"keywords": [
"crm",
"sales",
"contacts",
"deals",
"pipeline",
"tasks"
],
"requirements": {
"python": ">=3.10",
"notes": "matplotlib auto-installs on first chart generation"
},
"scripts": [
"scripts/crm.py",
"scripts/crm-ingest.py",
"scripts/crm-digest.py",
"scripts/crm-notify.py",
"scripts/crm-webhook.py",
"scripts/crm-report.py",
"scripts/crm-chart.py",
"scripts/crm-export.py",
"scripts/crm-backup.py"
],
"data": {
"database": "~/.local/share/agent-crm/crm.db",
"backups": "~/.local/share/agent-crm/backups/",
"charts": "~/.local/share/agent-crm/charts/",
"exports": "~/.local/share/agent-crm/exports/"
}
}
scenarios.md
7.8 KB
# Agent CRM β Scenario Tests
End-to-end user stories as validation. Each scenario is a complete user journey.
Run these against a fresh database to validate the system works as expected.
---
## Scenario 1: First Contact β Deal β Win
**Story:** Meet someone at an event, track them through to a closed deal.
**Steps:**
1. **Add contact from event**
```
Input: "Met Sarah Chen at the AI meetup. She's CTO at Replicate, interested in our API. Email is sarah@replicate.com"
Expected actions:
- Create contact: Sarah Chen, CTO, Replicate, sarah@replicate.com, source: AI meetup
- Log interaction: meeting, "Met at AI meetup, interested in API"
Validate:
- crm find-contact "sarah" returns 1 result with correct fields
- crm list-interactions --contact "sarah" shows 1 meeting
```
2. **Create deal**
```
Input: "Create a $50K deal for Replicate API integration"
Expected actions:
- Create deal: "Replicate API Integration", $50,000, stage=lead, contact=Sarah Chen
Validate:
- crm list-deals shows 1 deal at $50K
- crm pipeline shows $50K in lead stage
```
3. **Log follow-up call**
```
Input: "Had a call with Sarah. Very positive - she has budget approved. Moving to proposal stage."
Expected actions:
- Log interaction: call, outbound, summary of conversation
- Update deal stage: lead β qualified (or proposal based on interpretation)
- Offer to create follow-up task
Validate:
- crm list-interactions --contact "sarah" shows 2 interactions
- Deal stage updated
```
4. **Close the deal**
```
Input: "Sarah signed the contract. Mark the deal as won."
Expected behavior:
- Agent asks for confirmation (>$10K threshold)
Input: "yes"
Expected actions:
- Update deal stage: β won
- Set closed_at timestamp
Validate:
- crm list-deals --stage won shows 1 deal
- crm-report winloss shows $50K won
```
**Success criteria:** Full journey from contact to closed-won with audit trail.
---
## Scenario 2: Email Ingest
**Story:** Forward a sales email, have it parsed and logged.
**Steps:**
1. **Forward email content**
```
Input: """
From: alex@datastack.io
Subject: Re: AI Consulting Inquiry
Date: Feb 8, 2026
Hi,
Thanks for the call yesterday. We're definitely interested in moving forward
with the AI consulting engagement we discussed. Budget is around $30K for
the initial phase.
Can you send over a proposal by end of week?
Best,
Alex Rivera
Founder, DataStack
"""
Expected actions:
- Run through crm-ingest
- Create contact: Alex Rivera, Founder, DataStack, alex@datastack.io
- Log interaction: email, inbound
- Detect deal signals: $30K, "proposal" stage hint
- Create task: "Send proposal to Alex" due Friday
Validate:
- Contact exists with correct company/role
- Interaction logged as email
- Deal created or suggested at $30K
```
**Success criteria:** Unstructured email β structured CRM data with minimal manual input.
---
## Scenario 3: Pipeline Review
**Story:** Ask for pipeline status, get actionable summary.
**Setup:** Create 3-4 deals at different stages.
**Steps:**
1. **Query pipeline**
```
Input: "What's my pipeline look like?"
Expected output:
- Deals grouped by stage with counts and values
- Total pipeline value
- Weighted pipeline value
- Highlight any deals closing soon or needing attention
```
2. **Drill into specific stage**
```
Input: "Show me deals in proposal stage"
Expected output:
- List of deals with contact, value, expected close
```
3. **Ask for forecast**
```
Input: "What's closing this month?"
Expected output:
- Deals with expected_close in current month
- Total value at risk
```
**Success criteria:** Natural language queries return useful, formatted pipeline data.
---
## Scenario 4: Task Management
**Story:** Create, track, and complete follow-up tasks.
**Steps:**
1. **Create task with natural date**
```
Input: "Remind me to follow up with Sarah next Tuesday"
Expected actions:
- Create task: "Follow up with Sarah", due next Tuesday, linked to Sarah contact
Validate:
- crm list-tasks --pending shows task with correct due date
```
2. **Check overdue**
```
Setup: Create task due yesterday
Input: "What's overdue?"
Expected output:
- List of overdue tasks with days overdue
```
3. **Complete task**
```
Input: "Done with the Sarah follow-up"
Expected actions:
- Mark task completed
Validate:
- Task has completed_at set
- crm list-tasks --pending no longer shows it
```
**Success criteria:** Task lifecycle with natural language and date parsing.
---
## Scenario 5: Stale Contact Alert
**Story:** System proactively alerts about contacts going cold.
**Setup:**
- Create contact with deal 15 days ago
- No interactions logged since
**Steps:**
1. **Run notify check**
```
Command: crm-notify --stale-days 14
Expected output:
- Alert for stale contact with open deal
- Suggests follow-up
```
2. **Agent acts on alert**
```
Input: (agent receives alert in heartbeat)
Expected behavior:
- Agent messages user about stale contact
- Offers to create follow-up task or draft email
```
**Success criteria:** Proactive alerting catches contacts before they go cold.
---
## Scenario 6: Confirmation Flow
**Story:** High-stakes actions require explicit confirmation.
**Steps:**
1. **Try to close large deal**
```
Input: "Mark the $75K Acme deal as won"
Expected behavior:
- Agent does NOT immediately execute
- Agent asks: "Confirm: Mark 'Acme Deal' ($75,000) as WON?"
```
2. **Confirm**
```
Input: "yes"
Expected actions:
- Deal updated to won
- Audit log shows confirmation
```
3. **Try to delete**
```
Input: "Delete the old contact John Smith"
Expected behavior:
- Agent asks for confirmation before delete
```
**Success criteria:** No high-stakes action executes without explicit user confirmation.
---
## Scenario 7: Webhook Ingestion
**Story:** External form submission creates CRM contact.
**Steps:**
1. **Start webhook server**
```
Command: crm-webhook --port 8901 &
```
2. **Submit form data**
```
Command: curl -X POST http://localhost:8901/lead \
-H "Content-Type: application/json" \
-d '{"name": "Jordan Lee", "email": "jordan@startup.io", "company": "StartupCo", "message": "Interested in your services"}'
Expected response:
- 201 Created
- Contact ID returned
```
3. **Verify in CRM**
```
Command: crm find-contact "jordan"
Expected:
- Contact exists with all fields populated
- Source: webhook
```
**Success criteria:** Zero-touch lead capture from external forms.
---
## Running Scenarios
### Fresh Database Test
```bash
# Reset database
rm ~/.local/share/agent-crm/crm.db
crm init
# Run through scenarios manually or via agent
```
### Automated Validation
```bash
# After each scenario, validate with queries:
crm stats # Overall counts
crm query "SELECT * FROM audit_log ORDER BY created_at DESC LIMIT 10" # Audit trail
```
---
## Satisfaction Criteria
Following the StrongDM "dark factories" approach:
| Scenario | Pass Condition |
|----------|---------------|
| 1. First Contact β Win | Full lifecycle, all entities linked, audit complete |
| 2. Email Ingest | Correct entity extraction, >80% accuracy on fields |
| 3. Pipeline Review | Accurate totals, useful formatting |
| 4. Task Management | Date parsing works, completion tracked |
| 5. Stale Alert | Alert fires within 1 day of threshold |
| 6. Confirmation | Never auto-executes high-stakes without confirm |
| 7. Webhook | Contact created with correct source |
**Overall pass:** 6/7 scenarios pass with no critical failures.
Compatible Agents
Claude CodeOpenClaw
Details
- Category
- productivity
- Version
- 1.0.1
- Stars
- 0
- Added
- February 9, 2026
- Updated
- February 9, 2026