@@ -136,7 +172,9 @@ export default async function AdminConversationsPage({
colSpan={8}
className="py-8 text-center text-muted-foreground"
>
- No conversations matched this filter.
+ {search || params.channel || params.status
+ ? "No conversations matched this search."
+ : getSyncMessage(data.sync)}
) : (
@@ -204,5 +242,5 @@ export default async function AdminConversationsPage({
export const metadata = {
title: "Conversations | Admin",
- description: "View backend-owned Rocky conversation threads",
+ description: "View Rocky customer conversations",
}
diff --git a/app/admin/page.tsx b/app/admin/page.tsx
index 071adf45..49c54a9a 100644
--- a/app/admin/page.tsx
+++ b/app/admin/page.tsx
@@ -1,5 +1,7 @@
import Link from "next/link"
+import { fetchQuery } from "convex/nextjs"
import { Button } from "@/components/ui/button"
+import { api } from "@/convex/_generated/api"
import {
Card,
CardContent,
@@ -58,10 +60,25 @@ async function getOrdersCount() {
return mockAnalytics.totalOrders
}
+function formatTimestamp(value?: number | null) {
+ if (!value) {
+ return "—"
+ }
+
+ return new Date(value).toLocaleString("en-US", {
+ month: "short",
+ day: "numeric",
+ year: "numeric",
+ hour: "2-digit",
+ minute: "2-digit",
+ })
+}
+
export default async function AdminDashboard() {
- const [productsCount, ordersCount] = await Promise.all([
+ const [productsCount, ordersCount, sync] = await Promise.all([
getProductsCount(),
getOrdersCount(),
+ fetchQuery(api.crm.getAdminSyncOverview, {}),
])
const dashboardCards = [
@@ -194,7 +211,7 @@ export default async function AdminDashboard() {
Admin Dashboard
- Overview of your store performance and management tools
+ Manage orders, contacts, conversations, and calls
@@ -226,6 +243,28 @@ export default async function AdminDashboard() {
+
+
+ CRM Sync Status
+
+ {!sync.ghlConfigured
+ ? "Connect GHL to load contacts and conversations."
+ : "Customer data is mirrored here from GHL and your call flows."}
+
+
+
+ {sync.overallStatus}
+ Last sync: {formatTimestamp(sync.latestSyncAt)}
+ {!sync.ghlConfigured ? GHL is not connected. : null}
+ {!sync.syncTokenConfigured ? (
+ Manual sync endpoint is not configured yet.
+ ) : null}
+ {!sync.livekitConfigured ? (
+ LiveKit recordings are not connected yet.
+ ) : null}
+
+
+
{/* Main Stats */}
{dashboardCards.map((card, index) => {
diff --git a/app/api/admin/ghl/sync/route.ts b/app/api/admin/ghl/sync/route.ts
new file mode 100644
index 00000000..923107f9
--- /dev/null
+++ b/app/api/admin/ghl/sync/route.ts
@@ -0,0 +1,39 @@
+import { NextResponse } from "next/server"
+import { fetchAction } from "convex/nextjs"
+import { api } from "@/convex/_generated/api"
+import { requireAdminToken } from "@/lib/server/admin-auth"
+
+export async function POST(request: Request) {
+ const authError = requireAdminToken(request)
+ if (authError) {
+ return authError
+ }
+
+ try {
+ const body = await request.json().catch(() => ({}))
+ const result = await fetchAction(api.crm.runGhlMirror, {
+ reason: "admin",
+ forceFullBackfill: Boolean(body.forceFullBackfill),
+ maxPagesPerRun:
+ typeof body.maxPagesPerRun === "number" ? body.maxPagesPerRun : undefined,
+ contactsLimit:
+ typeof body.contactsLimit === "number" ? body.contactsLimit : undefined,
+ messagesLimit:
+ typeof body.messagesLimit === "number" ? body.messagesLimit : undefined,
+ recordingsPageSize:
+ typeof body.recordingsPageSize === "number"
+ ? body.recordingsPageSize
+ : undefined,
+ })
+
+ return NextResponse.json(result)
+ } catch (error) {
+ console.error("Failed to run admin GHL sync:", error)
+ return NextResponse.json(
+ {
+ error: error instanceof Error ? error.message : "Failed to run GHL sync",
+ },
+ { status: 500 }
+ )
+ }
+}
diff --git a/app/api/internal/ghl/sync/run/route.ts b/app/api/internal/ghl/sync/run/route.ts
new file mode 100644
index 00000000..ad6514e8
--- /dev/null
+++ b/app/api/internal/ghl/sync/run/route.ts
@@ -0,0 +1,39 @@
+import { NextResponse } from "next/server"
+import { fetchAction } from "convex/nextjs"
+import { api } from "@/convex/_generated/api"
+import { requireGhlSyncAuth } from "@/app/api/internal/ghl/shared"
+
+export async function POST(request: Request) {
+ const authError = await requireGhlSyncAuth(request)
+ if (authError) {
+ return authError
+ }
+
+ try {
+ const body = await request.json().catch(() => ({}))
+ const result = await fetchAction(api.crm.runGhlMirror, {
+ reason: body.reason ? String(body.reason) : "internal",
+ forceFullBackfill: Boolean(body.forceFullBackfill),
+ maxPagesPerRun:
+ typeof body.maxPagesPerRun === "number" ? body.maxPagesPerRun : undefined,
+ contactsLimit:
+ typeof body.contactsLimit === "number" ? body.contactsLimit : undefined,
+ messagesLimit:
+ typeof body.messagesLimit === "number" ? body.messagesLimit : undefined,
+ recordingsPageSize:
+ typeof body.recordingsPageSize === "number"
+ ? body.recordingsPageSize
+ : undefined,
+ })
+
+ return NextResponse.json(result)
+ } catch (error) {
+ console.error("Failed to run GHL sync:", error)
+ return NextResponse.json(
+ {
+ error: error instanceof Error ? error.message : "Failed to run GHL sync",
+ },
+ { status: 500 }
+ )
+ }
+}
diff --git a/app/sign-in/[[...sign-in]]/page.tsx b/app/sign-in/[[...sign-in]]/page.tsx
index 53efc573..a31724fd 100644
--- a/app/sign-in/[[...sign-in]]/page.tsx
+++ b/app/sign-in/[[...sign-in]]/page.tsx
@@ -27,7 +27,7 @@ export default async function SignInPage({ searchParams }: PageProps) {
params.error === "invalid"
? "That email or password was not accepted."
: params.error === "config"
- ? "Admin sign-in is not fully configured yet."
+ ? "Admin access is not available right now."
: ""
return (
@@ -58,7 +58,6 @@ export default async function SignInPage({ searchParams }: PageProps) {
type="email"
autoComplete="email"
className="flex h-11 w-full rounded-md border border-input bg-background px-3 py-2 text-sm"
- placeholder="matt@rockymountainvending.com"
required
/>
@@ -88,8 +87,7 @@ export default async function SignInPage({ searchParams }: PageProps) {
Admin sign-in is not configured
- Enable admin UI, Convex, and staging credentials before using
- this area.
+ Admin access is not available right now.
>
)}
diff --git a/convex/crm.ts b/convex/crm.ts
index 1ed08c43..b16a59fa 100644
--- a/convex/crm.ts
+++ b/convex/crm.ts
@@ -1,6 +1,7 @@
// @ts-nocheck
-import { mutation, query } from "./_generated/server"
+import { action, mutation, query } from "./_generated/server"
import { v } from "convex/values"
+import { api } from "./_generated/api"
import {
ensureConversationParticipant,
normalizeEmail,
@@ -11,6 +12,128 @@ import {
upsertExternalSyncState,
upsertMessageRecord,
} from "./crmModel"
+import {
+ fetchGhlCallLogsPage,
+ fetchGhlContactsPage,
+ fetchGhlMessagesPage,
+ readGhlMirrorConfig,
+} from "./ghlMirror"
+
+const GHL_SYNC_PROVIDER = "ghl"
+const GHL_SYNC_STAGES = [
+ "contacts",
+ "conversations",
+ "messages",
+ "recordings",
+ "reconcile",
+] as const
+
+function safeJsonParse(value?: string) {
+ if (!value) {
+ return null
+ }
+
+ try {
+ return JSON.parse(value)
+ } catch {
+ return null
+ }
+}
+
+async function getSyncStateRecord(ctx, entityType) {
+ return await ctx.db
+ .query("externalSyncState")
+ .withIndex("by_provider_entityType_entityId", (q) =>
+ q
+ .eq("provider", GHL_SYNC_PROVIDER)
+ .eq("entityType", entityType)
+ .eq("entityId", entityType)
+ )
+ .unique()
+}
+
+async function markSyncStage(ctx, args) {
+ return await upsertExternalSyncState(ctx, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType: args.entityType,
+ entityId: args.entityType,
+ cursor: args.cursor,
+ checksum: args.checksum,
+ status: args.status,
+ error: args.error,
+ metadata: args.metadata,
+ lastAttemptAt: Date.now(),
+ lastSyncedAt:
+ args.status === "synced" || args.status === "reconciled"
+ ? Date.now()
+ : undefined,
+ })
+}
+
+function formatSyncStageSummary(state) {
+ const metadata = safeJsonParse(state?.metadata)
+ return {
+ status: state?.status || "pending",
+ lastAttemptAt: state?.lastAttemptAt || null,
+ lastSyncedAt: state?.lastSyncedAt || null,
+ error: state?.error || null,
+ cursor: state?.cursor || null,
+ metadata,
+ }
+}
+
+async function buildAdminSyncOverview(ctx) {
+ const stages = {
+ contacts: formatSyncStageSummary(await getSyncStateRecord(ctx, "contacts")),
+ conversations: formatSyncStageSummary(
+ await getSyncStateRecord(ctx, "conversations")
+ ),
+ messages: formatSyncStageSummary(await getSyncStateRecord(ctx, "messages")),
+ recordings: formatSyncStageSummary(await getSyncStateRecord(ctx, "recordings")),
+ reconcile: formatSyncStageSummary(await getSyncStateRecord(ctx, "reconcile")),
+ }
+
+ const ghlConfigured = Boolean(
+ String(
+ process.env.GHL_PRIVATE_INTEGRATION_TOKEN || process.env.GHL_API_TOKEN || ""
+ ).trim() && String(process.env.GHL_LOCATION_ID || "").trim()
+ )
+ const syncTokenConfigured = Boolean(
+ String(process.env.GHL_SYNC_CRON_TOKEN || "").trim()
+ )
+ const livekitConfigured = Boolean(
+ String(process.env.LIVEKIT_URL || "").trim() &&
+ String(process.env.LIVEKIT_API_KEY || "").trim() &&
+ String(process.env.LIVEKIT_API_SECRET || "").trim()
+ )
+
+ const latestSyncAt = Math.max(
+ ...Object.values(stages).map((stage: any) => stage.lastSyncedAt || 0),
+ 0
+ )
+ const hasFailures = Object.values(stages).some(
+ (stage: any) =>
+ stage.status === "failed" || stage.status === "missing_config"
+ )
+ const hasRunning = Object.values(stages).some(
+ (stage: any) => stage.status === "running"
+ )
+
+ return {
+ ghlConfigured,
+ syncTokenConfigured,
+ livekitConfigured,
+ latestSyncAt: latestSyncAt || null,
+ overallStatus: hasRunning
+ ? "running"
+ : hasFailures
+ ? "attention"
+ : latestSyncAt
+ ? "healthy"
+ : "idle",
+ stages,
+ }
+}
function matchesSearch(values: Array, search: string) {
if (!search) {
@@ -548,9 +671,11 @@ export const updateSyncCheckpoint = mutation({
checksum: v.optional(v.string()),
status: v.optional(
v.union(
+ v.literal("running"),
v.literal("pending"),
v.literal("synced"),
v.literal("failed"),
+ v.literal("missing_config"),
v.literal("reconciled"),
v.literal("mismatch")
)
@@ -629,6 +754,376 @@ export const reconcileExternalState = mutation({
},
})
+export const runGhlMirror = action({
+ args: {
+ reason: v.optional(v.string()),
+ forceFullBackfill: v.optional(v.boolean()),
+ maxPagesPerRun: v.optional(v.number()),
+ contactsLimit: v.optional(v.number()),
+ messagesLimit: v.optional(v.number()),
+ recordingsPageSize: v.optional(v.number()),
+ },
+ handler: async (ctx, args) => {
+ const config = readGhlMirrorConfig()
+ const now = Date.now()
+ const maxPagesPerRun = Math.min(250, Math.max(1, args.maxPagesPerRun || 25))
+ const contactsLimit = Math.min(100, Math.max(1, args.contactsLimit || 100))
+ const messagesLimit = Math.min(100, Math.max(1, args.messagesLimit || 100))
+ const recordingsPageSize = Math.min(
+ 50,
+ Math.max(1, args.recordingsPageSize || 50)
+ )
+
+ if (!config) {
+ for (const stage of GHL_SYNC_STAGES) {
+ await ctx.runMutation(api.crm.updateSyncCheckpoint, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType: stage,
+ entityId: stage,
+ status: "missing_config",
+ error: "GHL credentials are not configured.",
+ metadata: JSON.stringify({
+ reason: args.reason || "cron",
+ checkedAt: now,
+ }),
+ })
+ }
+
+ return {
+ ok: false,
+ status: "missing_config",
+ message: "GHL credentials are not configured.",
+ }
+ }
+
+ const summary = {
+ ok: true,
+ status: "synced",
+ reason: args.reason || "cron",
+ contacts: 0,
+ conversations: 0,
+ messages: 0,
+ recordings: 0,
+ mismatches: [] as string[],
+ }
+
+ const updateRunning = async (entityType: string, metadata?: Record) => {
+ await ctx.runMutation(api.crm.updateSyncCheckpoint, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType,
+ entityId: entityType,
+ status: "running",
+ error: undefined,
+ metadata: JSON.stringify({
+ ...(metadata || {}),
+ reason: args.reason || "cron",
+ startedAt: now,
+ }),
+ })
+ }
+
+ const failStage = async (entityType: string, error: unknown, metadata?: Record) => {
+ const message =
+ error instanceof Error ? error.message : `Failed to sync ${entityType}`
+ summary.ok = false
+ summary.status = "failed"
+ await ctx.runMutation(api.crm.updateSyncCheckpoint, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType,
+ entityId: entityType,
+ status: "failed",
+ error: message,
+ metadata: JSON.stringify({
+ ...(metadata || {}),
+ reason: args.reason || "cron",
+ failedAt: Date.now(),
+ }),
+ })
+ }
+
+ try {
+ await updateRunning("contacts")
+ const contactsState = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
+ let contactsCursor =
+ !args.forceFullBackfill &&
+ contactsState.stages.contacts.metadata?.nextCursor
+ ? String(contactsState.stages.contacts.metadata.nextCursor)
+ : undefined
+ let contactsPages = 0
+
+ while (contactsPages < maxPagesPerRun) {
+ const fetched = await fetchGhlContactsPage(config, {
+ limit: contactsLimit,
+ cursor: contactsCursor,
+ })
+
+ if (!fetched.items.length) {
+ break
+ }
+
+ for (const item of fetched.items) {
+ await ctx.runMutation(api.crm.importContact, {
+ provider: GHL_SYNC_PROVIDER,
+ entityId: String(item.id || ""),
+ payload: item,
+ })
+ summary.contacts += 1
+ }
+
+ contactsPages += 1
+ contactsCursor = fetched.nextCursor
+ if (!fetched.nextCursor) {
+ break
+ }
+ }
+
+ await ctx.runMutation(api.crm.updateSyncCheckpoint, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType: "contacts",
+ entityId: "contacts",
+ cursor: contactsCursor,
+ status: "synced",
+ error: undefined,
+ metadata: JSON.stringify({
+ imported: summary.contacts,
+ pages: contactsPages,
+ nextCursor: contactsCursor,
+ completedAt: Date.now(),
+ reason: args.reason || "cron",
+ }),
+ })
+ } catch (error) {
+ await failStage("contacts", error)
+ return summary
+ }
+
+ try {
+ await updateRunning("conversations")
+ const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
+ const conversationCursors = {
+ SMS:
+ !args.forceFullBackfill &&
+ previous.stages.conversations.metadata?.cursors?.SMS
+ ? String(previous.stages.conversations.metadata.cursors.SMS)
+ : undefined,
+ Call:
+ !args.forceFullBackfill &&
+ previous.stages.conversations.metadata?.cursors?.Call
+ ? String(previous.stages.conversations.metadata.cursors.Call)
+ : undefined,
+ }
+
+ for (const channel of ["SMS", "Call"] as const) {
+ let pages = 0
+ while (pages < maxPagesPerRun) {
+ const fetched = await fetchGhlMessagesPage(config, {
+ limit: messagesLimit,
+ cursor: conversationCursors[channel],
+ channel,
+ })
+
+ if (!fetched.items.length) {
+ break
+ }
+
+ const grouped = new Map()
+ for (const item of fetched.items) {
+ const conversationId = String(item.conversationId || item.id || "")
+ if (!conversationId || grouped.has(conversationId)) {
+ continue
+ }
+ grouped.set(conversationId, item)
+ }
+
+ for (const [entityId, item] of grouped.entries()) {
+ await ctx.runMutation(api.crm.importConversation, {
+ provider: GHL_SYNC_PROVIDER,
+ entityId,
+ payload: item,
+ })
+ summary.conversations += 1
+ }
+
+ pages += 1
+ conversationCursors[channel] = fetched.nextCursor
+ if (!fetched.nextCursor) {
+ break
+ }
+ }
+ }
+
+ await ctx.runMutation(api.crm.updateSyncCheckpoint, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType: "conversations",
+ entityId: "conversations",
+ cursor: conversationCursors.Call || conversationCursors.SMS,
+ status: "synced",
+ error: undefined,
+ metadata: JSON.stringify({
+ imported: summary.conversations,
+ cursors: conversationCursors,
+ completedAt: Date.now(),
+ reason: args.reason || "cron",
+ }),
+ })
+ } catch (error) {
+ await failStage("conversations", error)
+ return summary
+ }
+
+ try {
+ await updateRunning("messages")
+ const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
+ const messageCursors = {
+ SMS:
+ !args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.SMS
+ ? String(previous.stages.messages.metadata.cursors.SMS)
+ : undefined,
+ Call:
+ !args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.Call
+ ? String(previous.stages.messages.metadata.cursors.Call)
+ : undefined,
+ }
+
+ for (const channel of ["SMS", "Call"] as const) {
+ let pages = 0
+ while (pages < maxPagesPerRun) {
+ const fetched = await fetchGhlMessagesPage(config, {
+ limit: messagesLimit,
+ cursor: messageCursors[channel],
+ channel,
+ })
+
+ if (!fetched.items.length) {
+ break
+ }
+
+ for (const item of fetched.items) {
+ await ctx.runMutation(api.crm.importMessage, {
+ provider: GHL_SYNC_PROVIDER,
+ entityId: String(item.id || ""),
+ payload: item,
+ })
+ summary.messages += 1
+ }
+
+ pages += 1
+ messageCursors[channel] = fetched.nextCursor
+ if (!fetched.nextCursor) {
+ break
+ }
+ }
+ }
+
+ await ctx.runMutation(api.crm.updateSyncCheckpoint, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType: "messages",
+ entityId: "messages",
+ cursor: messageCursors.Call || messageCursors.SMS,
+ status: "synced",
+ error: undefined,
+ metadata: JSON.stringify({
+ imported: summary.messages,
+ cursors: messageCursors,
+ completedAt: Date.now(),
+ reason: args.reason || "cron",
+ }),
+ })
+ } catch (error) {
+ await failStage("messages", error)
+ return summary
+ }
+
+ try {
+ await updateRunning("recordings")
+ const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
+ let nextPage =
+ !args.forceFullBackfill && previous.stages.recordings.metadata?.nextPage
+ ? Number(previous.stages.recordings.metadata.nextPage)
+ : 1
+ let pages = 0
+
+ while (pages < maxPagesPerRun) {
+ const fetched = await fetchGhlCallLogsPage(config, {
+ page: nextPage,
+ pageSize: recordingsPageSize,
+ })
+
+ if (!fetched.items.length) {
+ break
+ }
+
+ for (const item of fetched.items) {
+ await ctx.runMutation(api.crm.importRecording, {
+ provider: GHL_SYNC_PROVIDER,
+ entityId: String(item.id || item.messageId || ""),
+ payload: {
+ ...item,
+ recordingId: item.messageId || item.id,
+ transcript: item.transcript,
+ recordingUrl: item.recordingUrl,
+ recordingStatus: item.transcript ? "completed" : "pending",
+ },
+ })
+ summary.recordings += 1
+ }
+
+ pages += 1
+ const exhausted = fetched.page * fetched.pageSize >= fetched.total
+ nextPage = exhausted ? 1 : fetched.page + 1
+ if (exhausted) {
+ break
+ }
+ }
+
+ await ctx.runMutation(api.crm.updateSyncCheckpoint, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType: "recordings",
+ entityId: "recordings",
+ cursor: String(nextPage),
+ status: "synced",
+ error: undefined,
+ metadata: JSON.stringify({
+ imported: summary.recordings,
+ nextPage,
+ completedAt: Date.now(),
+ reason: args.reason || "cron",
+ }),
+ })
+ } catch (error) {
+ await failStage("recordings", error)
+ return summary
+ }
+
+ try {
+ await updateRunning("reconcile")
+ const reconcile = await ctx.runMutation(api.crm.reconcileExternalState, {
+ provider: GHL_SYNC_PROVIDER,
+ })
+ summary.mismatches = reconcile.mismatches || []
+
+ await ctx.runMutation(api.crm.updateSyncCheckpoint, {
+ provider: GHL_SYNC_PROVIDER,
+ entityType: "reconcile",
+ entityId: "reconcile",
+ status: reconcile.mismatches?.length ? "mismatch" : "reconciled",
+ error: undefined,
+ metadata: JSON.stringify({
+ checked: reconcile.checked,
+ mismatches: reconcile.mismatches || [],
+ completedAt: Date.now(),
+ reason: args.reason || "cron",
+ }),
+ })
+ } catch (error) {
+ await failStage("reconcile", error)
+ return summary
+ }
+
+ return summary
+ },
+})
+
export const listAdminContacts = query({
args: {
search: v.optional(v.string()),
@@ -692,6 +1187,7 @@ export const listAdminContacts = query({
return {
items,
+ sync: await buildAdminSyncOverview(ctx),
pagination: {
page,
limit,
@@ -757,6 +1253,7 @@ export const getAdminContactDetail = query({
recordingReady: Boolean(conversation.livekitRoomName || conversation.voiceSessionId),
})),
timeline,
+ sync: await buildAdminSyncOverview(ctx),
}
},
})
@@ -870,6 +1367,7 @@ export const listAdminConversations = query({
return {
items,
+ sync: await buildAdminSyncOverview(ctx),
pagination: {
page,
limit,
@@ -977,6 +1475,14 @@ export const getAdminConversationDetail = query({
intent: lead.intent,
createdAt: lead.createdAt,
})),
+ sync: await buildAdminSyncOverview(ctx),
}
},
})
+
+export const getAdminSyncOverview = query({
+ args: {},
+ handler: async (ctx) => {
+ return await buildAdminSyncOverview(ctx)
+ },
+})
diff --git a/convex/crons.ts b/convex/crons.ts
index 4de9c6a2..24db37fe 100644
--- a/convex/crons.ts
+++ b/convex/crons.ts
@@ -10,4 +10,11 @@ crons.interval(
{ reason: "cron" }
)
+crons.interval(
+ "ghl-crm-mirror-sync",
+ { hours: 1 },
+ api.crm.runGhlMirror,
+ { reason: "cron" }
+)
+
export default crons
diff --git a/convex/ghlMirror.ts b/convex/ghlMirror.ts
new file mode 100644
index 00000000..4f0b1c60
--- /dev/null
+++ b/convex/ghlMirror.ts
@@ -0,0 +1,160 @@
+// @ts-nocheck
+
+type GhlMirrorConfig = {
+ token: string
+ locationId: string
+ baseUrl: string
+ version: string
+}
+
+function normalizeBaseUrl(value?: string) {
+ return String(value || "https://services.leadconnectorhq.com").replace(
+ /\/+$/,
+ ""
+ )
+}
+
+export function readGhlMirrorConfig() {
+ const token = String(
+ process.env.GHL_PRIVATE_INTEGRATION_TOKEN || process.env.GHL_API_TOKEN || ""
+ ).trim()
+ const locationId = String(process.env.GHL_LOCATION_ID || "").trim()
+ const baseUrl = normalizeBaseUrl(process.env.GHL_API_BASE_URL)
+ const version = String(process.env.GHL_API_VERSION || "2021-07-28").trim()
+
+ if (!token || !locationId) {
+ return null
+ }
+
+ return {
+ token,
+ locationId,
+ baseUrl,
+ version,
+ } satisfies GhlMirrorConfig
+}
+
+export async function fetchGhlMirrorJson(
+ config: GhlMirrorConfig,
+ pathname: string,
+ init?: RequestInit
+) {
+ const response = await fetch(`${config.baseUrl}${pathname}`, {
+ ...init,
+ headers: {
+ Authorization: `Bearer ${config.token}`,
+ Version: config.version,
+ Accept: "application/json",
+ "Content-Type": "application/json",
+ ...(init?.headers || {}),
+ },
+ cache: "no-store",
+ })
+
+ const text = await response.text()
+ let body: any = null
+ if (text) {
+ try {
+ body = JSON.parse(text)
+ } catch {
+ body = null
+ }
+ }
+
+ if (!response.ok) {
+ throw new Error(
+ `GHL request failed (${response.status}) for ${pathname}: ${body?.message || text || "Unknown error"}`
+ )
+ }
+
+ return body
+}
+
+export async function fetchGhlContactsPage(
+ config: GhlMirrorConfig,
+ args?: {
+ limit?: number
+ cursor?: string
+ }
+) {
+ const searchParams = new URLSearchParams({
+ locationId: config.locationId,
+ limit: String(Math.min(100, Math.max(1, args?.limit || 100))),
+ })
+
+ if (args?.cursor) {
+ searchParams.set("startAfterId", args.cursor)
+ }
+
+ const payload = await fetchGhlMirrorJson(
+ config,
+ `/contacts/?${searchParams.toString()}`
+ )
+
+ const contacts = Array.isArray(payload?.contacts)
+ ? payload.contacts
+ : Array.isArray(payload?.data?.contacts)
+ ? payload.data.contacts
+ : []
+
+ const nextCursor =
+ contacts.length > 0 ? String(contacts[contacts.length - 1]?.id || "") : ""
+
+ return {
+ items: contacts,
+ nextCursor: nextCursor || undefined,
+ }
+}
+
+export async function fetchGhlMessagesPage(
+ config: GhlMirrorConfig,
+ args?: {
+ limit?: number
+ cursor?: string
+ channel?: "Call" | "SMS"
+ }
+) {
+ const url = new URL(`${config.baseUrl}/conversations/messages/export`)
+ url.searchParams.set("locationId", config.locationId)
+ url.searchParams.set("limit", String(Math.min(100, Math.max(1, args?.limit || 100))))
+ url.searchParams.set("channel", args?.channel || "SMS")
+
+ if (args?.cursor) {
+ url.searchParams.set("cursor", args.cursor)
+ }
+
+ const payload = await fetchGhlMirrorJson(config, url.pathname + url.search)
+
+ return {
+ items: Array.isArray(payload?.messages) ? payload.messages : [],
+ nextCursor:
+ typeof payload?.nextCursor === "string" && payload.nextCursor
+ ? payload.nextCursor
+ : undefined,
+ }
+}
+
+export async function fetchGhlCallLogsPage(
+ config: GhlMirrorConfig,
+ args?: {
+ page?: number
+ pageSize?: number
+ }
+) {
+ const url = new URL(`${config.baseUrl}/voice-ai/dashboard/call-logs`)
+ url.searchParams.set("locationId", config.locationId)
+ url.searchParams.set("page", String(Math.max(1, args?.page || 1)))
+ url.searchParams.set(
+ "pageSize",
+ String(Math.min(50, Math.max(1, args?.pageSize || 50)))
+ )
+
+ const payload = await fetchGhlMirrorJson(config, url.pathname + url.search)
+
+ return {
+ items: Array.isArray(payload?.callLogs) ? payload.callLogs : [],
+ page: Number(payload?.page || args?.page || 1),
+ total: Number(payload?.total || 0),
+ pageSize: Number(payload?.pageSize || args?.pageSize || 50),
+ }
+}
diff --git a/convex/schema.ts b/convex/schema.ts
index 7feef130..c874695b 100644
--- a/convex/schema.ts
+++ b/convex/schema.ts
@@ -407,9 +407,11 @@ export default defineSchema({
checksum: v.optional(v.string()),
status: v.optional(
v.union(
+ v.literal("running"),
v.literal("pending"),
v.literal("synced"),
v.literal("failed"),
+ v.literal("missing_config"),
v.literal("reconciled"),
v.literal("mismatch")
)