From e326cc6bba9cdbab5e7500d1d94a774dd98b87bb Mon Sep 17 00:00:00 2001 From: DMleadgen Date: Thu, 16 Apr 2026 11:02:22 -0600 Subject: [PATCH] feat: ship CRM admin and staging sign-in --- app/admin/contacts/[id]/page.tsx | 201 ++++ app/admin/contacts/page.tsx | 164 +++ app/admin/conversations/[id]/page.tsx | 241 +++++ app/admin/conversations/page.tsx | 208 ++++ app/admin/layout.tsx | 32 +- app/admin/page.tsx | 14 + app/api/admin/auth/login/route.ts | 37 + app/api/admin/auth/logout/route.ts | 23 + app/api/admin/contacts/[id]/route.ts | 36 + app/api/admin/contacts/route.ts | 32 + app/api/admin/conversations/[id]/route.ts | 39 + app/api/admin/conversations/route.ts | 45 + app/api/internal/ghl/shared.ts | 51 + app/api/internal/ghl/sync/contacts/route.ts | 60 ++ .../internal/ghl/sync/conversations/route.ts | 70 ++ app/api/internal/ghl/sync/messages/route.ts | 61 ++ app/api/internal/ghl/sync/reconcile/route.ts | 29 + app/api/internal/ghl/sync/recordings/route.ts | 69 ++ app/sign-in/[[...sign-in]]/page.tsx | 85 +- convex/crm.ts | 982 ++++++++++++++++++ convex/crmModel.ts | 390 +++++++ convex/leads.ts | 217 ++-- convex/schema.ts | 239 ++++- convex/voiceSessions.ts | 334 +++--- lib/server/admin-auth.ts | 105 ++ lib/server/ghl-sync.ts | 130 +++ 26 files changed, 3563 insertions(+), 331 deletions(-) create mode 100644 app/admin/contacts/[id]/page.tsx create mode 100644 app/admin/contacts/page.tsx create mode 100644 app/admin/conversations/[id]/page.tsx create mode 100644 app/admin/conversations/page.tsx create mode 100644 app/api/admin/auth/login/route.ts create mode 100644 app/api/admin/auth/logout/route.ts create mode 100644 app/api/admin/contacts/[id]/route.ts create mode 100644 app/api/admin/contacts/route.ts create mode 100644 app/api/admin/conversations/[id]/route.ts create mode 100644 app/api/admin/conversations/route.ts create mode 100644 app/api/internal/ghl/shared.ts create mode 100644 app/api/internal/ghl/sync/contacts/route.ts create mode 100644 app/api/internal/ghl/sync/conversations/route.ts create mode 100644 app/api/internal/ghl/sync/messages/route.ts create mode 100644 app/api/internal/ghl/sync/reconcile/route.ts create mode 100644 app/api/internal/ghl/sync/recordings/route.ts create mode 100644 convex/crm.ts create mode 100644 convex/crmModel.ts create mode 100644 lib/server/ghl-sync.ts diff --git a/app/admin/contacts/[id]/page.tsx b/app/admin/contacts/[id]/page.tsx new file mode 100644 index 00000000..4a55bd09 --- /dev/null +++ b/app/admin/contacts/[id]/page.tsx @@ -0,0 +1,201 @@ +import Link from "next/link" +import { notFound } from "next/navigation" +import { fetchQuery } from "convex/nextjs" +import { ArrowLeft, ContactRound, MessageSquare } from "lucide-react" +import { api } from "@/convex/_generated/api" +import { Badge } from "@/components/ui/badge" +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card" + +type PageProps = { + params: Promise<{ + id: string + }> +} + +function formatTimestamp(value?: number) { + if (!value) { + return "—" + } + + return new Date(value).toLocaleString("en-US", { + month: "short", + day: "numeric", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + }) +} + +export default async function AdminContactDetailPage({ params }: PageProps) { + const { id } = await params + const detail = await fetchQuery(api.crm.getAdminContactDetail, { + contactId: id, + }) + + if (!detail) { + notFound() + } + + return ( +
+
+
+ + + Back to contacts + +

+ {detail.contact.firstName} {detail.contact.lastName} +

+

+ Unified CRM record across forms, calls, SMS, and sync imports. +

+
+ +
+ + + + + Contact Profile + + + Backend-owned identity and sync metadata. + + + +
+

+ Email +

+

+ {detail.contact.email || "—"} +

+
+
+

+ Phone +

+

{detail.contact.phone || "—"}

+
+
+

+ Company +

+

{detail.contact.company || "—"}

+
+
+

+ Status +

+ + {detail.contact.status} + +
+
+

+ GHL Contact ID +

+

+ {detail.contact.ghlContactId || "—"} +

+
+
+

+ Last Activity +

+

+ {formatTimestamp(detail.contact.lastActivityAt)} +

+
+
+
+ + + + + + Conversations + + + Every mirrored conversation associated to this contact. + + + + {detail.conversations.length === 0 ? ( +

+ No conversations are linked to this contact yet. +

+ ) : ( + detail.conversations.map((conversation: any) => ( +
+
+
+

+ {conversation.title || "Untitled conversation"} +

+

+ {conversation.channel} •{" "} + {formatTimestamp(conversation.lastMessageAt)} +

+
+ + {conversation.status} + +
+

+ {conversation.lastMessagePreview || "No preview yet"} +

+
+ )) + )} +
+
+
+ + + + Timeline + + Calls, messages, recordings, and lead events in one stream. + + + + {detail.timeline.length === 0 ? ( +

+ No timeline activity for this contact yet. +

+ ) : ( + detail.timeline.map((item: any) => ( +
+
+ {item.type} + {formatTimestamp(item.timestamp)} +
+

{item.title || "Untitled"}

+

+ {item.body || "—"} +

+
+ )) + )} +
+
+
+
+ ) +} + +export const metadata = { + title: "Contact Detail | Admin", + description: "Review a Rocky CRM contact and full interaction timeline", +} diff --git a/app/admin/contacts/page.tsx b/app/admin/contacts/page.tsx new file mode 100644 index 00000000..eb60ec58 --- /dev/null +++ b/app/admin/contacts/page.tsx @@ -0,0 +1,164 @@ +import Link from "next/link" +import { fetchQuery } from "convex/nextjs" +import { ContactRound, Search } from "lucide-react" +import { api } from "@/convex/_generated/api" +import { Badge } from "@/components/ui/badge" +import { Button } from "@/components/ui/button" +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card" +import { Input } from "@/components/ui/input" + +type PageProps = { + searchParams: Promise<{ + search?: string + page?: string + }> +} + +function formatTimestamp(value?: number) { + if (!value) { + return "—" + } + + return new Date(value).toLocaleString("en-US", { + month: "short", + day: "numeric", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + }) +} + +export default async function AdminContactsPage({ searchParams }: PageProps) { + const params = await searchParams + const page = Math.max(1, Number.parseInt(params.page || "1", 10) || 1) + const search = params.search?.trim() || undefined + + const data = await fetchQuery(api.crm.listAdminContacts, { + search, + page, + limit: 25, + }) + + return ( +
+
+
+
+

+ Contacts +

+

+ Backend-owned CRM contacts mirrored from forms, phone calls, and + GHL sync. +

+
+ + + +
+ + + + + + Contact Directory + + + Search by name, email, phone, company, or tag. + + + +
+
+ + +
+ +
+ +
+ + + + + + + + + + + + + + {data.items.length === 0 ? ( + + + + ) : ( + data.items.map((contact: any) => ( + + + + + + + + + + )) + )} + +
ContactCompanyStatusConversationsLeadsLast ActivityOpen
+ No contacts matched this filter. +
+
+ {contact.firstName} {contact.lastName} +
+
+ {contact.email || "No email"} +
+
+ {contact.phone || "No phone"} +
+
+ {contact.company || "—"} + + {contact.status} + {contact.conversationCount}{contact.leadCount} + {formatTimestamp(contact.lastActivityAt)} + + + + +
+
+
+
+
+
+ ) +} + +export const metadata = { + title: "Contacts | Admin", + description: "View backend-owned Rocky contact records", +} diff --git a/app/admin/conversations/[id]/page.tsx b/app/admin/conversations/[id]/page.tsx new file mode 100644 index 00000000..200f7d42 --- /dev/null +++ b/app/admin/conversations/[id]/page.tsx @@ -0,0 +1,241 @@ +import Link from "next/link" +import { notFound } from "next/navigation" +import { fetchQuery } from "convex/nextjs" +import { ArrowLeft, ExternalLink, MessageSquare } from "lucide-react" +import { api } from "@/convex/_generated/api" +import { Badge } from "@/components/ui/badge" +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card" + +type PageProps = { + params: Promise<{ + id: string + }> +} + +function formatTimestamp(value?: number) { + if (!value) { + return "—" + } + + return new Date(value).toLocaleString("en-US", { + month: "short", + day: "numeric", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + }) +} + +function formatDuration(value?: number) { + if (!value) { + return "—" + } + const totalSeconds = Math.round(value / 1000) + const minutes = Math.floor(totalSeconds / 60) + const seconds = totalSeconds % 60 + return `${minutes}:${String(seconds).padStart(2, "0")}` +} + +export default async function AdminConversationDetailPage({ + params, +}: PageProps) { + const { id } = await params + const detail = await fetchQuery(api.crm.getAdminConversationDetail, { + conversationId: id, + }) + + if (!detail) { + notFound() + } + + return ( +
+
+
+ + + Back to conversations + +

+ {detail.conversation.title || "Conversation Detail"} +

+

+ Unified thread for Rocky-owned conversation management. +

+
+ +
+ + + + + Conversation Status + + + Channel, ownership, and sync metadata. + + + +
+

+ Channel +

+ + {detail.conversation.channel} + +
+
+

+ Status +

+ + {detail.conversation.status} + +
+
+

+ Contact +

+

+ {detail.contact?.name || "Unlinked"} +

+
+
+

+ Started +

+

+ {formatTimestamp(detail.conversation.startedAt)} +

+
+
+

+ Last Activity +

+

+ {formatTimestamp(detail.conversation.lastMessageAt)} +

+
+
+

+ GHL Conversation ID +

+

+ {detail.conversation.ghlConversationId || "—"} +

+
+ {detail.conversation.summaryText ? ( +
+

+ Summary +

+

+ {detail.conversation.summaryText} +

+
+ ) : null} +
+
+ + + + Recordings & Leads + + Call artifacts and related lead outcomes for this thread. + + + + {detail.recordings.map((recording: any) => ( +
+
+ + {recording.recordingStatus || "recording"} + + + {formatDuration(recording.durationMs)} + +
+ {recording.recordingUrl ? ( + + Open recording + + + ) : null} + {recording.transcriptionText ? ( +

+ {recording.transcriptionText} +

+ ) : null} +
+ ))} + + {detail.leads.map((lead: any) => ( +
+
+

{lead.type}

+ {lead.status} +
+

+ {lead.message || lead.intent || "—"} +

+
+ ))} + + {detail.recordings.length === 0 && detail.leads.length === 0 ? ( +

+ No recordings or linked leads for this conversation yet. +

+ ) : null} +
+
+
+ + + + Messages + + Full backend-owned thread history for this conversation. + + + + {detail.messages.length === 0 ? ( +

+ No messages have been mirrored into this conversation yet. +

+ ) : ( + detail.messages.map((message: any) => ( +
+
+ + {message.channel} • {message.direction} + + {formatTimestamp(message.sentAt)} +
+

{message.body}

+
+ )) + )} +
+
+
+
+ ) +} + +export const metadata = { + title: "Conversation Detail | Admin", + description: "Review a Rocky conversation thread, recordings, and leads", +} diff --git a/app/admin/conversations/page.tsx b/app/admin/conversations/page.tsx new file mode 100644 index 00000000..560ff8ad --- /dev/null +++ b/app/admin/conversations/page.tsx @@ -0,0 +1,208 @@ +import Link from "next/link" +import { fetchQuery } from "convex/nextjs" +import { MessageSquare, Search } from "lucide-react" +import { api } from "@/convex/_generated/api" +import { Badge } from "@/components/ui/badge" +import { Button } from "@/components/ui/button" +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card" +import { Input } from "@/components/ui/input" + +type PageProps = { + searchParams: Promise<{ + search?: string + channel?: "call" | "sms" | "chat" | "unknown" + status?: "open" | "closed" | "archived" + page?: string + }> +} + +function formatTimestamp(value?: number) { + if (!value) { + return "—" + } + + return new Date(value).toLocaleString("en-US", { + month: "short", + day: "numeric", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + }) +} + +export default async function AdminConversationsPage({ + searchParams, +}: PageProps) { + const params = await searchParams + const page = Math.max(1, Number.parseInt(params.page || "1", 10) || 1) + const search = params.search?.trim() || undefined + + const data = await fetchQuery(api.crm.listAdminConversations, { + search, + page, + limit: 25, + channel: params.channel, + status: params.status, + }) + + return ( +
+
+
+
+

