From 133ed6d6f3b6e83f01f2972dda743988ae8f60e1 Mon Sep 17 00:00:00 2001 From: DMleadgen Date: Thu, 16 Apr 2026 11:40:19 -0600 Subject: [PATCH] feat: add GHL CRM sync status and runner --- app/admin/contacts/[id]/page.tsx | 10 +- app/admin/contacts/page.tsx | 47 ++- app/admin/conversations/[id]/page.tsx | 12 +- app/admin/conversations/page.tsx | 50 ++- app/admin/page.tsx | 43 ++- app/api/admin/ghl/sync/route.ts | 39 ++ app/api/internal/ghl/sync/run/route.ts | 39 ++ app/sign-in/[[...sign-in]]/page.tsx | 6 +- convex/crm.ts | 508 ++++++++++++++++++++++++- convex/crons.ts | 7 + convex/ghlMirror.ts | 160 ++++++++ convex/schema.ts | 2 + 12 files changed, 892 insertions(+), 31 deletions(-) create mode 100644 app/api/admin/ghl/sync/route.ts create mode 100644 app/api/internal/ghl/sync/run/route.ts create mode 100644 convex/ghlMirror.ts diff --git a/app/admin/contacts/[id]/page.tsx b/app/admin/contacts/[id]/page.tsx index 4a55bd09..274b395b 100644 --- a/app/admin/contacts/[id]/page.tsx +++ b/app/admin/contacts/[id]/page.tsx @@ -57,7 +57,7 @@ export default async function AdminContactDetailPage({ params }: PageProps) { {detail.contact.firstName} {detail.contact.lastName}

- Unified CRM record across forms, calls, SMS, and sync imports. + Contact details and activity history.

@@ -68,9 +68,7 @@ export default async function AdminContactDetailPage({ params }: PageProps) { Contact Profile - - Backend-owned identity and sync metadata. - + Basic details and connected records.
@@ -127,7 +125,7 @@ export default async function AdminContactDetailPage({ params }: PageProps) { Conversations - Every mirrored conversation associated to this contact. + Conversations linked to this contact. @@ -197,5 +195,5 @@ export default async function AdminContactDetailPage({ params }: PageProps) { export const metadata = { title: "Contact Detail | Admin", - description: "Review a Rocky CRM contact and full interaction timeline", + description: "Review a contact and full interaction timeline", } diff --git a/app/admin/contacts/page.tsx b/app/admin/contacts/page.tsx index eb60ec58..27c3a1ef 100644 --- a/app/admin/contacts/page.tsx +++ b/app/admin/contacts/page.tsx @@ -34,6 +34,22 @@ function formatTimestamp(value?: number) { }) } +function getSyncMessage(sync: any) { + if (!sync.ghlConfigured) { + return "Connect GHL to load contacts and conversations." + } + if (sync.stages.contacts.status === "running") { + return "Contacts are syncing now." + } + if (sync.stages.contacts.error) { + return "Contacts could not be loaded from GHL yet." + } + if (!sync.latestSyncAt) { + return "No contacts yet." + } + return "Your contact list stays up to date from forms, calls, and GHL." +} + export default async function AdminContactsPage({ searchParams }: PageProps) { const params = await searchParams const page = Math.max(1, Number.parseInt(params.page || "1", 10) || 1) @@ -54,8 +70,7 @@ export default async function AdminContactsPage({ searchParams }: PageProps) { Contacts

- Backend-owned CRM contacts mirrored from forms, phone calls, and - GHL sync. + All customer contacts in one place.

@@ -63,6 +78,28 @@ export default async function AdminContactsPage({ searchParams }: PageProps) { + + + Sync Status + {getSyncMessage(data.sync)} + + + {data.sync.overallStatus} + + Last sync: {formatTimestamp(data.sync.latestSyncAt || undefined)} + + {!data.sync.ghlConfigured ? ( + GHL is not connected. + ) : null} + {!data.sync.syncTokenConfigured ? ( + Manual sync endpoint is not configured yet. + ) : null} + {data.sync.stages.contacts.error ? ( + {data.sync.stages.contacts.error} + ) : null} + + + @@ -107,7 +144,9 @@ export default async function AdminContactsPage({ searchParams }: PageProps) { colSpan={7} className="py-8 text-center text-muted-foreground" > - No contacts matched this filter. + {search + ? "No contacts matched this search." + : getSyncMessage(data.sync)} ) : ( @@ -160,5 +199,5 @@ export default async function AdminContactsPage({ searchParams }: PageProps) { export const metadata = { title: "Contacts | Admin", - description: "View backend-owned Rocky contact records", + description: "View Rocky customer contacts", } diff --git a/app/admin/conversations/[id]/page.tsx b/app/admin/conversations/[id]/page.tsx index 200f7d42..0fb56d3e 100644 --- a/app/admin/conversations/[id]/page.tsx +++ b/app/admin/conversations/[id]/page.tsx @@ -69,7 +69,7 @@ export default async function AdminConversationDetailPage({ {detail.conversation.title || "Conversation Detail"}

- Unified thread for Rocky-owned conversation management. + Full conversation history in one place.

@@ -80,9 +80,7 @@ export default async function AdminConversationDetailPage({ Conversation Status
- - Channel, ownership, and sync metadata. - + Channel, contact, and latest activity.
@@ -206,9 +204,7 @@ export default async function AdminConversationDetailPage({ Messages - - Full backend-owned thread history for this conversation. - + Message history for this conversation. {detail.messages.length === 0 ? ( @@ -237,5 +233,5 @@ export default async function AdminConversationDetailPage({ export const metadata = { title: "Conversation Detail | Admin", - description: "Review a Rocky conversation thread, recordings, and leads", + description: "Review a conversation, recordings, and leads", } diff --git a/app/admin/conversations/page.tsx b/app/admin/conversations/page.tsx index 560ff8ad..48e1ed08 100644 --- a/app/admin/conversations/page.tsx +++ b/app/admin/conversations/page.tsx @@ -36,6 +36,22 @@ function formatTimestamp(value?: number) { }) } +function getSyncMessage(sync: any) { + if (!sync.ghlConfigured) { + return "Connect GHL to load contacts and conversations." + } + if (sync.stages.conversations.status === "running") { + return "Conversations are syncing now." + } + if (sync.stages.conversations.error) { + return "Conversations could not be loaded from GHL yet." + } + if (!sync.latestSyncAt) { + return "No conversations yet." + } + return "Calls and messages appear here as they are synced." +} + export default async function AdminConversationsPage({ searchParams, }: PageProps) { @@ -60,8 +76,7 @@ export default async function AdminConversationsPage({ Conversations

- Unified inbox across backend-owned call and SMS conversation - threads. + Customer conversations in one inbox.

@@ -69,6 +84,28 @@ export default async function AdminConversationsPage({ + + + Sync Status + {getSyncMessage(data.sync)} + + + {data.sync.overallStatus} + + Last sync: {formatTimestamp(data.sync.latestSyncAt || undefined)} + + {!data.sync.ghlConfigured ? ( + GHL is not connected. + ) : null} + {!data.sync.syncTokenConfigured ? ( + Manual sync endpoint is not configured yet. + ) : null} + {data.sync.stages.conversations.error ? ( + {data.sync.stages.conversations.error} + ) : null} + + + @@ -76,8 +113,7 @@ export default async function AdminConversationsPage({ Conversation Inbox - Search by contact, conversation preview, phone, email, or external - ID. + Search by contact, phone, email, or recent message. @@ -136,7 +172,9 @@ export default async function AdminConversationsPage({ colSpan={8} className="py-8 text-center text-muted-foreground" > - No conversations matched this filter. + {search || params.channel || params.status + ? "No conversations matched this search." + : getSyncMessage(data.sync)} ) : ( @@ -204,5 +242,5 @@ export default async function AdminConversationsPage({ export const metadata = { title: "Conversations | Admin", - description: "View backend-owned Rocky conversation threads", + description: "View Rocky customer conversations", } diff --git a/app/admin/page.tsx b/app/admin/page.tsx index 071adf45..49c54a9a 100644 --- a/app/admin/page.tsx +++ b/app/admin/page.tsx @@ -1,5 +1,7 @@ import Link from "next/link" +import { fetchQuery } from "convex/nextjs" import { Button } from "@/components/ui/button" +import { api } from "@/convex/_generated/api" import { Card, CardContent, @@ -58,10 +60,25 @@ async function getOrdersCount() { return mockAnalytics.totalOrders } +function formatTimestamp(value?: number | null) { + if (!value) { + return "—" + } + + return new Date(value).toLocaleString("en-US", { + month: "short", + day: "numeric", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + }) +} + export default async function AdminDashboard() { - const [productsCount, ordersCount] = await Promise.all([ + const [productsCount, ordersCount, sync] = await Promise.all([ getProductsCount(), getOrdersCount(), + fetchQuery(api.crm.getAdminSyncOverview, {}), ]) const dashboardCards = [ @@ -194,7 +211,7 @@ export default async function AdminDashboard() { Admin Dashboard

- Overview of your store performance and management tools + Manage orders, contacts, conversations, and calls

@@ -226,6 +243,28 @@ export default async function AdminDashboard() {
+ + + CRM Sync Status + + {!sync.ghlConfigured + ? "Connect GHL to load contacts and conversations." + : "Customer data is mirrored here from GHL and your call flows."} + + + + {sync.overallStatus} + Last sync: {formatTimestamp(sync.latestSyncAt)} + {!sync.ghlConfigured ? GHL is not connected. : null} + {!sync.syncTokenConfigured ? ( + Manual sync endpoint is not configured yet. + ) : null} + {!sync.livekitConfigured ? ( + LiveKit recordings are not connected yet. + ) : null} + + + {/* Main Stats */}
{dashboardCards.map((card, index) => { diff --git a/app/api/admin/ghl/sync/route.ts b/app/api/admin/ghl/sync/route.ts new file mode 100644 index 00000000..923107f9 --- /dev/null +++ b/app/api/admin/ghl/sync/route.ts @@ -0,0 +1,39 @@ +import { NextResponse } from "next/server" +import { fetchAction } from "convex/nextjs" +import { api } from "@/convex/_generated/api" +import { requireAdminToken } from "@/lib/server/admin-auth" + +export async function POST(request: Request) { + const authError = requireAdminToken(request) + if (authError) { + return authError + } + + try { + const body = await request.json().catch(() => ({})) + const result = await fetchAction(api.crm.runGhlMirror, { + reason: "admin", + forceFullBackfill: Boolean(body.forceFullBackfill), + maxPagesPerRun: + typeof body.maxPagesPerRun === "number" ? body.maxPagesPerRun : undefined, + contactsLimit: + typeof body.contactsLimit === "number" ? body.contactsLimit : undefined, + messagesLimit: + typeof body.messagesLimit === "number" ? body.messagesLimit : undefined, + recordingsPageSize: + typeof body.recordingsPageSize === "number" + ? body.recordingsPageSize + : undefined, + }) + + return NextResponse.json(result) + } catch (error) { + console.error("Failed to run admin GHL sync:", error) + return NextResponse.json( + { + error: error instanceof Error ? error.message : "Failed to run GHL sync", + }, + { status: 500 } + ) + } +} diff --git a/app/api/internal/ghl/sync/run/route.ts b/app/api/internal/ghl/sync/run/route.ts new file mode 100644 index 00000000..ad6514e8 --- /dev/null +++ b/app/api/internal/ghl/sync/run/route.ts @@ -0,0 +1,39 @@ +import { NextResponse } from "next/server" +import { fetchAction } from "convex/nextjs" +import { api } from "@/convex/_generated/api" +import { requireGhlSyncAuth } from "@/app/api/internal/ghl/shared" + +export async function POST(request: Request) { + const authError = await requireGhlSyncAuth(request) + if (authError) { + return authError + } + + try { + const body = await request.json().catch(() => ({})) + const result = await fetchAction(api.crm.runGhlMirror, { + reason: body.reason ? String(body.reason) : "internal", + forceFullBackfill: Boolean(body.forceFullBackfill), + maxPagesPerRun: + typeof body.maxPagesPerRun === "number" ? body.maxPagesPerRun : undefined, + contactsLimit: + typeof body.contactsLimit === "number" ? body.contactsLimit : undefined, + messagesLimit: + typeof body.messagesLimit === "number" ? body.messagesLimit : undefined, + recordingsPageSize: + typeof body.recordingsPageSize === "number" + ? body.recordingsPageSize + : undefined, + }) + + return NextResponse.json(result) + } catch (error) { + console.error("Failed to run GHL sync:", error) + return NextResponse.json( + { + error: error instanceof Error ? error.message : "Failed to run GHL sync", + }, + { status: 500 } + ) + } +} diff --git a/app/sign-in/[[...sign-in]]/page.tsx b/app/sign-in/[[...sign-in]]/page.tsx index 53efc573..a31724fd 100644 --- a/app/sign-in/[[...sign-in]]/page.tsx +++ b/app/sign-in/[[...sign-in]]/page.tsx @@ -27,7 +27,7 @@ export default async function SignInPage({ searchParams }: PageProps) { params.error === "invalid" ? "That email or password was not accepted." : params.error === "config" - ? "Admin sign-in is not fully configured yet." + ? "Admin access is not available right now." : "" return ( @@ -58,7 +58,6 @@ export default async function SignInPage({ searchParams }: PageProps) { type="email" autoComplete="email" className="flex h-11 w-full rounded-md border border-input bg-background px-3 py-2 text-sm" - placeholder="matt@rockymountainvending.com" required />
@@ -88,8 +87,7 @@ export default async function SignInPage({ searchParams }: PageProps) { Admin sign-in is not configured

- Enable admin UI, Convex, and staging credentials before using - this area. + Admin access is not available right now.

)} diff --git a/convex/crm.ts b/convex/crm.ts index 1ed08c43..b16a59fa 100644 --- a/convex/crm.ts +++ b/convex/crm.ts @@ -1,6 +1,7 @@ // @ts-nocheck -import { mutation, query } from "./_generated/server" +import { action, mutation, query } from "./_generated/server" import { v } from "convex/values" +import { api } from "./_generated/api" import { ensureConversationParticipant, normalizeEmail, @@ -11,6 +12,128 @@ import { upsertExternalSyncState, upsertMessageRecord, } from "./crmModel" +import { + fetchGhlCallLogsPage, + fetchGhlContactsPage, + fetchGhlMessagesPage, + readGhlMirrorConfig, +} from "./ghlMirror" + +const GHL_SYNC_PROVIDER = "ghl" +const GHL_SYNC_STAGES = [ + "contacts", + "conversations", + "messages", + "recordings", + "reconcile", +] as const + +function safeJsonParse(value?: string) { + if (!value) { + return null + } + + try { + return JSON.parse(value) + } catch { + return null + } +} + +async function getSyncStateRecord(ctx, entityType) { + return await ctx.db + .query("externalSyncState") + .withIndex("by_provider_entityType_entityId", (q) => + q + .eq("provider", GHL_SYNC_PROVIDER) + .eq("entityType", entityType) + .eq("entityId", entityType) + ) + .unique() +} + +async function markSyncStage(ctx, args) { + return await upsertExternalSyncState(ctx, { + provider: GHL_SYNC_PROVIDER, + entityType: args.entityType, + entityId: args.entityType, + cursor: args.cursor, + checksum: args.checksum, + status: args.status, + error: args.error, + metadata: args.metadata, + lastAttemptAt: Date.now(), + lastSyncedAt: + args.status === "synced" || args.status === "reconciled" + ? Date.now() + : undefined, + }) +} + +function formatSyncStageSummary(state) { + const metadata = safeJsonParse(state?.metadata) + return { + status: state?.status || "pending", + lastAttemptAt: state?.lastAttemptAt || null, + lastSyncedAt: state?.lastSyncedAt || null, + error: state?.error || null, + cursor: state?.cursor || null, + metadata, + } +} + +async function buildAdminSyncOverview(ctx) { + const stages = { + contacts: formatSyncStageSummary(await getSyncStateRecord(ctx, "contacts")), + conversations: formatSyncStageSummary( + await getSyncStateRecord(ctx, "conversations") + ), + messages: formatSyncStageSummary(await getSyncStateRecord(ctx, "messages")), + recordings: formatSyncStageSummary(await getSyncStateRecord(ctx, "recordings")), + reconcile: formatSyncStageSummary(await getSyncStateRecord(ctx, "reconcile")), + } + + const ghlConfigured = Boolean( + String( + process.env.GHL_PRIVATE_INTEGRATION_TOKEN || process.env.GHL_API_TOKEN || "" + ).trim() && String(process.env.GHL_LOCATION_ID || "").trim() + ) + const syncTokenConfigured = Boolean( + String(process.env.GHL_SYNC_CRON_TOKEN || "").trim() + ) + const livekitConfigured = Boolean( + String(process.env.LIVEKIT_URL || "").trim() && + String(process.env.LIVEKIT_API_KEY || "").trim() && + String(process.env.LIVEKIT_API_SECRET || "").trim() + ) + + const latestSyncAt = Math.max( + ...Object.values(stages).map((stage: any) => stage.lastSyncedAt || 0), + 0 + ) + const hasFailures = Object.values(stages).some( + (stage: any) => + stage.status === "failed" || stage.status === "missing_config" + ) + const hasRunning = Object.values(stages).some( + (stage: any) => stage.status === "running" + ) + + return { + ghlConfigured, + syncTokenConfigured, + livekitConfigured, + latestSyncAt: latestSyncAt || null, + overallStatus: hasRunning + ? "running" + : hasFailures + ? "attention" + : latestSyncAt + ? "healthy" + : "idle", + stages, + } +} function matchesSearch(values: Array, search: string) { if (!search) { @@ -548,9 +671,11 @@ export const updateSyncCheckpoint = mutation({ checksum: v.optional(v.string()), status: v.optional( v.union( + v.literal("running"), v.literal("pending"), v.literal("synced"), v.literal("failed"), + v.literal("missing_config"), v.literal("reconciled"), v.literal("mismatch") ) @@ -629,6 +754,376 @@ export const reconcileExternalState = mutation({ }, }) +export const runGhlMirror = action({ + args: { + reason: v.optional(v.string()), + forceFullBackfill: v.optional(v.boolean()), + maxPagesPerRun: v.optional(v.number()), + contactsLimit: v.optional(v.number()), + messagesLimit: v.optional(v.number()), + recordingsPageSize: v.optional(v.number()), + }, + handler: async (ctx, args) => { + const config = readGhlMirrorConfig() + const now = Date.now() + const maxPagesPerRun = Math.min(250, Math.max(1, args.maxPagesPerRun || 25)) + const contactsLimit = Math.min(100, Math.max(1, args.contactsLimit || 100)) + const messagesLimit = Math.min(100, Math.max(1, args.messagesLimit || 100)) + const recordingsPageSize = Math.min( + 50, + Math.max(1, args.recordingsPageSize || 50) + ) + + if (!config) { + for (const stage of GHL_SYNC_STAGES) { + await ctx.runMutation(api.crm.updateSyncCheckpoint, { + provider: GHL_SYNC_PROVIDER, + entityType: stage, + entityId: stage, + status: "missing_config", + error: "GHL credentials are not configured.", + metadata: JSON.stringify({ + reason: args.reason || "cron", + checkedAt: now, + }), + }) + } + + return { + ok: false, + status: "missing_config", + message: "GHL credentials are not configured.", + } + } + + const summary = { + ok: true, + status: "synced", + reason: args.reason || "cron", + contacts: 0, + conversations: 0, + messages: 0, + recordings: 0, + mismatches: [] as string[], + } + + const updateRunning = async (entityType: string, metadata?: Record) => { + await ctx.runMutation(api.crm.updateSyncCheckpoint, { + provider: GHL_SYNC_PROVIDER, + entityType, + entityId: entityType, + status: "running", + error: undefined, + metadata: JSON.stringify({ + ...(metadata || {}), + reason: args.reason || "cron", + startedAt: now, + }), + }) + } + + const failStage = async (entityType: string, error: unknown, metadata?: Record) => { + const message = + error instanceof Error ? error.message : `Failed to sync ${entityType}` + summary.ok = false + summary.status = "failed" + await ctx.runMutation(api.crm.updateSyncCheckpoint, { + provider: GHL_SYNC_PROVIDER, + entityType, + entityId: entityType, + status: "failed", + error: message, + metadata: JSON.stringify({ + ...(metadata || {}), + reason: args.reason || "cron", + failedAt: Date.now(), + }), + }) + } + + try { + await updateRunning("contacts") + const contactsState = await ctx.runQuery(api.crm.getAdminSyncOverview, {}) + let contactsCursor = + !args.forceFullBackfill && + contactsState.stages.contacts.metadata?.nextCursor + ? String(contactsState.stages.contacts.metadata.nextCursor) + : undefined + let contactsPages = 0 + + while (contactsPages < maxPagesPerRun) { + const fetched = await fetchGhlContactsPage(config, { + limit: contactsLimit, + cursor: contactsCursor, + }) + + if (!fetched.items.length) { + break + } + + for (const item of fetched.items) { + await ctx.runMutation(api.crm.importContact, { + provider: GHL_SYNC_PROVIDER, + entityId: String(item.id || ""), + payload: item, + }) + summary.contacts += 1 + } + + contactsPages += 1 + contactsCursor = fetched.nextCursor + if (!fetched.nextCursor) { + break + } + } + + await ctx.runMutation(api.crm.updateSyncCheckpoint, { + provider: GHL_SYNC_PROVIDER, + entityType: "contacts", + entityId: "contacts", + cursor: contactsCursor, + status: "synced", + error: undefined, + metadata: JSON.stringify({ + imported: summary.contacts, + pages: contactsPages, + nextCursor: contactsCursor, + completedAt: Date.now(), + reason: args.reason || "cron", + }), + }) + } catch (error) { + await failStage("contacts", error) + return summary + } + + try { + await updateRunning("conversations") + const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {}) + const conversationCursors = { + SMS: + !args.forceFullBackfill && + previous.stages.conversations.metadata?.cursors?.SMS + ? String(previous.stages.conversations.metadata.cursors.SMS) + : undefined, + Call: + !args.forceFullBackfill && + previous.stages.conversations.metadata?.cursors?.Call + ? String(previous.stages.conversations.metadata.cursors.Call) + : undefined, + } + + for (const channel of ["SMS", "Call"] as const) { + let pages = 0 + while (pages < maxPagesPerRun) { + const fetched = await fetchGhlMessagesPage(config, { + limit: messagesLimit, + cursor: conversationCursors[channel], + channel, + }) + + if (!fetched.items.length) { + break + } + + const grouped = new Map() + for (const item of fetched.items) { + const conversationId = String(item.conversationId || item.id || "") + if (!conversationId || grouped.has(conversationId)) { + continue + } + grouped.set(conversationId, item) + } + + for (const [entityId, item] of grouped.entries()) { + await ctx.runMutation(api.crm.importConversation, { + provider: GHL_SYNC_PROVIDER, + entityId, + payload: item, + }) + summary.conversations += 1 + } + + pages += 1 + conversationCursors[channel] = fetched.nextCursor + if (!fetched.nextCursor) { + break + } + } + } + + await ctx.runMutation(api.crm.updateSyncCheckpoint, { + provider: GHL_SYNC_PROVIDER, + entityType: "conversations", + entityId: "conversations", + cursor: conversationCursors.Call || conversationCursors.SMS, + status: "synced", + error: undefined, + metadata: JSON.stringify({ + imported: summary.conversations, + cursors: conversationCursors, + completedAt: Date.now(), + reason: args.reason || "cron", + }), + }) + } catch (error) { + await failStage("conversations", error) + return summary + } + + try { + await updateRunning("messages") + const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {}) + const messageCursors = { + SMS: + !args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.SMS + ? String(previous.stages.messages.metadata.cursors.SMS) + : undefined, + Call: + !args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.Call + ? String(previous.stages.messages.metadata.cursors.Call) + : undefined, + } + + for (const channel of ["SMS", "Call"] as const) { + let pages = 0 + while (pages < maxPagesPerRun) { + const fetched = await fetchGhlMessagesPage(config, { + limit: messagesLimit, + cursor: messageCursors[channel], + channel, + }) + + if (!fetched.items.length) { + break + } + + for (const item of fetched.items) { + await ctx.runMutation(api.crm.importMessage, { + provider: GHL_SYNC_PROVIDER, + entityId: String(item.id || ""), + payload: item, + }) + summary.messages += 1 + } + + pages += 1 + messageCursors[channel] = fetched.nextCursor + if (!fetched.nextCursor) { + break + } + } + } + + await ctx.runMutation(api.crm.updateSyncCheckpoint, { + provider: GHL_SYNC_PROVIDER, + entityType: "messages", + entityId: "messages", + cursor: messageCursors.Call || messageCursors.SMS, + status: "synced", + error: undefined, + metadata: JSON.stringify({ + imported: summary.messages, + cursors: messageCursors, + completedAt: Date.now(), + reason: args.reason || "cron", + }), + }) + } catch (error) { + await failStage("messages", error) + return summary + } + + try { + await updateRunning("recordings") + const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {}) + let nextPage = + !args.forceFullBackfill && previous.stages.recordings.metadata?.nextPage + ? Number(previous.stages.recordings.metadata.nextPage) + : 1 + let pages = 0 + + while (pages < maxPagesPerRun) { + const fetched = await fetchGhlCallLogsPage(config, { + page: nextPage, + pageSize: recordingsPageSize, + }) + + if (!fetched.items.length) { + break + } + + for (const item of fetched.items) { + await ctx.runMutation(api.crm.importRecording, { + provider: GHL_SYNC_PROVIDER, + entityId: String(item.id || item.messageId || ""), + payload: { + ...item, + recordingId: item.messageId || item.id, + transcript: item.transcript, + recordingUrl: item.recordingUrl, + recordingStatus: item.transcript ? "completed" : "pending", + }, + }) + summary.recordings += 1 + } + + pages += 1 + const exhausted = fetched.page * fetched.pageSize >= fetched.total + nextPage = exhausted ? 1 : fetched.page + 1 + if (exhausted) { + break + } + } + + await ctx.runMutation(api.crm.updateSyncCheckpoint, { + provider: GHL_SYNC_PROVIDER, + entityType: "recordings", + entityId: "recordings", + cursor: String(nextPage), + status: "synced", + error: undefined, + metadata: JSON.stringify({ + imported: summary.recordings, + nextPage, + completedAt: Date.now(), + reason: args.reason || "cron", + }), + }) + } catch (error) { + await failStage("recordings", error) + return summary + } + + try { + await updateRunning("reconcile") + const reconcile = await ctx.runMutation(api.crm.reconcileExternalState, { + provider: GHL_SYNC_PROVIDER, + }) + summary.mismatches = reconcile.mismatches || [] + + await ctx.runMutation(api.crm.updateSyncCheckpoint, { + provider: GHL_SYNC_PROVIDER, + entityType: "reconcile", + entityId: "reconcile", + status: reconcile.mismatches?.length ? "mismatch" : "reconciled", + error: undefined, + metadata: JSON.stringify({ + checked: reconcile.checked, + mismatches: reconcile.mismatches || [], + completedAt: Date.now(), + reason: args.reason || "cron", + }), + }) + } catch (error) { + await failStage("reconcile", error) + return summary + } + + return summary + }, +}) + export const listAdminContacts = query({ args: { search: v.optional(v.string()), @@ -692,6 +1187,7 @@ export const listAdminContacts = query({ return { items, + sync: await buildAdminSyncOverview(ctx), pagination: { page, limit, @@ -757,6 +1253,7 @@ export const getAdminContactDetail = query({ recordingReady: Boolean(conversation.livekitRoomName || conversation.voiceSessionId), })), timeline, + sync: await buildAdminSyncOverview(ctx), } }, }) @@ -870,6 +1367,7 @@ export const listAdminConversations = query({ return { items, + sync: await buildAdminSyncOverview(ctx), pagination: { page, limit, @@ -977,6 +1475,14 @@ export const getAdminConversationDetail = query({ intent: lead.intent, createdAt: lead.createdAt, })), + sync: await buildAdminSyncOverview(ctx), } }, }) + +export const getAdminSyncOverview = query({ + args: {}, + handler: async (ctx) => { + return await buildAdminSyncOverview(ctx) + }, +}) diff --git a/convex/crons.ts b/convex/crons.ts index 4de9c6a2..24db37fe 100644 --- a/convex/crons.ts +++ b/convex/crons.ts @@ -10,4 +10,11 @@ crons.interval( { reason: "cron" } ) +crons.interval( + "ghl-crm-mirror-sync", + { hours: 1 }, + api.crm.runGhlMirror, + { reason: "cron" } +) + export default crons diff --git a/convex/ghlMirror.ts b/convex/ghlMirror.ts new file mode 100644 index 00000000..4f0b1c60 --- /dev/null +++ b/convex/ghlMirror.ts @@ -0,0 +1,160 @@ +// @ts-nocheck + +type GhlMirrorConfig = { + token: string + locationId: string + baseUrl: string + version: string +} + +function normalizeBaseUrl(value?: string) { + return String(value || "https://services.leadconnectorhq.com").replace( + /\/+$/, + "" + ) +} + +export function readGhlMirrorConfig() { + const token = String( + process.env.GHL_PRIVATE_INTEGRATION_TOKEN || process.env.GHL_API_TOKEN || "" + ).trim() + const locationId = String(process.env.GHL_LOCATION_ID || "").trim() + const baseUrl = normalizeBaseUrl(process.env.GHL_API_BASE_URL) + const version = String(process.env.GHL_API_VERSION || "2021-07-28").trim() + + if (!token || !locationId) { + return null + } + + return { + token, + locationId, + baseUrl, + version, + } satisfies GhlMirrorConfig +} + +export async function fetchGhlMirrorJson( + config: GhlMirrorConfig, + pathname: string, + init?: RequestInit +) { + const response = await fetch(`${config.baseUrl}${pathname}`, { + ...init, + headers: { + Authorization: `Bearer ${config.token}`, + Version: config.version, + Accept: "application/json", + "Content-Type": "application/json", + ...(init?.headers || {}), + }, + cache: "no-store", + }) + + const text = await response.text() + let body: any = null + if (text) { + try { + body = JSON.parse(text) + } catch { + body = null + } + } + + if (!response.ok) { + throw new Error( + `GHL request failed (${response.status}) for ${pathname}: ${body?.message || text || "Unknown error"}` + ) + } + + return body +} + +export async function fetchGhlContactsPage( + config: GhlMirrorConfig, + args?: { + limit?: number + cursor?: string + } +) { + const searchParams = new URLSearchParams({ + locationId: config.locationId, + limit: String(Math.min(100, Math.max(1, args?.limit || 100))), + }) + + if (args?.cursor) { + searchParams.set("startAfterId", args.cursor) + } + + const payload = await fetchGhlMirrorJson( + config, + `/contacts/?${searchParams.toString()}` + ) + + const contacts = Array.isArray(payload?.contacts) + ? payload.contacts + : Array.isArray(payload?.data?.contacts) + ? payload.data.contacts + : [] + + const nextCursor = + contacts.length > 0 ? String(contacts[contacts.length - 1]?.id || "") : "" + + return { + items: contacts, + nextCursor: nextCursor || undefined, + } +} + +export async function fetchGhlMessagesPage( + config: GhlMirrorConfig, + args?: { + limit?: number + cursor?: string + channel?: "Call" | "SMS" + } +) { + const url = new URL(`${config.baseUrl}/conversations/messages/export`) + url.searchParams.set("locationId", config.locationId) + url.searchParams.set("limit", String(Math.min(100, Math.max(1, args?.limit || 100)))) + url.searchParams.set("channel", args?.channel || "SMS") + + if (args?.cursor) { + url.searchParams.set("cursor", args.cursor) + } + + const payload = await fetchGhlMirrorJson(config, url.pathname + url.search) + + return { + items: Array.isArray(payload?.messages) ? payload.messages : [], + nextCursor: + typeof payload?.nextCursor === "string" && payload.nextCursor + ? payload.nextCursor + : undefined, + } +} + +export async function fetchGhlCallLogsPage( + config: GhlMirrorConfig, + args?: { + page?: number + pageSize?: number + } +) { + const url = new URL(`${config.baseUrl}/voice-ai/dashboard/call-logs`) + url.searchParams.set("locationId", config.locationId) + url.searchParams.set("page", String(Math.max(1, args?.page || 1))) + url.searchParams.set( + "pageSize", + String(Math.min(50, Math.max(1, args?.pageSize || 50))) + ) + + const payload = await fetchGhlMirrorJson(config, url.pathname + url.search) + + return { + items: Array.isArray(payload?.callLogs) ? payload.callLogs : [], + page: Number(payload?.page || args?.page || 1), + total: Number(payload?.total || 0), + pageSize: Number(payload?.pageSize || args?.pageSize || 50), + } +} diff --git a/convex/schema.ts b/convex/schema.ts index 7feef130..c874695b 100644 --- a/convex/schema.ts +++ b/convex/schema.ts @@ -407,9 +407,11 @@ export default defineSchema({ checksum: v.optional(v.string()), status: v.optional( v.union( + v.literal("running"), v.literal("pending"), v.literal("synced"), v.literal("failed"), + v.literal("missing_config"), v.literal("reconciled"), v.literal("mismatch") )