+ Conversations +

+

+ Unified inbox across backend-owned call and SMS conversation + threads. +

+
+ + + +
+ + + + + + Conversation Inbox + + + Search by contact, conversation preview, phone, email, or external + ID. + + + +
+
+ + +
+ + + +
+ +
+ + + + + + + + + + + + + + + {data.items.length === 0 ? ( + + + + ) : ( + data.items.map((conversation: any) => ( + + + + + + + + + + + )) + )} + +
ConversationContactChannelStatusMessagesRecordingsLast ActivityOpen
+ No conversations matched this filter. +
+
+ {conversation.title || "Untitled conversation"} +
+
+ {conversation.lastMessagePreview || "No preview yet"} +
+
+ {conversation.contact ? ( +
+
+ {conversation.contact.name} +
+
+ {conversation.contact.phone || + conversation.contact.email || + "—"} +
+
+ ) : ( + "—" + )} +
+ {conversation.channel} + + {conversation.status} + {conversation.messageCount} + {conversation.recordingCount} + + {formatTimestamp(conversation.lastMessageAt)} + + + + +
+
+
+
+
+
+ ) +} + +export const metadata = { + title: "Conversations | Admin", + description: "View backend-owned Rocky conversation threads", +} diff --git a/app/admin/layout.tsx b/app/admin/layout.tsx index a7674a3a..38a23f98 100644 --- a/app/admin/layout.tsx +++ b/app/admin/layout.tsx @@ -1,5 +1,9 @@ +import Link from "next/link" import { redirect } from "next/navigation" -import { isAdminUiEnabled } from "@/lib/server/admin-auth" +import { + getAdminUserFromCookies, + isAdminUiEnabled, +} from "@/lib/server/admin-auth" export default async function AdminLayout({ children, @@ -10,5 +14,29 @@ export default async function AdminLayout({ redirect("/") } - return <>{children} + const adminUser = await getAdminUserFromCookies() + if (!adminUser) { + redirect("/sign-in") + } + + return ( +
+
+
+
+ + Rocky Admin + + {adminUser.email} +
+
+ +
+
+
+ {children} +
+ ) } diff --git a/app/admin/page.tsx b/app/admin/page.tsx index 058e5ed7..071adf45 100644 --- a/app/admin/page.tsx +++ b/app/admin/page.tsx @@ -21,6 +21,8 @@ import { Settings, BarChart3, Phone, + MessageSquare, + ContactRound, } from "lucide-react" import { fetchAllProducts } from "@/lib/stripe/products" @@ -196,6 +198,18 @@ export default async function AdminDashboard() {

+ + + + + + + + ) : ( + <> +

+ Admin sign-in is not configured +

+

+ Enable admin UI, Convex, and staging credentials before using + this area. +

+ + )}
diff --git a/convex/crm.ts b/convex/crm.ts new file mode 100644 index 00000000..1ed08c43 --- /dev/null +++ b/convex/crm.ts @@ -0,0 +1,982 @@ +// @ts-nocheck +import { mutation, query } from "./_generated/server" +import { v } from "convex/values" +import { + ensureConversationParticipant, + normalizeEmail, + normalizePhone, + upsertCallArtifactRecord, + upsertContactRecord, + upsertConversationRecord, + upsertExternalSyncState, + upsertMessageRecord, +} from "./crmModel" + +function matchesSearch(values: Array, search: string) { + if (!search) { + return true + } + + const haystack = values + .map((value) => String(value || "").toLowerCase()) + .join("\n") + + return haystack.includes(search) +} + +async function buildContactTimeline(ctx, contactId) { + const conversations = await ctx.db + .query("conversations") + .withIndex("by_contactId", (q) => q.eq("contactId", contactId)) + .collect() + const messages = await ctx.db + .query("messages") + .withIndex("by_contactId", (q) => q.eq("contactId", contactId)) + .collect() + const callArtifacts = await ctx.db + .query("callArtifacts") + .withIndex("by_contactId", (q) => q.eq("contactId", contactId)) + .collect() + const leads = (await ctx.db.query("leadSubmissions").collect()).filter( + (lead) => lead.contactId === contactId + ) + + const timeline = [ + ...conversations.map((item) => ({ + id: item._id, + type: "conversation", + timestamp: item.lastMessageAt || item.startedAt || item.updatedAt, + title: item.title || item.channel, + body: item.lastMessagePreview || item.summaryText || "", + status: item.status, + })), + ...messages.map((item) => ({ + id: item._id, + type: "message", + timestamp: item.sentAt, + title: `${item.channel.toUpperCase()} ${item.direction || "system"}`, + body: item.body, + status: item.status, + })), + ...callArtifacts.map((item) => ({ + id: item._id, + type: "recording", + timestamp: item.endedAt || item.startedAt || item.updatedAt, + title: item.recordingStatus || "recording", + body: item.recordingUrl || item.transcriptionText || "", + status: item.recordingStatus, + })), + ...leads.map((item) => ({ + id: item._id, + type: "lead", + timestamp: item.createdAt, + title: item.type, + body: item.message || item.intent || "", + status: item.status, + })), + ] + + timeline.sort((a, b) => b.timestamp - a.timestamp) + return timeline +} + +export const upsertContact = mutation({ + args: { + firstName: v.string(), + lastName: v.string(), + email: v.optional(v.string()), + phone: v.optional(v.string()), + company: v.optional(v.string()), + tags: v.optional(v.array(v.string())), + status: v.optional( + v.union( + v.literal("active"), + v.literal("lead"), + v.literal("customer"), + v.literal("inactive") + ) + ), + source: v.optional(v.string()), + notes: v.optional(v.string()), + ghlContactId: v.optional(v.string()), + livekitIdentity: v.optional(v.string()), + lastActivityAt: v.optional(v.number()), + }, + handler: async (ctx, args) => { + return await upsertContactRecord(ctx, args) + }, +}) + +export const upsertConversation = mutation({ + args: { + contactId: v.optional(v.id("contacts")), + title: v.optional(v.string()), + channel: v.union( + v.literal("call"), + v.literal("sms"), + v.literal("chat"), + v.literal("unknown") + ), + source: v.optional(v.string()), + status: v.optional( + v.union(v.literal("open"), v.literal("closed"), v.literal("archived")) + ), + direction: v.optional( + v.union(v.literal("inbound"), v.literal("outbound"), v.literal("mixed")) + ), + startedAt: v.optional(v.number()), + endedAt: v.optional(v.number()), + lastMessageAt: v.optional(v.number()), + lastMessagePreview: v.optional(v.string()), + unreadCount: v.optional(v.number()), + summaryText: v.optional(v.string()), + ghlConversationId: v.optional(v.string()), + livekitRoomName: v.optional(v.string()), + voiceSessionId: v.optional(v.id("voiceSessions")), + }, + handler: async (ctx, args) => { + return await upsertConversationRecord(ctx, args) + }, +}) + +export const upsertMessage = mutation({ + args: { + conversationId: v.id("conversations"), + contactId: v.optional(v.id("contacts")), + direction: v.optional( + v.union(v.literal("inbound"), v.literal("outbound"), v.literal("system")) + ), + channel: v.union( + v.literal("call"), + v.literal("sms"), + v.literal("chat"), + v.literal("unknown") + ), + source: v.optional(v.string()), + messageType: v.optional(v.string()), + body: v.string(), + status: v.optional(v.string()), + sentAt: v.optional(v.number()), + ghlMessageId: v.optional(v.string()), + voiceTranscriptTurnId: v.optional(v.id("voiceTranscriptTurns")), + voiceSessionId: v.optional(v.id("voiceSessions")), + livekitRoomName: v.optional(v.string()), + metadata: v.optional(v.string()), + }, + handler: async (ctx, args) => { + return await upsertMessageRecord(ctx, args) + }, +}) + +export const upsertCallArtifact = mutation({ + args: { + conversationId: v.id("conversations"), + contactId: v.optional(v.id("contacts")), + source: v.optional(v.string()), + recordingId: v.optional(v.string()), + recordingUrl: v.optional(v.string()), + recordingStatus: v.optional( + v.union( + v.literal("pending"), + v.literal("starting"), + v.literal("recording"), + v.literal("completed"), + v.literal("failed") + ) + ), + transcriptionText: v.optional(v.string()), + durationMs: v.optional(v.number()), + startedAt: v.optional(v.number()), + endedAt: v.optional(v.number()), + ghlMessageId: v.optional(v.string()), + voiceSessionId: v.optional(v.id("voiceSessions")), + livekitRoomName: v.optional(v.string()), + metadata: v.optional(v.string()), + }, + handler: async (ctx, args) => { + return await upsertCallArtifactRecord(ctx, args) + }, +}) + +export const updateContact = mutation({ + args: { + contactId: v.id("contacts"), + firstName: v.optional(v.string()), + lastName: v.optional(v.string()), + email: v.optional(v.string()), + phone: v.optional(v.string()), + company: v.optional(v.string()), + status: v.optional( + v.union( + v.literal("active"), + v.literal("lead"), + v.literal("customer"), + v.literal("inactive") + ) + ), + tags: v.optional(v.array(v.string())), + notes: v.optional(v.string()), + }, + handler: async (ctx, args) => { + const existing = await ctx.db.get(args.contactId) + if (!existing) { + throw new Error("Contact not found") + } + + await ctx.db.patch(args.contactId, { + firstName: args.firstName ?? existing.firstName, + lastName: args.lastName ?? existing.lastName, + email: args.email ?? existing.email, + normalizedEmail: normalizeEmail(args.email ?? existing.email), + phone: args.phone ?? existing.phone, + normalizedPhone: normalizePhone(args.phone ?? existing.phone), + company: args.company ?? existing.company, + status: args.status ?? existing.status, + tags: args.tags ?? existing.tags, + notes: args.notes ?? existing.notes, + updatedAt: Date.now(), + }) + + return await ctx.db.get(args.contactId) + }, +}) + +export const linkLeadSubmission = mutation({ + args: { + leadId: v.id("leadSubmissions"), + contactId: v.optional(v.id("contacts")), + conversationId: v.optional(v.id("conversations")), + }, + handler: async (ctx, args) => { + await ctx.db.patch(args.leadId, { + contactId: args.contactId, + conversationId: args.conversationId, + updatedAt: Date.now(), + }) + return await ctx.db.get(args.leadId) + }, +}) + +export const importContact = mutation({ + args: { + provider: v.string(), + entityId: v.string(), + payload: v.any(), + }, + handler: async (ctx, args) => { + const payload = args.payload || {} + const contact = await upsertContactRecord(ctx, { + firstName: + payload.firstName || payload.first_name || payload.name || "Unknown", + lastName: payload.lastName || payload.last_name || "Contact", + email: payload.email, + phone: payload.phone, + company: payload.company || payload.companyName, + tags: Array.isArray(payload.tags) ? payload.tags : [], + status: "lead", + source: `${args.provider}:mirror`, + ghlContactId: payload.id || args.entityId, + lastActivityAt: + typeof payload.dateUpdated === "string" + ? new Date(payload.dateUpdated).getTime() + : Date.now(), + }) + + await upsertExternalSyncState(ctx, { + provider: args.provider, + entityType: "contact", + entityId: args.entityId, + status: "synced", + lastAttemptAt: Date.now(), + lastSyncedAt: Date.now(), + metadata: JSON.stringify({ + contactId: contact?._id, + }), + }) + + return contact + }, +}) + +export const importConversation = mutation({ + args: { + provider: v.string(), + entityId: v.string(), + payload: v.any(), + }, + handler: async (ctx, args) => { + const payload = args.payload || {} + const contact = await upsertContactRecord(ctx, { + firstName: payload.firstName || payload.contactName || "Unknown", + lastName: payload.lastName || "Contact", + email: payload.email, + phone: payload.phone || payload.contactPhone, + source: `${args.provider}:mirror`, + ghlContactId: payload.contactId, + status: "lead", + lastActivityAt: + typeof payload.dateUpdated === "string" + ? new Date(payload.dateUpdated).getTime() + : Date.now(), + }) + + const conversation = await upsertConversationRecord(ctx, { + contactId: contact?._id, + title: payload.fullName || payload.title || payload.contactName, + channel: + payload.channel === "SMS" || payload.type === "sms" ? "sms" : "call", + source: `${args.provider}:mirror`, + status: payload.status || "open", + direction: payload.direction || "mixed", + startedAt: + typeof payload.dateAdded === "string" + ? new Date(payload.dateAdded).getTime() + : Date.now(), + endedAt: + typeof payload.dateEnded === "string" + ? new Date(payload.dateEnded).getTime() + : undefined, + lastMessageAt: + typeof payload.lastMessageAt === "string" + ? new Date(payload.lastMessageAt).getTime() + : undefined, + lastMessagePreview: payload.lastMessageBody || payload.snippet, + summaryText: payload.summary, + ghlConversationId: payload.id || args.entityId, + }) + + await ensureConversationParticipant(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + role: "contact", + displayName: + [contact?.firstName, contact?.lastName].filter(Boolean).join(" ") || + payload.contactName, + phone: contact?.phone, + email: contact?.email, + externalContactId: payload.contactId, + }) + + await upsertExternalSyncState(ctx, { + provider: args.provider, + entityType: "conversation", + entityId: args.entityId, + status: "synced", + lastAttemptAt: Date.now(), + lastSyncedAt: Date.now(), + metadata: JSON.stringify({ + contactId: contact?._id, + conversationId: conversation?._id, + }), + }) + + return conversation + }, +}) + +export const importMessage = mutation({ + args: { + provider: v.string(), + entityId: v.string(), + payload: v.any(), + }, + handler: async (ctx, args) => { + const payload = args.payload || {} + const contact = await upsertContactRecord(ctx, { + firstName: payload.firstName || payload.contactName || "Unknown", + lastName: payload.lastName || "Contact", + email: payload.email, + phone: payload.phone, + source: `${args.provider}:mirror`, + ghlContactId: payload.contactId, + status: "lead", + }) + + const conversation = await upsertConversationRecord(ctx, { + contactId: contact?._id, + title: payload.contactName, + channel: + payload.channel === "SMS" || payload.messageType === "SMS" + ? "sms" + : payload.channel === "Call" + ? "call" + : "unknown", + source: `${args.provider}:mirror`, + status: "open", + direction: payload.direction || "mixed", + startedAt: + typeof payload.dateAdded === "string" + ? new Date(payload.dateAdded).getTime() + : Date.now(), + lastMessageAt: + typeof payload.dateAdded === "string" + ? new Date(payload.dateAdded).getTime() + : Date.now(), + lastMessagePreview: payload.body || payload.message, + ghlConversationId: payload.conversationId, + }) + + await ensureConversationParticipant(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + role: "contact", + displayName: payload.contactName, + phone: contact?.phone, + email: contact?.email, + externalContactId: payload.contactId, + }) + + const message = await upsertMessageRecord(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + direction: + payload.direction === "inbound" + ? "inbound" + : payload.direction === "outbound" + ? "outbound" + : "system", + channel: + payload.channel === "SMS" || payload.messageType === "SMS" + ? "sms" + : payload.channel === "Call" + ? "call" + : "unknown", + source: `${args.provider}:mirror`, + messageType: payload.messageType || payload.type, + body: payload.body || payload.message || payload.transcript || "", + status: payload.status, + sentAt: + typeof payload.dateAdded === "string" + ? new Date(payload.dateAdded).getTime() + : Date.now(), + ghlMessageId: payload.id || args.entityId, + metadata: JSON.stringify(payload), + }) + + await upsertExternalSyncState(ctx, { + provider: args.provider, + entityType: "message", + entityId: args.entityId, + status: "synced", + lastAttemptAt: Date.now(), + lastSyncedAt: Date.now(), + metadata: JSON.stringify({ + conversationId: conversation?._id, + messageId: message?._id, + }), + }) + + return message + }, +}) + +export const importRecording = mutation({ + args: { + provider: v.string(), + entityId: v.string(), + payload: v.any(), + }, + handler: async (ctx, args) => { + const payload = args.payload || {} + const conversation = await upsertConversationRecord(ctx, { + channel: "call", + source: `${args.provider}:mirror`, + status: "closed", + direction: payload.direction || "mixed", + startedAt: + typeof payload.createdAt === "string" + ? new Date(payload.createdAt).getTime() + : Date.now(), + lastMessageAt: + typeof payload.createdAt === "string" + ? new Date(payload.createdAt).getTime() + : Date.now(), + lastMessagePreview: payload.summary || payload.transcript, + ghlConversationId: payload.conversationId, + livekitRoomName: payload.livekitRoomName, + }) + + const artifact = await upsertCallArtifactRecord(ctx, { + conversationId: conversation._id, + source: `${args.provider}:mirror`, + recordingId: payload.recordingId || payload.id || args.entityId, + recordingUrl: payload.recordingUrl, + recordingStatus: payload.recordingStatus || "completed", + transcriptionText: payload.transcript, + durationMs: + typeof payload.durationMs === "number" + ? payload.durationMs + : typeof payload.duration === "number" + ? payload.duration * 1000 + : undefined, + startedAt: + typeof payload.createdAt === "string" + ? new Date(payload.createdAt).getTime() + : undefined, + endedAt: + typeof payload.endedAt === "string" + ? new Date(payload.endedAt).getTime() + : undefined, + ghlMessageId: payload.messageId, + livekitRoomName: payload.livekitRoomName, + metadata: JSON.stringify(payload), + }) + + await upsertExternalSyncState(ctx, { + provider: args.provider, + entityType: "recording", + entityId: args.entityId, + status: "synced", + lastAttemptAt: Date.now(), + lastSyncedAt: Date.now(), + metadata: JSON.stringify({ + conversationId: conversation?._id, + callArtifactId: artifact?._id, + }), + }) + + return artifact + }, +}) + +export const updateSyncCheckpoint = mutation({ + args: { + provider: v.string(), + entityType: v.string(), + entityId: v.string(), + cursor: v.optional(v.string()), + checksum: v.optional(v.string()), + status: v.optional( + v.union( + v.literal("pending"), + v.literal("synced"), + v.literal("failed"), + v.literal("reconciled"), + v.literal("mismatch") + ) + ), + error: v.optional(v.string()), + metadata: v.optional(v.string()), + }, + handler: async (ctx, args) => { + return await upsertExternalSyncState(ctx, { + ...args, + lastAttemptAt: Date.now(), + lastSyncedAt: args.status === "synced" ? Date.now() : undefined, + }) + }, +}) + +export const reconcileExternalState = mutation({ + args: { + provider: v.string(), + }, + handler: async (ctx, args) => { + const states = await ctx.db + .query("externalSyncState") + .withIndex("by_provider_entityType", (q) => q.eq("provider", args.provider)) + .collect() + + const mismatches = [] + for (const state of states) { + const missing = + state.entityType === "contact" + ? !(await ctx.db + .query("contacts") + .withIndex("by_ghlContactId", (q) => q.eq("ghlContactId", state.entityId)) + .unique()) + : state.entityType === "conversation" + ? !(await ctx.db + .query("conversations") + .withIndex("by_ghlConversationId", (q) => + q.eq("ghlConversationId", state.entityId) + ) + .unique()) + : state.entityType === "message" + ? !(await ctx.db + .query("messages") + .withIndex("by_ghlMessageId", (q) => q.eq("ghlMessageId", state.entityId)) + .unique()) + : state.entityType === "recording" + ? !(await ctx.db + .query("callArtifacts") + .withIndex("by_recordingId", (q) => q.eq("recordingId", state.entityId)) + .unique()) + : false + + if (missing) { + await ctx.db.patch(state._id, { + status: "mismatch", + error: "Referenced mirrored record is missing locally.", + updatedAt: Date.now(), + }) + mismatches.push(state.entityId) + } else { + await ctx.db.patch(state._id, { + status: "reconciled", + error: undefined, + lastSyncedAt: state.lastSyncedAt ?? Date.now(), + updatedAt: Date.now(), + }) + } + } + + return { + provider: args.provider, + checked: states.length, + mismatches, + } + }, +}) + +export const listAdminContacts = query({ + args: { + search: v.optional(v.string()), + page: v.optional(v.number()), + limit: v.optional(v.number()), + }, + handler: async (ctx, args) => { + const page = Math.max(1, args.page ?? 1) + const limit = Math.min(100, Math.max(1, args.limit ?? 25)) + const search = String(args.search || "").trim().toLowerCase() + + const contacts = await ctx.db.query("contacts").collect() + const filtered = contacts.filter((contact) => + matchesSearch( + [ + `${contact.firstName} ${contact.lastName}`, + contact.email, + contact.phone, + contact.company, + ...(contact.tags || []), + ], + search + ) + ) + + filtered.sort( + (a, b) => + (b.lastActivityAt || b.updatedAt || 0) - (a.lastActivityAt || a.updatedAt || 0) + ) + + const paged = filtered.slice((page - 1) * limit, page * limit) + const items = await Promise.all( + paged.map(async (contact) => { + const conversations = await ctx.db + .query("conversations") + .withIndex("by_contactId", (q) => q.eq("contactId", contact._id)) + .collect() + const leads = await ctx.db + .query("leadSubmissions") + .collect() + const contactLeads = leads.filter((lead) => lead.contactId === contact._id) + + return { + id: contact._id, + firstName: contact.firstName, + lastName: contact.lastName, + email: contact.email, + phone: contact.phone, + company: contact.company, + tags: contact.tags || [], + status: contact.status || "lead", + source: contact.source, + ghlContactId: contact.ghlContactId, + lastActivityAt: contact.lastActivityAt, + conversationCount: conversations.length, + leadCount: contactLeads.length, + updatedAt: contact.updatedAt, + } + }) + ) + + return { + items, + pagination: { + page, + limit, + total: filtered.length, + totalPages: Math.max(1, Math.ceil(filtered.length / limit)), + }, + } + }, +}) + +export const getContactTimeline = query({ + args: { + contactId: v.id("contacts"), + }, + handler: async (ctx, args) => { + return await buildContactTimeline(ctx, args.contactId) + }, +}) + +export const getAdminContactDetail = query({ + args: { + contactId: v.string(), + }, + handler: async (ctx, args) => { + const contact = await ctx.db.get(args.contactId as any) + if (!contact) { + return null + } + + const conversations = await ctx.db + .query("conversations") + .withIndex("by_contactId", (q) => q.eq("contactId", contact._id)) + .collect() + conversations.sort( + (a, b) => (b.lastMessageAt || b.updatedAt) - (a.lastMessageAt || a.updatedAt) + ) + + const timeline = await buildContactTimeline(ctx, contact._id) + + return { + contact: { + id: contact._id, + firstName: contact.firstName, + lastName: contact.lastName, + email: contact.email, + phone: contact.phone, + company: contact.company, + tags: contact.tags || [], + status: contact.status || "lead", + source: contact.source, + notes: contact.notes, + ghlContactId: contact.ghlContactId, + lastActivityAt: contact.lastActivityAt, + updatedAt: contact.updatedAt, + }, + conversations: conversations.map((conversation) => ({ + id: conversation._id, + channel: conversation.channel, + status: conversation.status || "open", + title: conversation.title, + lastMessageAt: conversation.lastMessageAt, + lastMessagePreview: conversation.lastMessagePreview, + recordingReady: Boolean(conversation.livekitRoomName || conversation.voiceSessionId), + })), + timeline, + } + }, +}) + +export const listAdminConversations = query({ + args: { + search: v.optional(v.string()), + channel: v.optional( + v.union( + v.literal("call"), + v.literal("sms"), + v.literal("chat"), + v.literal("unknown") + ) + ), + status: v.optional( + v.union(v.literal("open"), v.literal("closed"), v.literal("archived")) + ), + page: v.optional(v.number()), + limit: v.optional(v.number()), + }, + handler: async (ctx, args) => { + const page = Math.max(1, args.page ?? 1) + const limit = Math.min(100, Math.max(1, args.limit ?? 25)) + const search = String(args.search || "").trim().toLowerCase() + + const conversations = await ctx.db.query("conversations").collect() + const filtered = [] + + for (const conversation of conversations) { + if (args.channel && conversation.channel !== args.channel) { + continue + } + if (args.status && (conversation.status || "open") !== args.status) { + continue + } + + const contact = conversation.contactId + ? await ctx.db.get(conversation.contactId) + : null + if ( + !matchesSearch( + [ + conversation.title, + conversation.lastMessagePreview, + conversation.ghlConversationId, + contact + ? `${contact.firstName} ${contact.lastName}` + : undefined, + contact?.email, + contact?.phone, + ], + search + ) + ) { + continue + } + + filtered.push({ conversation, contact }) + } + + filtered.sort( + (a, b) => + (b.conversation.lastMessageAt || b.conversation.updatedAt) - + (a.conversation.lastMessageAt || a.conversation.updatedAt) + ) + + const paged = filtered.slice((page - 1) * limit, page * limit) + const items = await Promise.all( + paged.map(async ({ conversation, contact }) => { + const recordings = await ctx.db + .query("callArtifacts") + .withIndex("by_conversationId", (q) => + q.eq("conversationId", conversation._id) + ) + .collect() + const messages = await ctx.db + .query("messages") + .withIndex("by_conversationId", (q) => + q.eq("conversationId", conversation._id) + ) + .collect() + + return { + id: conversation._id, + title: + conversation.title || + (contact + ? `${contact.firstName} ${contact.lastName}`.trim() + : "Unnamed conversation"), + channel: conversation.channel, + status: conversation.status || "open", + direction: conversation.direction || "mixed", + source: conversation.source, + startedAt: conversation.startedAt, + lastMessageAt: conversation.lastMessageAt, + lastMessagePreview: conversation.lastMessagePreview, + contact: contact + ? { + id: contact._id, + name: `${contact.firstName} ${contact.lastName}`.trim(), + email: contact.email, + phone: contact.phone, + } + : null, + messageCount: messages.length, + recordingCount: recordings.length, + } + }) + ) + + return { + items, + pagination: { + page, + limit, + total: filtered.length, + totalPages: Math.max(1, Math.ceil(filtered.length / limit)), + }, + } + }, +}) + +export const getAdminConversationDetail = query({ + args: { + conversationId: v.string(), + }, + handler: async (ctx, args) => { + const conversation = await ctx.db.get(args.conversationId as any) + if (!conversation) { + return null + } + + const contact = conversation.contactId + ? await ctx.db.get(conversation.contactId) + : null + const participants = await ctx.db + .query("conversationParticipants") + .withIndex("by_conversationId", (q) => + q.eq("conversationId", conversation._id) + ) + .collect() + const messages = await ctx.db + .query("messages") + .withIndex("by_conversationId", (q) => + q.eq("conversationId", conversation._id) + ) + .collect() + messages.sort((a, b) => a.sentAt - b.sentAt) + const recordings = await ctx.db + .query("callArtifacts") + .withIndex("by_conversationId", (q) => + q.eq("conversationId", conversation._id) + ) + .collect() + const leads = (await ctx.db.query("leadSubmissions").collect()).filter( + (lead) => + lead.conversationId === conversation._id || + (contact && lead.contactId === contact._id) + ) + + return { + conversation: { + id: conversation._id, + title: conversation.title, + channel: conversation.channel, + status: conversation.status || "open", + direction: conversation.direction || "mixed", + source: conversation.source, + startedAt: conversation.startedAt, + endedAt: conversation.endedAt, + lastMessageAt: conversation.lastMessageAt, + lastMessagePreview: conversation.lastMessagePreview, + summaryText: conversation.summaryText, + ghlConversationId: conversation.ghlConversationId, + livekitRoomName: conversation.livekitRoomName, + }, + contact: contact + ? { + id: contact._id, + name: `${contact.firstName} ${contact.lastName}`.trim(), + email: contact.email, + phone: contact.phone, + company: contact.company, + } + : null, + participants: participants.map((participant) => ({ + id: participant._id, + role: participant.role || "unknown", + displayName: participant.displayName, + email: participant.email, + phone: participant.phone, + })), + messages: messages.map((message) => ({ + id: message._id, + direction: message.direction || "system", + channel: message.channel, + source: message.source, + body: message.body, + status: message.status, + sentAt: message.sentAt, + })), + recordings: recordings.map((recording) => ({ + id: recording._id, + recordingId: recording.recordingId, + recordingUrl: recording.recordingUrl, + recordingStatus: recording.recordingStatus, + transcriptionText: recording.transcriptionText, + durationMs: recording.durationMs, + startedAt: recording.startedAt, + endedAt: recording.endedAt, + })), + leads: leads.map((lead) => ({ + id: lead._id, + type: lead.type, + status: lead.status, + message: lead.message, + intent: lead.intent, + createdAt: lead.createdAt, + })), + } + }, +}) diff --git a/convex/crmModel.ts b/convex/crmModel.ts new file mode 100644 index 00000000..2ec06c80 --- /dev/null +++ b/convex/crmModel.ts @@ -0,0 +1,390 @@ +// @ts-nocheck + +export function normalizeEmail(value?: string) { + const normalized = String(value || "") + .trim() + .toLowerCase() + return normalized || undefined +} + +export function normalizePhone(value?: string) { + const digits = String(value || "").replace(/\D/g, "") + if (!digits) { + return undefined + } + + if (digits.length === 10) { + return `+1${digits}` + } + + if (digits.length === 11 && digits.startsWith("1")) { + return `+${digits}` + } + + return `+${digits}` +} + +export function dedupeStrings(values?: string[]) { + return Array.from( + new Set( + (values || []) + .map((value) => String(value || "").trim()) + .filter(Boolean) + ) + ) +} + +export async function findContactByIdentity(ctx, args) { + if (args.ghlContactId) { + const byGhl = await ctx.db + .query("contacts") + .withIndex("by_ghlContactId", (q) => q.eq("ghlContactId", args.ghlContactId)) + .unique() + if (byGhl) { + return byGhl + } + } + + const normalizedEmail = normalizeEmail(args.email) + if (normalizedEmail) { + const byEmail = await ctx.db + .query("contacts") + .withIndex("by_normalizedEmail", (q) => + q.eq("normalizedEmail", normalizedEmail) + ) + .unique() + if (byEmail) { + return byEmail + } + } + + const normalizedPhone = normalizePhone(args.phone) + if (normalizedPhone) { + const byPhone = await ctx.db + .query("contacts") + .withIndex("by_normalizedPhone", (q) => + q.eq("normalizedPhone", normalizedPhone) + ) + .unique() + if (byPhone) { + return byPhone + } + } + + return null +} + +export async function upsertContactRecord(ctx, input) { + const now = input.updatedAt ?? Date.now() + const normalizedEmail = normalizeEmail(input.email) + const normalizedPhone = normalizePhone(input.phone) + const existing = await findContactByIdentity(ctx, { + ghlContactId: input.ghlContactId, + email: normalizedEmail, + phone: normalizedPhone, + }) + + const patch = { + firstName: String(input.firstName || existing?.firstName || "Unknown"), + lastName: String(input.lastName || existing?.lastName || "Contact"), + email: input.email || existing?.email, + normalizedEmail: normalizedEmail || existing?.normalizedEmail, + phone: input.phone || existing?.phone, + normalizedPhone: normalizedPhone || existing?.normalizedPhone, + company: input.company ?? existing?.company, + tags: dedupeStrings([...(existing?.tags || []), ...(input.tags || [])]), + status: input.status || existing?.status || "lead", + source: input.source || existing?.source, + notes: input.notes ?? existing?.notes, + ghlContactId: input.ghlContactId || existing?.ghlContactId, + livekitIdentity: input.livekitIdentity || existing?.livekitIdentity, + lastActivityAt: + input.lastActivityAt ?? existing?.lastActivityAt ?? input.createdAt ?? now, + updatedAt: now, + } + + if (existing) { + await ctx.db.patch(existing._id, patch) + return await ctx.db.get(existing._id) + } + + const id = await ctx.db.insert("contacts", { + ...patch, + createdAt: input.createdAt ?? now, + }) + + return await ctx.db.get(id) +} + +export async function upsertConversationRecord(ctx, input) { + const now = input.updatedAt ?? Date.now() + let existing = null + + if (input.ghlConversationId) { + existing = await ctx.db + .query("conversations") + .withIndex("by_ghlConversationId", (q) => + q.eq("ghlConversationId", input.ghlConversationId) + ) + .unique() + } + + if (!existing && input.livekitRoomName) { + existing = await ctx.db + .query("conversations") + .withIndex("by_livekitRoomName", (q) => + q.eq("livekitRoomName", input.livekitRoomName) + ) + .unique() + } + + if (!existing && input.voiceSessionId) { + existing = await ctx.db + .query("conversations") + .withIndex("by_voiceSessionId", (q) => + q.eq("voiceSessionId", input.voiceSessionId) + ) + .unique() + } + + const patch = { + contactId: input.contactId ?? existing?.contactId, + title: input.title || existing?.title, + channel: input.channel || existing?.channel || "unknown", + source: input.source || existing?.source, + status: input.status || existing?.status || "open", + direction: input.direction || existing?.direction || "mixed", + startedAt: input.startedAt ?? existing?.startedAt ?? now, + endedAt: input.endedAt ?? existing?.endedAt, + lastMessageAt: input.lastMessageAt ?? existing?.lastMessageAt, + lastMessagePreview: input.lastMessagePreview ?? existing?.lastMessagePreview, + unreadCount: input.unreadCount ?? existing?.unreadCount ?? 0, + summaryText: input.summaryText ?? existing?.summaryText, + ghlConversationId: input.ghlConversationId || existing?.ghlConversationId, + livekitRoomName: input.livekitRoomName || existing?.livekitRoomName, + voiceSessionId: input.voiceSessionId ?? existing?.voiceSessionId, + updatedAt: now, + } + + if (existing) { + await ctx.db.patch(existing._id, patch) + return await ctx.db.get(existing._id) + } + + const id = await ctx.db.insert("conversations", { + ...patch, + createdAt: input.createdAt ?? now, + }) + return await ctx.db.get(id) +} + +export async function ensureConversationParticipant(ctx, input) { + const participants = await ctx.db + .query("conversationParticipants") + .withIndex("by_conversationId", (q) => + q.eq("conversationId", input.conversationId) + ) + .collect() + + const normalizedEmail = normalizeEmail(input.email) + const normalizedPhone = normalizePhone(input.phone) + const existing = participants.find((participant) => { + if (input.contactId && participant.contactId === input.contactId) { + return true + } + if ( + input.externalContactId && + participant.externalContactId === input.externalContactId + ) { + return true + } + if (normalizedEmail && participant.normalizedEmail === normalizedEmail) { + return true + } + if (normalizedPhone && participant.normalizedPhone === normalizedPhone) { + return true + } + return false + }) + + const patch = { + contactId: input.contactId ?? existing?.contactId, + role: input.role || existing?.role || "unknown", + displayName: input.displayName || existing?.displayName, + phone: input.phone || existing?.phone, + normalizedPhone: normalizedPhone || existing?.normalizedPhone, + email: input.email || existing?.email, + normalizedEmail: normalizedEmail || existing?.normalizedEmail, + externalContactId: input.externalContactId || existing?.externalContactId, + updatedAt: Date.now(), + } + + if (existing) { + await ctx.db.patch(existing._id, patch) + return await ctx.db.get(existing._id) + } + + const id = await ctx.db.insert("conversationParticipants", { + conversationId: input.conversationId, + ...patch, + createdAt: Date.now(), + }) + return await ctx.db.get(id) +} + +export async function upsertMessageRecord(ctx, input) { + let existing = null + + if (input.ghlMessageId) { + existing = await ctx.db + .query("messages") + .withIndex("by_ghlMessageId", (q) => + q.eq("ghlMessageId", input.ghlMessageId) + ) + .unique() + } + + if (!existing && input.voiceTranscriptTurnId) { + existing = await ctx.db + .query("messages") + .withIndex("by_voiceTranscriptTurnId", (q) => + q.eq("voiceTranscriptTurnId", input.voiceTranscriptTurnId) + ) + .unique() + } + + const now = input.updatedAt ?? Date.now() + const patch = { + conversationId: input.conversationId, + contactId: input.contactId, + direction: input.direction || existing?.direction || "system", + channel: input.channel || existing?.channel || "unknown", + source: input.source || existing?.source, + messageType: input.messageType || existing?.messageType, + body: String(input.body || existing?.body || "").trim(), + status: input.status || existing?.status, + sentAt: input.sentAt ?? existing?.sentAt ?? now, + ghlMessageId: input.ghlMessageId || existing?.ghlMessageId, + voiceTranscriptTurnId: + input.voiceTranscriptTurnId ?? existing?.voiceTranscriptTurnId, + voiceSessionId: input.voiceSessionId ?? existing?.voiceSessionId, + livekitRoomName: input.livekitRoomName || existing?.livekitRoomName, + metadata: input.metadata || existing?.metadata, + updatedAt: now, + } + + let message + if (existing) { + await ctx.db.patch(existing._id, patch) + message = await ctx.db.get(existing._id) + } else { + const id = await ctx.db.insert("messages", { + ...patch, + createdAt: input.createdAt ?? now, + }) + message = await ctx.db.get(id) + } + + await ctx.db.patch(input.conversationId, { + lastMessageAt: patch.sentAt, + lastMessagePreview: patch.body.slice(0, 240), + updatedAt: now, + }) + + return message +} + +export async function upsertCallArtifactRecord(ctx, input) { + let existing = null + + if (input.recordingId) { + existing = await ctx.db + .query("callArtifacts") + .withIndex("by_recordingId", (q) => q.eq("recordingId", input.recordingId)) + .unique() + } + + if (!existing && input.voiceSessionId) { + existing = await ctx.db + .query("callArtifacts") + .withIndex("by_voiceSessionId", (q) => + q.eq("voiceSessionId", input.voiceSessionId) + ) + .unique() + } + + if (!existing && input.ghlMessageId) { + existing = await ctx.db + .query("callArtifacts") + .withIndex("by_ghlMessageId", (q) => + q.eq("ghlMessageId", input.ghlMessageId) + ) + .unique() + } + + const now = input.updatedAt ?? Date.now() + const patch = { + conversationId: input.conversationId, + contactId: input.contactId ?? existing?.contactId, + source: input.source || existing?.source, + recordingId: input.recordingId || existing?.recordingId, + recordingUrl: input.recordingUrl || existing?.recordingUrl, + recordingStatus: input.recordingStatus || existing?.recordingStatus, + transcriptionText: input.transcriptionText ?? existing?.transcriptionText, + durationMs: input.durationMs ?? existing?.durationMs, + startedAt: input.startedAt ?? existing?.startedAt, + endedAt: input.endedAt ?? existing?.endedAt, + ghlMessageId: input.ghlMessageId || existing?.ghlMessageId, + voiceSessionId: input.voiceSessionId ?? existing?.voiceSessionId, + livekitRoomName: input.livekitRoomName || existing?.livekitRoomName, + metadata: input.metadata || existing?.metadata, + updatedAt: now, + } + + if (existing) { + await ctx.db.patch(existing._id, patch) + return await ctx.db.get(existing._id) + } + + const id = await ctx.db.insert("callArtifacts", { + ...patch, + createdAt: input.createdAt ?? now, + }) + return await ctx.db.get(id) +} + +export async function upsertExternalSyncState(ctx, input) { + const existing = await ctx.db + .query("externalSyncState") + .withIndex("by_provider_entityType_entityId", (q) => + q + .eq("provider", input.provider) + .eq("entityType", input.entityType) + .eq("entityId", input.entityId) + ) + .unique() + + const patch = { + cursor: input.cursor ?? existing?.cursor, + checksum: input.checksum ?? existing?.checksum, + status: input.status || existing?.status || "pending", + lastAttemptAt: input.lastAttemptAt ?? existing?.lastAttemptAt ?? Date.now(), + lastSyncedAt: input.lastSyncedAt ?? existing?.lastSyncedAt, + error: input.error ?? existing?.error, + metadata: input.metadata ?? existing?.metadata, + updatedAt: Date.now(), + } + + if (existing) { + await ctx.db.patch(existing._id, patch) + return await ctx.db.get(existing._id) + } + + const id = await ctx.db.insert("externalSyncState", { + provider: input.provider, + entityType: input.entityType, + entityId: input.entityId, + ...patch, + }) + return await ctx.db.get(id) +} diff --git a/convex/leads.ts b/convex/leads.ts index 7ad25695..e14c493d 100644 --- a/convex/leads.ts +++ b/convex/leads.ts @@ -1,23 +1,12 @@ // @ts-nocheck import { action, mutation } from "./_generated/server" import { v } from "convex/values" - -function normalizePhone(value?: string | null) { - const digits = String(value || "").replace(/\D/g, "") - if (!digits) { - return undefined - } - if (digits.length === 10) { - return `+1${digits}` - } - if (digits.length === 11 && digits.startsWith("1")) { - return `+${digits}` - } - if (digits.length >= 11) { - return `+${digits}` - } - return undefined -} +import { + ensureConversationParticipant, + upsertContactRecord, + upsertConversationRecord, + upsertMessageRecord, +} from "./crmModel" const leadSyncStatus = v.union( v.literal("pending"), @@ -136,49 +125,64 @@ export const createLead = mutation({ }, handler: async (ctx, args) => { const now = Date.now() - const normalizedPhone = normalizePhone(args.phone) - const leadId = await ctx.db.insert("leadSubmissions", { + const contact = await upsertContactRecord(ctx, { + firstName: args.firstName, + lastName: args.lastName, + email: args.email, + phone: args.phone, + company: args.company, + source: args.source, + status: args.status === "delivered" ? "active" : "lead", + lastActivityAt: now, + }) + + const conversation = await upsertConversationRecord(ctx, { + contactId: contact?._id, + title: + args.type === "requestMachine" + ? "Machine request" + : "Website contact", + channel: "chat", + source: args.source || "website", + status: args.status === "failed" ? "archived" : "open", + direction: "inbound", + startedAt: now, + lastMessageAt: now, + lastMessagePreview: args.message || args.intent, + summaryText: args.intent, + }) + + await ensureConversationParticipant(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + role: "contact", + displayName: `${args.firstName} ${args.lastName}`.trim(), + phone: args.phone, + email: args.email, + }) + + if (args.message || args.intent) { + await upsertMessageRecord(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + direction: "inbound", + channel: "chat", + source: args.source || "website", + messageType: args.type, + body: args.message || args.intent || "", + status: args.status, + sentAt: now, + }) + } + + return await ctx.db.insert("leadSubmissions", { ...args, - normalizedPhone, + contactId: contact?._id, + conversationId: conversation?._id, createdAt: now, updatedAt: now, deliveredAt: args.status === "delivered" ? now : undefined, }) - - if (normalizedPhone) { - const displayName = `${args.firstName} ${args.lastName}`.trim() - const existingProfile = await ctx.db - .query("contactProfiles") - .withIndex("by_normalizedPhone", (q) => - q.eq("normalizedPhone", normalizedPhone) - ) - .unique() - - const patch = { - displayName: displayName || existingProfile?.displayName, - firstName: args.firstName || existingProfile?.firstName, - lastName: args.lastName || existingProfile?.lastName, - email: args.email || existingProfile?.email, - company: args.company || existingProfile?.company, - lastIntent: args.intent || existingProfile?.lastIntent, - lastLeadOutcome: args.type, - lastSummaryText: args.message || existingProfile?.lastSummaryText, - source: args.source || existingProfile?.source, - updatedAt: now, - } - - if (existingProfile) { - await ctx.db.patch(existingProfile._id, patch) - } else { - await ctx.db.insert("contactProfiles", { - normalizedPhone, - ...patch, - createdAt: now, - }) - } - } - - return leadId }, }) @@ -230,7 +234,54 @@ export const ingestLead = mutation({ const fallbackName = splitName(args.name) const type = mapServiceToType(args.service) const now = Date.now() - const normalizedPhone = normalizePhone(args.phone) + const contact = await upsertContactRecord(ctx, { + firstName: args.firstName || fallbackName.firstName, + lastName: args.lastName || fallbackName.lastName, + email: args.email, + phone: args.phone, + company: args.company, + source: args.source, + status: "lead", + lastActivityAt: now, + }) + const conversation = await upsertConversationRecord(ctx, { + contactId: contact?._id, + title: type === "requestMachine" ? "Machine request" : "Website contact", + channel: "chat", + source: args.source || "website", + status: "open", + direction: "inbound", + startedAt: now, + lastMessageAt: now, + lastMessagePreview: args.message || args.intent, + summaryText: args.intent || args.service, + }) + + await ensureConversationParticipant(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + role: "contact", + displayName: `${args.firstName || fallbackName.firstName} ${args.lastName || fallbackName.lastName}`.trim(), + phone: args.phone, + email: args.email, + }) + + await upsertMessageRecord(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + direction: "inbound", + channel: "chat", + source: args.source || "website", + messageType: type, + body: args.message, + status: "pending", + sentAt: now, + metadata: JSON.stringify({ + intent: args.intent, + service: args.service, + }), + }) + const leadId = await ctx.db.insert("leadSubmissions", { type, status: "pending", @@ -239,7 +290,6 @@ export const ingestLead = mutation({ lastName: args.lastName || fallbackName.lastName, email: args.email, phone: args.phone, - normalizedPhone, company: args.company, intent: args.intent || args.service, message: args.message, @@ -254,47 +304,14 @@ export const ingestLead = mutation({ consentVersion: args.consentVersion, consentCapturedAt: args.consentCapturedAt, consentSourcePage: args.consentSourcePage, + contactId: contact?._id, + conversationId: conversation?._id, usesendStatus: "pending", ghlStatus: "pending", createdAt: now, updatedAt: now, }) - if (normalizedPhone) { - const displayName = `${args.firstName || fallbackName.firstName} ${args.lastName || fallbackName.lastName}`.trim() - const existingProfile = await ctx.db - .query("contactProfiles") - .withIndex("by_normalizedPhone", (q) => - q.eq("normalizedPhone", normalizedPhone) - ) - .unique() - - const patch = { - displayName: displayName || existingProfile?.displayName, - firstName: - args.firstName || fallbackName.firstName || existingProfile?.firstName, - lastName: - args.lastName || fallbackName.lastName || existingProfile?.lastName, - email: args.email || existingProfile?.email, - company: args.company || existingProfile?.company, - lastIntent: args.intent || args.service || existingProfile?.lastIntent, - lastLeadOutcome: type, - lastSummaryText: args.message || existingProfile?.lastSummaryText, - source: args.source || existingProfile?.source, - updatedAt: now, - } - - if (existingProfile) { - await ctx.db.patch(existingProfile._id, patch) - } else { - await ctx.db.insert("contactProfiles", { - normalizedPhone, - ...patch, - createdAt: now, - }) - } - } - return { inserted: true, leadId, @@ -332,6 +349,22 @@ export const updateLeadSyncStatus = mutation({ updatedAt: now, }) + if (lead.contactId) { + await ctx.db.patch(lead.contactId, { + status: status === "delivered" ? "active" : "lead", + lastActivityAt: now, + updatedAt: now, + }) + } + + if (lead.conversationId) { + await ctx.db.patch(lead.conversationId, { + status: status === "failed" ? "archived" : "open", + lastMessageAt: now, + updatedAt: now, + }) + } + return await ctx.db.get(args.leadId) }, }) diff --git a/convex/schema.ts b/convex/schema.ts index a9c4b46d..7feef130 100644 --- a/convex/schema.ts +++ b/convex/schema.ts @@ -155,7 +155,6 @@ export default defineSchema({ lastName: v.string(), email: v.string(), phone: v.string(), - normalizedPhone: v.optional(v.string()), company: v.optional(v.string()), intent: v.optional(v.string()), message: v.optional(v.string()), @@ -190,6 +189,8 @@ export default defineSchema({ v.literal("skipped") ) ), + contactId: v.optional(v.id("contacts")), + conversationId: v.optional(v.id("conversations")), error: v.optional(v.string()), deliveredAt: v.optional(v.number()), createdAt: v.number(), @@ -198,34 +199,7 @@ export default defineSchema({ .index("by_type", ["type"]) .index("by_status", ["status"]) .index("by_createdAt", ["createdAt"]) - .index("by_idempotencyKey", ["idempotencyKey"]) - .index("by_normalizedPhone", ["normalizedPhone"]), - - contactProfiles: defineTable({ - normalizedPhone: v.string(), - displayName: v.optional(v.string()), - firstName: v.optional(v.string()), - lastName: v.optional(v.string()), - email: v.optional(v.string()), - company: v.optional(v.string()), - lastIntent: v.optional(v.string()), - lastLeadOutcome: v.optional( - v.union( - v.literal("none"), - v.literal("contact"), - v.literal("requestMachine") - ) - ), - lastSummaryText: v.optional(v.string()), - lastCallAt: v.optional(v.number()), - lastReminderAt: v.optional(v.number()), - reminderNotes: v.optional(v.string()), - source: v.optional(v.string()), - createdAt: v.number(), - updatedAt: v.number(), - }) - .index("by_normalizedPhone", ["normalizedPhone"]) - .index("by_updatedAt", ["updatedAt"]), + .index("by_idempotencyKey", ["idempotencyKey"]), adminUsers: defineTable({ email: v.string(), @@ -271,17 +245,191 @@ export default defineSchema({ .index("by_kind", ["kind"]) .index("by_status", ["status"]), + contacts: defineTable({ + firstName: v.string(), + lastName: v.string(), + email: v.optional(v.string()), + normalizedEmail: v.optional(v.string()), + phone: v.optional(v.string()), + normalizedPhone: v.optional(v.string()), + company: v.optional(v.string()), + tags: v.optional(v.array(v.string())), + status: v.optional( + v.union( + v.literal("active"), + v.literal("lead"), + v.literal("customer"), + v.literal("inactive") + ) + ), + source: v.optional(v.string()), + notes: v.optional(v.string()), + ghlContactId: v.optional(v.string()), + livekitIdentity: v.optional(v.string()), + lastActivityAt: v.optional(v.number()), + createdAt: v.number(), + updatedAt: v.number(), + }) + .index("by_normalizedEmail", ["normalizedEmail"]) + .index("by_normalizedPhone", ["normalizedPhone"]) + .index("by_ghlContactId", ["ghlContactId"]) + .index("by_lastActivityAt", ["lastActivityAt"]) + .index("by_updatedAt", ["updatedAt"]), + + conversations: defineTable({ + contactId: v.optional(v.id("contacts")), + title: v.optional(v.string()), + channel: v.union( + v.literal("call"), + v.literal("sms"), + v.literal("chat"), + v.literal("unknown") + ), + source: v.optional(v.string()), + status: v.optional( + v.union(v.literal("open"), v.literal("closed"), v.literal("archived")) + ), + direction: v.optional( + v.union(v.literal("inbound"), v.literal("outbound"), v.literal("mixed")) + ), + startedAt: v.number(), + endedAt: v.optional(v.number()), + lastMessageAt: v.optional(v.number()), + lastMessagePreview: v.optional(v.string()), + unreadCount: v.optional(v.number()), + summaryText: v.optional(v.string()), + ghlConversationId: v.optional(v.string()), + livekitRoomName: v.optional(v.string()), + voiceSessionId: v.optional(v.id("voiceSessions")), + createdAt: v.number(), + updatedAt: v.number(), + }) + .index("by_contactId", ["contactId"]) + .index("by_channel", ["channel"]) + .index("by_status", ["status"]) + .index("by_ghlConversationId", ["ghlConversationId"]) + .index("by_livekitRoomName", ["livekitRoomName"]) + .index("by_voiceSessionId", ["voiceSessionId"]) + .index("by_lastMessageAt", ["lastMessageAt"]), + + conversationParticipants: defineTable({ + conversationId: v.id("conversations"), + contactId: v.optional(v.id("contacts")), + role: v.optional( + v.union( + v.literal("contact"), + v.literal("agent"), + v.literal("system"), + v.literal("unknown") + ) + ), + displayName: v.optional(v.string()), + phone: v.optional(v.string()), + normalizedPhone: v.optional(v.string()), + email: v.optional(v.string()), + normalizedEmail: v.optional(v.string()), + externalContactId: v.optional(v.string()), + createdAt: v.number(), + updatedAt: v.number(), + }) + .index("by_conversationId", ["conversationId"]) + .index("by_contactId", ["contactId"]) + .index("by_externalContactId", ["externalContactId"]), + + messages: defineTable({ + conversationId: v.id("conversations"), + contactId: v.optional(v.id("contacts")), + direction: v.optional( + v.union(v.literal("inbound"), v.literal("outbound"), v.literal("system")) + ), + channel: v.union( + v.literal("call"), + v.literal("sms"), + v.literal("chat"), + v.literal("unknown") + ), + source: v.optional(v.string()), + messageType: v.optional(v.string()), + body: v.string(), + status: v.optional(v.string()), + sentAt: v.number(), + ghlMessageId: v.optional(v.string()), + voiceTranscriptTurnId: v.optional(v.id("voiceTranscriptTurns")), + voiceSessionId: v.optional(v.id("voiceSessions")), + livekitRoomName: v.optional(v.string()), + metadata: v.optional(v.string()), + createdAt: v.number(), + updatedAt: v.number(), + }) + .index("by_conversationId", ["conversationId"]) + .index("by_contactId", ["contactId"]) + .index("by_ghlMessageId", ["ghlMessageId"]) + .index("by_voiceTranscriptTurnId", ["voiceTranscriptTurnId"]) + .index("by_sentAt", ["sentAt"]), + + callArtifacts: defineTable({ + conversationId: v.id("conversations"), + contactId: v.optional(v.id("contacts")), + source: v.optional(v.string()), + recordingId: v.optional(v.string()), + recordingUrl: v.optional(v.string()), + recordingStatus: v.optional( + v.union( + v.literal("pending"), + v.literal("starting"), + v.literal("recording"), + v.literal("completed"), + v.literal("failed") + ) + ), + transcriptionText: v.optional(v.string()), + durationMs: v.optional(v.number()), + startedAt: v.optional(v.number()), + endedAt: v.optional(v.number()), + ghlMessageId: v.optional(v.string()), + voiceSessionId: v.optional(v.id("voiceSessions")), + livekitRoomName: v.optional(v.string()), + metadata: v.optional(v.string()), + createdAt: v.number(), + updatedAt: v.number(), + }) + .index("by_conversationId", ["conversationId"]) + .index("by_contactId", ["contactId"]) + .index("by_recordingId", ["recordingId"]) + .index("by_voiceSessionId", ["voiceSessionId"]) + .index("by_ghlMessageId", ["ghlMessageId"]), + + externalSyncState: defineTable({ + provider: v.string(), + entityType: v.string(), + entityId: v.string(), + cursor: v.optional(v.string()), + checksum: v.optional(v.string()), + status: v.optional( + v.union( + v.literal("pending"), + v.literal("synced"), + v.literal("failed"), + v.literal("reconciled"), + v.literal("mismatch") + ) + ), + lastAttemptAt: v.optional(v.number()), + lastSyncedAt: v.optional(v.number()), + error: v.optional(v.string()), + metadata: v.optional(v.string()), + updatedAt: v.number(), + }) + .index("by_provider_entityType", ["provider", "entityType"]) + .index("by_provider_entityType_entityId", ["provider", "entityType", "entityId"]), + voiceSessions: defineTable({ roomName: v.string(), participantIdentity: v.string(), - callerPhone: v.optional(v.string()), siteUrl: v.optional(v.string()), pathname: v.optional(v.string()), pageUrl: v.optional(v.string()), source: v.optional(v.string()), - contactProfileId: v.optional(v.id("contactProfiles")), - contactDisplayName: v.optional(v.string()), - contactCompany: v.optional(v.string()), startedAt: v.number(), endedAt: v.optional(v.number()), callStatus: v.optional( @@ -310,28 +458,6 @@ export default defineSchema({ ), notificationSentAt: v.optional(v.number()), notificationError: v.optional(v.string()), - reminderStatus: v.optional( - v.union(v.literal("none"), v.literal("scheduled"), v.literal("sameDay")) - ), - reminderRequestedAt: v.optional(v.number()), - reminderStartAt: v.optional(v.number()), - reminderEndAt: v.optional(v.number()), - reminderCalendarEventId: v.optional(v.string()), - reminderCalendarHtmlLink: v.optional(v.string()), - reminderNote: v.optional(v.string()), - warmTransferStatus: v.optional( - v.union( - v.literal("none"), - v.literal("attempted"), - v.literal("connected"), - v.literal("failed"), - v.literal("fallback") - ) - ), - warmTransferTarget: v.optional(v.string()), - warmTransferAttemptedAt: v.optional(v.number()), - warmTransferConnectedAt: v.optional(v.number()), - warmTransferFailureReason: v.optional(v.string()), recordingDisclosureAt: v.optional(v.number()), recordingStatus: v.optional( v.union( @@ -346,12 +472,13 @@ export default defineSchema({ recordingUrl: v.optional(v.string()), recordingError: v.optional(v.string()), metadata: v.optional(v.string()), + contactId: v.optional(v.id("contacts")), + conversationId: v.optional(v.id("conversations")), createdAt: v.number(), updatedAt: v.number(), }) .index("by_roomName", ["roomName"]) .index("by_participantIdentity", ["participantIdentity"]) - .index("by_callerPhone", ["callerPhone"]) .index("by_source", ["source"]) .index("by_source_startedAt", ["source", "startedAt"]) .index("by_startedAt", ["startedAt"]), diff --git a/convex/voiceSessions.ts b/convex/voiceSessions.ts index a8417d61..67b96116 100644 --- a/convex/voiceSessions.ts +++ b/convex/voiceSessions.ts @@ -1,6 +1,68 @@ // @ts-nocheck import { mutation, query } from "./_generated/server" import { v } from "convex/values" +import { + ensureConversationParticipant, + upsertCallArtifactRecord, + upsertContactRecord, + upsertConversationRecord, + upsertMessageRecord, +} from "./crmModel" + +async function syncPhoneConversation(ctx, session, overrides = {}) { + const contact = await upsertContactRecord(ctx, { + firstName: "Phone", + lastName: "Caller", + phone: session.participantIdentity, + livekitIdentity: session.participantIdentity, + source: "phone-agent", + status: "lead", + lastActivityAt: + overrides.lastActivityAt ?? session.updatedAt ?? session.startedAt ?? Date.now(), + }) + + const conversation = await upsertConversationRecord(ctx, { + contactId: contact?._id, + title: `Phone call ${session.roomName}`, + channel: "call", + source: "phone-agent", + status: + session.callStatus === "completed" + ? "closed" + : session.callStatus === "failed" + ? "archived" + : "open", + direction: "inbound", + startedAt: session.startedAt, + endedAt: session.endedAt, + lastMessageAt: overrides.lastActivityAt ?? session.updatedAt ?? session.startedAt, + lastMessagePreview: + overrides.lastMessagePreview ?? session.summaryText ?? session.handoffReason, + summaryText: session.summaryText, + livekitRoomName: session.roomName, + voiceSessionId: session._id, + }) + + await ensureConversationParticipant(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + role: "contact", + displayName: contact ? `${contact.firstName} ${contact.lastName}`.trim() : "Phone caller", + phone: contact?.phone || session.participantIdentity, + email: contact?.email, + }) + + await ctx.db.patch(session._id, { + contactId: contact?._id, + conversationId: conversation?._id, + updatedAt: Date.now(), + }) + + return { + contact, + conversation, + } +} export const getByRoom = query({ args: { @@ -61,15 +123,11 @@ export const createSession = mutation({ args: { roomName: v.string(), participantIdentity: v.string(), - callerPhone: v.optional(v.string()), siteUrl: v.optional(v.string()), pathname: v.optional(v.string()), pageUrl: v.optional(v.string()), source: v.optional(v.string()), metadata: v.optional(v.string()), - contactProfileId: v.optional(v.id("contactProfiles")), - contactDisplayName: v.optional(v.string()), - contactCompany: v.optional(v.string()), startedAt: v.optional(v.number()), recordingDisclosureAt: v.optional(v.number()), callStatus: v.optional( @@ -87,7 +145,7 @@ export const createSession = mutation({ }, handler: async (ctx, args) => { const now = args.startedAt ?? Date.now() - return await ctx.db.insert("voiceSessions", { + const id = await ctx.db.insert("voiceSessions", { ...args, startedAt: now, callStatus: args.callStatus, @@ -95,11 +153,16 @@ export const createSession = mutation({ leadOutcome: "none", handoffRequested: false, notificationStatus: "pending", - reminderStatus: "none", - warmTransferStatus: "none", createdAt: now, updatedAt: now, }) + + const session = await ctx.db.get(id) + if (session) { + await syncPhoneConversation(ctx, session) + } + + return id }, }) @@ -107,15 +170,11 @@ export const upsertPhoneCallSession = mutation({ args: { roomName: v.string(), participantIdentity: v.string(), - callerPhone: v.optional(v.string()), siteUrl: v.optional(v.string()), pathname: v.optional(v.string()), pageUrl: v.optional(v.string()), source: v.optional(v.string()), metadata: v.optional(v.string()), - contactProfileId: v.optional(v.id("contactProfiles")), - contactDisplayName: v.optional(v.string()), - contactCompany: v.optional(v.string()), startedAt: v.optional(v.number()), recordingDisclosureAt: v.optional(v.number()), recordingStatus: v.optional( @@ -138,41 +197,34 @@ export const upsertPhoneCallSession = mutation({ if (existing) { await ctx.db.patch(existing._id, { participantIdentity: args.participantIdentity, - callerPhone: args.callerPhone || existing.callerPhone, siteUrl: args.siteUrl, pathname: args.pathname, pageUrl: args.pageUrl, source: args.source, metadata: args.metadata, - contactProfileId: args.contactProfileId || existing.contactProfileId, - contactDisplayName: - args.contactDisplayName || existing.contactDisplayName, - contactCompany: args.contactCompany || existing.contactCompany, startedAt: existing.startedAt || now, recordingDisclosureAt: args.recordingDisclosureAt ?? existing.recordingDisclosureAt, recordingStatus: args.recordingStatus ?? existing.recordingStatus, callStatus: existing.callStatus || "started", notificationStatus: existing.notificationStatus || "pending", - reminderStatus: existing.reminderStatus || "none", - warmTransferStatus: existing.warmTransferStatus || "none", updatedAt: Date.now(), }) - return await ctx.db.get(existing._id) + const updated = await ctx.db.get(existing._id) + if (updated) { + await syncPhoneConversation(ctx, updated) + } + return updated } const id = await ctx.db.insert("voiceSessions", { roomName: args.roomName, participantIdentity: args.participantIdentity, - callerPhone: args.callerPhone, siteUrl: args.siteUrl, pathname: args.pathname, pageUrl: args.pageUrl, source: args.source, metadata: args.metadata, - contactProfileId: args.contactProfileId, - contactDisplayName: args.contactDisplayName, - contactCompany: args.contactCompany, startedAt: now, recordingDisclosureAt: args.recordingDisclosureAt, recordingStatus: args.recordingStatus, @@ -181,13 +233,15 @@ export const upsertPhoneCallSession = mutation({ leadOutcome: "none", handoffRequested: false, notificationStatus: "pending", - reminderStatus: "none", - warmTransferStatus: "none", createdAt: now, updatedAt: now, }) - return await ctx.db.get(id) + const session = await ctx.db.get(id) + if (session) { + await syncPhoneConversation(ctx, session) + } + return session }, }) @@ -226,6 +280,33 @@ export const addTranscriptTurn = mutation({ : session.agentAnsweredAt, updatedAt: Date.now(), }) + + const { contact, conversation } = await syncPhoneConversation(ctx, { + ...session, + updatedAt: createdAt, + }, { + lastActivityAt: createdAt, + lastMessagePreview: args.text, + }) + + await upsertMessageRecord(ctx, { + conversationId: conversation._id, + contactId: args.role === "user" ? contact?._id : undefined, + direction: + args.role === "user" + ? "inbound" + : args.role === "assistant" + ? "outbound" + : "system", + channel: "call", + source: args.source || "phone-agent", + messageType: args.kind || "transcript", + body: args.text, + sentAt: createdAt, + voiceTranscriptTurnId: turnId, + voiceSessionId: args.sessionId, + livekitRoomName: args.roomName, + }) } return turnId @@ -236,9 +317,6 @@ export const linkPhoneCallLead = mutation({ args: { sessionId: v.id("voiceSessions"), linkedLeadId: v.optional(v.string()), - contactProfileId: v.optional(v.id("contactProfiles")), - contactDisplayName: v.optional(v.string()), - contactCompany: v.optional(v.string()), leadOutcome: v.optional( v.union( v.literal("none"), @@ -248,65 +326,30 @@ export const linkPhoneCallLead = mutation({ ), handoffRequested: v.optional(v.boolean()), handoffReason: v.optional(v.string()), - reminderStatus: v.optional( - v.union(v.literal("none"), v.literal("scheduled"), v.literal("sameDay")) - ), - reminderRequestedAt: v.optional(v.number()), - reminderStartAt: v.optional(v.number()), - reminderEndAt: v.optional(v.number()), - reminderCalendarEventId: v.optional(v.string()), - reminderCalendarHtmlLink: v.optional(v.string()), - reminderNote: v.optional(v.string()), - warmTransferStatus: v.optional( - v.union( - v.literal("none"), - v.literal("attempted"), - v.literal("connected"), - v.literal("failed"), - v.literal("fallback") - ) - ), - warmTransferTarget: v.optional(v.string()), - warmTransferAttemptedAt: v.optional(v.number()), - warmTransferConnectedAt: v.optional(v.number()), - warmTransferFailureReason: v.optional(v.string()), }, handler: async (ctx, args) => { - const patch: Record = { - updatedAt: Date.now(), - } - - const optionalEntries = { + await ctx.db.patch(args.sessionId, { linkedLeadId: args.linkedLeadId, - contactProfileId: args.contactProfileId, - contactDisplayName: args.contactDisplayName, - contactCompany: args.contactCompany, leadOutcome: args.leadOutcome, handoffRequested: args.handoffRequested, handoffReason: args.handoffReason, - reminderStatus: args.reminderStatus, - reminderRequestedAt: args.reminderRequestedAt, - reminderStartAt: args.reminderStartAt, - reminderEndAt: args.reminderEndAt, - reminderCalendarEventId: args.reminderCalendarEventId, - reminderCalendarHtmlLink: args.reminderCalendarHtmlLink, - reminderNote: args.reminderNote, - warmTransferStatus: args.warmTransferStatus, - warmTransferTarget: args.warmTransferTarget, - warmTransferAttemptedAt: args.warmTransferAttemptedAt, - warmTransferConnectedAt: args.warmTransferConnectedAt, - warmTransferFailureReason: args.warmTransferFailureReason, - } - - for (const [key, value] of Object.entries(optionalEntries)) { - if (value !== undefined) { - patch[key] = value + updatedAt: Date.now(), + }) + const session = await ctx.db.get(args.sessionId) + if (session) { + const { conversation } = await syncPhoneConversation(ctx, session) + if (args.linkedLeadId || args.leadOutcome || args.handoffReason) { + await ctx.db.patch(conversation._id, { + summaryText: + session.summaryText || + args.handoffReason || + conversation.summaryText, + updatedAt: Date.now(), + }) } } - await ctx.db.patch(args.sessionId, patch) - - return await ctx.db.get(args.sessionId) + return session }, }) @@ -334,7 +377,21 @@ export const updateRecording = mutation({ recordingError: args.recordingError, updatedAt: Date.now(), }) - return await ctx.db.get(args.sessionId) + const session = await ctx.db.get(args.sessionId) + if (session) { + const { contact, conversation } = await syncPhoneConversation(ctx, session) + await upsertCallArtifactRecord(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + source: "phone-agent", + recordingId: args.recordingId, + recordingUrl: args.recordingUrl, + recordingStatus: args.recordingStatus, + voiceSessionId: session._id, + livekitRoomName: session.roomName, + }) + } + return session }, }) @@ -384,7 +441,31 @@ export const completeSession = mutation({ notificationError: args.notificationError, updatedAt: endedAt, }) - return await ctx.db.get(args.sessionId) + const session = await ctx.db.get(args.sessionId) + if (session) { + const { contact, conversation } = await syncPhoneConversation(ctx, session, { + lastActivityAt: endedAt, + lastMessagePreview: args.summaryText || session.summaryText, + }) + await upsertCallArtifactRecord(ctx, { + conversationId: conversation._id, + contactId: contact?._id, + source: "phone-agent", + recordingId: args.recordingId, + recordingUrl: args.recordingUrl, + recordingStatus: args.recordingStatus, + transcriptionText: args.summaryText, + durationMs: + typeof session.startedAt === "number" + ? Math.max(0, endedAt - session.startedAt) + : undefined, + startedAt: session.startedAt, + endedAt, + voiceSessionId: session._id, + livekitRoomName: session.roomName, + }) + } + return session }, }) @@ -398,13 +479,9 @@ function normalizePhoneCallForAdmin(session: any) { id: session._id, roomName: session.roomName, participantIdentity: session.participantIdentity, - callerPhone: session.callerPhone, pathname: session.pathname, pageUrl: session.pageUrl, source: session.source, - contactProfileId: session.contactProfileId, - contactDisplayName: session.contactDisplayName, - contactCompany: session.contactCompany, startedAt: session.startedAt, endedAt: session.endedAt, durationMs, @@ -420,91 +497,12 @@ function normalizePhoneCallForAdmin(session: any) { notificationStatus: session.notificationStatus || "pending", notificationSentAt: session.notificationSentAt, notificationError: session.notificationError, - reminderStatus: session.reminderStatus || "none", - reminderRequestedAt: session.reminderRequestedAt, - reminderStartAt: session.reminderStartAt, - reminderEndAt: session.reminderEndAt, - reminderCalendarEventId: session.reminderCalendarEventId, - reminderCalendarHtmlLink: session.reminderCalendarHtmlLink, - reminderNote: session.reminderNote, - warmTransferStatus: session.warmTransferStatus || "none", - warmTransferTarget: session.warmTransferTarget, - warmTransferAttemptedAt: session.warmTransferAttemptedAt, - warmTransferConnectedAt: session.warmTransferConnectedAt, - warmTransferFailureReason: session.warmTransferFailureReason, recordingStatus: session.recordingStatus, recordingUrl: session.recordingUrl, recordingError: session.recordingError, } } -export const getPhoneAgentContextByPhone = query({ - args: { - normalizedPhone: v.string(), - }, - handler: async (ctx, args) => { - const contactProfile = await ctx.db - .query("contactProfiles") - .withIndex("by_normalizedPhone", (q) => - q.eq("normalizedPhone", args.normalizedPhone) - ) - .unique() - - const recentSessions = await ctx.db - .query("voiceSessions") - .withIndex("by_callerPhone", (q) => q.eq("callerPhone", args.normalizedPhone)) - .collect() - - recentSessions.sort((a, b) => (b.startedAt || 0) - (a.startedAt || 0)) - const recentSession = recentSessions[0] || null - - const recentLead = await ctx.db - .query("leadSubmissions") - .withIndex("by_normalizedPhone", (q) => - q.eq("normalizedPhone", args.normalizedPhone) - ) - .collect() - - recentLead.sort((a, b) => (b.createdAt || 0) - (a.createdAt || 0)) - const latestLead = recentLead[0] || null - - return { - contactProfile: contactProfile - ? { - id: contactProfile._id, - normalizedPhone: contactProfile.normalizedPhone, - displayName: contactProfile.displayName, - firstName: contactProfile.firstName, - lastName: contactProfile.lastName, - email: contactProfile.email, - company: contactProfile.company, - lastIntent: contactProfile.lastIntent, - lastLeadOutcome: contactProfile.lastLeadOutcome, - lastSummaryText: contactProfile.lastSummaryText, - lastCallAt: contactProfile.lastCallAt, - lastReminderAt: contactProfile.lastReminderAt, - reminderNotes: contactProfile.reminderNotes, - } - : null, - recentSession: recentSession ? normalizePhoneCallForAdmin(recentSession) : null, - recentLead: latestLead - ? { - id: latestLead._id, - type: latestLead.type, - firstName: latestLead.firstName, - lastName: latestLead.lastName, - email: latestLead.email, - phone: latestLead.phone, - company: latestLead.company, - intent: latestLead.intent, - message: latestLead.message, - createdAt: latestLead.createdAt, - } - : null, - } - }, -}) - export const listAdminPhoneCalls = query({ args: { search: v.optional(v.string()), @@ -538,15 +536,10 @@ export const listAdminPhoneCalls = query({ const haystack = [ session.roomName, session.participantIdentity, - session.callerPhone, - session.contactDisplayName, - session.contactCompany, session.pathname, session.linkedLeadId, session.summaryText, session.handoffReason, - session.reminderNote, - session.warmTransferFailureReason, ] .map((value) => String(value || "").toLowerCase()) .join("\n") @@ -619,9 +612,6 @@ export const getAdminPhoneCallDetail = query({ createdAt: linkedLead.createdAt, } : null, - contactProfile: session.contactProfileId - ? await ctx.db.get(session.contactProfileId) - : null, turns: turns.map((turn) => ({ id: turn._id, role: turn.role, diff --git a/lib/server/admin-auth.ts b/lib/server/admin-auth.ts index 025badb3..56cf4edd 100644 --- a/lib/server/admin-auth.ts +++ b/lib/server/admin-auth.ts @@ -1,4 +1,12 @@ +import { createHash, randomBytes } from "node:crypto" +import { cookies } from "next/headers" +import { fetchMutation, fetchQuery } from "convex/nextjs" import { NextResponse } from "next/server" +import { api } from "@/convex/_generated/api" +import { hasConvexUrl } from "@/lib/convex-config" + +export const ADMIN_SESSION_COOKIE = "rmv_admin_session" +const ADMIN_SESSION_TTL_MS = 1000 * 60 * 60 * 24 * 7 function getProvidedToken(request: Request) { const authHeader = request.headers.get("authorization") || "" @@ -30,3 +38,100 @@ export function requireAdminToken(request: Request) { export function isAdminUiEnabled() { return process.env.ADMIN_UI_ENABLED === "true" } + +export function getConfiguredAdminEmail() { + return String(process.env.ADMIN_EMAIL || "") + .trim() + .toLowerCase() +} + +function getConfiguredAdminPassword() { + return String(process.env.ADMIN_PASSWORD || "") +} + +function hashAdminSessionToken(token: string) { + return createHash("sha256").update(token).digest("hex") +} + +export function isAdminCredentialLoginConfigured() { + return Boolean( + isAdminUiEnabled() && + hasConvexUrl() && + getConfiguredAdminEmail() && + getConfiguredAdminPassword() + ) +} + +export function isAdminCredentialMatch(email: string, password: string) { + return ( + email.trim().toLowerCase() === getConfiguredAdminEmail() && + password === getConfiguredAdminPassword() + ) +} + +export async function createAdminSession(email: string) { + if (!hasConvexUrl()) { + throw new Error("Convex is not configured for admin sessions.") + } + + const normalizedEmail = email.trim().toLowerCase() + const rawToken = randomBytes(32).toString("hex") + const tokenHash = hashAdminSessionToken(rawToken) + const expiresAt = Date.now() + ADMIN_SESSION_TTL_MS + + await fetchMutation(api.admin.ensureAdminUser, { + email: normalizedEmail, + name: normalizedEmail.split("@")[0], + }) + + await fetchMutation(api.admin.createSession, { + email: normalizedEmail, + tokenHash, + expiresAt, + }) + + return { + token: rawToken, + expiresAt, + } +} + +export async function destroyAdminSession(rawToken?: string | null) { + if (!rawToken || !hasConvexUrl()) { + return + } + + try { + await fetchMutation(api.admin.destroySession, { + tokenHash: hashAdminSessionToken(rawToken), + }) + } catch (error) { + console.error("Failed to destroy admin session:", error) + } +} + +export async function validateAdminSession(rawToken?: string | null) { + if (!rawToken || !hasConvexUrl()) { + return null + } + + try { + return await fetchQuery(api.admin.validateSession, { + tokenHash: hashAdminSessionToken(rawToken), + }) + } catch (error) { + console.error("Failed to validate admin session:", error) + return null + } +} + +export async function getAdminUserFromCookies() { + if (!isAdminUiEnabled()) { + return null + } + + const cookieStore = await cookies() + const rawToken = cookieStore.get(ADMIN_SESSION_COOKIE)?.value + const session = await validateAdminSession(rawToken) + return session?.user || null +} diff --git a/lib/server/ghl-sync.ts b/lib/server/ghl-sync.ts new file mode 100644 index 00000000..4bd691cf --- /dev/null +++ b/lib/server/ghl-sync.ts @@ -0,0 +1,130 @@ +type GhlSyncEnv = { + token: string + locationId: string + baseUrl: string +} + +function normalizeBaseUrl(value?: string) { + return (value || "https://services.leadconnectorhq.com").replace(/\/+$/, "") +} + +export function getGhlSyncEnv(): GhlSyncEnv { + const token = String( + process.env.GHL_PRIVATE_INTEGRATION_TOKEN || process.env.GHL_API_TOKEN || "" + ).trim() + const locationId = String(process.env.GHL_LOCATION_ID || "").trim() + const baseUrl = normalizeBaseUrl(process.env.GHL_API_BASE_URL) + + if (!token || !locationId) { + throw new Error("GHL token or location ID is not configured.") + } + + return { token, locationId, baseUrl } +} + +async function fetchGhlJson(pathname: string, init?: RequestInit) { + const env = getGhlSyncEnv() + const response = await fetch(`${env.baseUrl}${pathname}`, { + ...init, + headers: { + Authorization: `Bearer ${env.token}`, + Version: process.env.GHL_API_VERSION || "2021-07-28", + Accept: "application/json", + "Content-Type": "application/json", + ...(init?.headers || {}), + }, + cache: "no-store", + }) + + const text = await response.text() + let body: any = null + if (text) { + try { + body = JSON.parse(text) + } catch { + body = null + } + } + if (!response.ok) { + throw new Error( + `GHL request failed (${response.status}) for ${pathname}: ${body?.message || text || "Unknown error"}` + ) + } + + return body +} + +export async function fetchGhlContacts(args?: { + limit?: number + cursor?: string +}) { + const env = getGhlSyncEnv() + const searchParams = new URLSearchParams({ + locationId: env.locationId, + limit: String(Math.min(100, Math.max(1, args?.limit || 100))), + }) + if (args?.cursor) { + searchParams.set("startAfterId", args.cursor) + } + + const payload = await fetchGhlJson(`/contacts/?${searchParams.toString()}`) + const contacts = Array.isArray(payload?.contacts) + ? payload.contacts + : Array.isArray(payload?.data?.contacts) + ? payload.data.contacts + : [] + + const nextCursor = + contacts.length > 0 ? String(contacts[contacts.length - 1]?.id || "") : "" + + return { + items: contacts, + nextCursor: nextCursor || undefined, + } +} + +export async function fetchGhlMessages(args?: { + limit?: number + cursor?: string + channel?: "Call" | "SMS" +}) { + const env = getGhlSyncEnv() + const url = new URL(`${env.baseUrl}/conversations/messages/export`) + url.searchParams.set("locationId", env.locationId) + url.searchParams.set("limit", String(Math.min(100, Math.max(1, args?.limit || 100)))) + url.searchParams.set("channel", args?.channel || "SMS") + if (args?.cursor) { + url.searchParams.set("cursor", args.cursor) + } + + const payload = await fetchGhlJson(url.pathname + url.search) + return { + items: Array.isArray(payload?.messages) ? payload.messages : [], + nextCursor: + typeof payload?.nextCursor === "string" && payload.nextCursor + ? payload.nextCursor + : undefined, + } +} + +export async function fetchGhlCallLogs(args?: { + page?: number + pageSize?: number +}) { + const env = getGhlSyncEnv() + const url = new URL(`${env.baseUrl}/voice-ai/dashboard/call-logs`) + url.searchParams.set("locationId", env.locationId) + url.searchParams.set("page", String(Math.max(1, args?.page || 1))) + url.searchParams.set( + "pageSize", + String(Math.min(50, Math.max(1, args?.pageSize || 50))) + ) + + const payload = await fetchGhlJson(url.pathname + url.search) + return { + items: Array.isArray(payload?.callLogs) ? payload.callLogs : [], + page: Number(payload?.page || args?.page || 1), + total: Number(payload?.total || 0), + pageSize: Number(payload?.pageSize || args?.pageSize || 50), + } +}