feat: add GHL CRM sync status and runner

This commit is contained in:
DMleadgen 2026-04-16 11:40:19 -06:00
parent a1799715c6
commit 133ed6d6f3
Signed by: matt
GPG key ID: C2720CF8CD701894
12 changed files with 892 additions and 31 deletions

View file

@ -57,7 +57,7 @@ export default async function AdminContactDetailPage({ params }: PageProps) {
{detail.contact.firstName} {detail.contact.lastName}
</h1>
<p className="text-muted-foreground">
Unified CRM record across forms, calls, SMS, and sync imports.
Contact details and activity history.
</p>
</div>
@ -68,9 +68,7 @@ export default async function AdminContactDetailPage({ params }: PageProps) {
<ContactRound className="h-5 w-5" />
Contact Profile
</CardTitle>
<CardDescription>
Backend-owned identity and sync metadata.
</CardDescription>
<CardDescription>Basic details and connected records.</CardDescription>
</CardHeader>
<CardContent className="grid gap-4 md:grid-cols-2">
<div>
@ -127,7 +125,7 @@ export default async function AdminContactDetailPage({ params }: PageProps) {
Conversations
</CardTitle>
<CardDescription>
Every mirrored conversation associated to this contact.
Conversations linked to this contact.
</CardDescription>
</CardHeader>
<CardContent className="space-y-3">
@ -197,5 +195,5 @@ export default async function AdminContactDetailPage({ params }: PageProps) {
export const metadata = {
title: "Contact Detail | Admin",
description: "Review a Rocky CRM contact and full interaction timeline",
description: "Review a contact and full interaction timeline",
}

View file

@ -34,6 +34,22 @@ function formatTimestamp(value?: number) {
})
}
function getSyncMessage(sync: any) {
if (!sync.ghlConfigured) {
return "Connect GHL to load contacts and conversations."
}
if (sync.stages.contacts.status === "running") {
return "Contacts are syncing now."
}
if (sync.stages.contacts.error) {
return "Contacts could not be loaded from GHL yet."
}
if (!sync.latestSyncAt) {
return "No contacts yet."
}
return "Your contact list stays up to date from forms, calls, and GHL."
}
export default async function AdminContactsPage({ searchParams }: PageProps) {
const params = await searchParams
const page = Math.max(1, Number.parseInt(params.page || "1", 10) || 1)
@ -54,8 +70,7 @@ export default async function AdminContactsPage({ searchParams }: PageProps) {
Contacts
</h1>
<p className="mt-2 text-muted-foreground">
Backend-owned CRM contacts mirrored from forms, phone calls, and
GHL sync.
All customer contacts in one place.
</p>
</div>
<Link href="/admin">
@ -63,6 +78,28 @@ export default async function AdminContactsPage({ searchParams }: PageProps) {
</Link>
</div>
<Card>
<CardHeader>
<CardTitle>Sync Status</CardTitle>
<CardDescription>{getSyncMessage(data.sync)}</CardDescription>
</CardHeader>
<CardContent className="flex flex-wrap gap-3 text-sm text-muted-foreground">
<Badge variant="outline">{data.sync.overallStatus}</Badge>
<span>
Last sync: {formatTimestamp(data.sync.latestSyncAt || undefined)}
</span>
{!data.sync.ghlConfigured ? (
<span>GHL is not connected.</span>
) : null}
{!data.sync.syncTokenConfigured ? (
<span>Manual sync endpoint is not configured yet.</span>
) : null}
{data.sync.stages.contacts.error ? (
<span>{data.sync.stages.contacts.error}</span>
) : null}
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
@ -107,7 +144,9 @@ export default async function AdminContactsPage({ searchParams }: PageProps) {
colSpan={7}
className="py-8 text-center text-muted-foreground"
>
No contacts matched this filter.
{search
? "No contacts matched this search."
: getSyncMessage(data.sync)}
</td>
</tr>
) : (
@ -160,5 +199,5 @@ export default async function AdminContactsPage({ searchParams }: PageProps) {
export const metadata = {
title: "Contacts | Admin",
description: "View backend-owned Rocky contact records",
description: "View Rocky customer contacts",
}

View file

@ -69,7 +69,7 @@ export default async function AdminConversationDetailPage({
{detail.conversation.title || "Conversation Detail"}
</h1>
<p className="text-muted-foreground">
Unified thread for Rocky-owned conversation management.
Full conversation history in one place.
</p>
</div>
@ -80,9 +80,7 @@ export default async function AdminConversationDetailPage({
<MessageSquare className="h-5 w-5" />
Conversation Status
</CardTitle>
<CardDescription>
Channel, ownership, and sync metadata.
</CardDescription>
<CardDescription>Channel, contact, and latest activity.</CardDescription>
</CardHeader>
<CardContent className="grid gap-4 md:grid-cols-2">
<div>
@ -206,9 +204,7 @@ export default async function AdminConversationDetailPage({
<Card>
<CardHeader>
<CardTitle>Messages</CardTitle>
<CardDescription>
Full backend-owned thread history for this conversation.
</CardDescription>
<CardDescription>Message history for this conversation.</CardDescription>
</CardHeader>
<CardContent className="space-y-3">
{detail.messages.length === 0 ? (
@ -237,5 +233,5 @@ export default async function AdminConversationDetailPage({
export const metadata = {
title: "Conversation Detail | Admin",
description: "Review a Rocky conversation thread, recordings, and leads",
description: "Review a conversation, recordings, and leads",
}

View file

@ -36,6 +36,22 @@ function formatTimestamp(value?: number) {
})
}
function getSyncMessage(sync: any) {
if (!sync.ghlConfigured) {
return "Connect GHL to load contacts and conversations."
}
if (sync.stages.conversations.status === "running") {
return "Conversations are syncing now."
}
if (sync.stages.conversations.error) {
return "Conversations could not be loaded from GHL yet."
}
if (!sync.latestSyncAt) {
return "No conversations yet."
}
return "Calls and messages appear here as they are synced."
}
export default async function AdminConversationsPage({
searchParams,
}: PageProps) {
@ -60,8 +76,7 @@ export default async function AdminConversationsPage({
Conversations
</h1>
<p className="mt-2 text-muted-foreground">
Unified inbox across backend-owned call and SMS conversation
threads.
Customer conversations in one inbox.
</p>
</div>
<Link href="/admin">
@ -69,6 +84,28 @@ export default async function AdminConversationsPage({
</Link>
</div>
<Card>
<CardHeader>
<CardTitle>Sync Status</CardTitle>
<CardDescription>{getSyncMessage(data.sync)}</CardDescription>
</CardHeader>
<CardContent className="flex flex-wrap gap-3 text-sm text-muted-foreground">
<Badge variant="outline">{data.sync.overallStatus}</Badge>
<span>
Last sync: {formatTimestamp(data.sync.latestSyncAt || undefined)}
</span>
{!data.sync.ghlConfigured ? (
<span>GHL is not connected.</span>
) : null}
{!data.sync.syncTokenConfigured ? (
<span>Manual sync endpoint is not configured yet.</span>
) : null}
{data.sync.stages.conversations.error ? (
<span>{data.sync.stages.conversations.error}</span>
) : null}
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
@ -76,8 +113,7 @@ export default async function AdminConversationsPage({
Conversation Inbox
</CardTitle>
<CardDescription>
Search by contact, conversation preview, phone, email, or external
ID.
Search by contact, phone, email, or recent message.
</CardDescription>
</CardHeader>
<CardContent className="space-y-4">
@ -136,7 +172,9 @@ export default async function AdminConversationsPage({
colSpan={8}
className="py-8 text-center text-muted-foreground"
>
No conversations matched this filter.
{search || params.channel || params.status
? "No conversations matched this search."
: getSyncMessage(data.sync)}
</td>
</tr>
) : (
@ -204,5 +242,5 @@ export default async function AdminConversationsPage({
export const metadata = {
title: "Conversations | Admin",
description: "View backend-owned Rocky conversation threads",
description: "View Rocky customer conversations",
}

View file

@ -1,5 +1,7 @@
import Link from "next/link"
import { fetchQuery } from "convex/nextjs"
import { Button } from "@/components/ui/button"
import { api } from "@/convex/_generated/api"
import {
Card,
CardContent,
@ -58,10 +60,25 @@ async function getOrdersCount() {
return mockAnalytics.totalOrders
}
function formatTimestamp(value?: number | null) {
if (!value) {
return "—"
}
return new Date(value).toLocaleString("en-US", {
month: "short",
day: "numeric",
year: "numeric",
hour: "2-digit",
minute: "2-digit",
})
}
export default async function AdminDashboard() {
const [productsCount, ordersCount] = await Promise.all([
const [productsCount, ordersCount, sync] = await Promise.all([
getProductsCount(),
getOrdersCount(),
fetchQuery(api.crm.getAdminSyncOverview, {}),
])
const dashboardCards = [
@ -194,7 +211,7 @@ export default async function AdminDashboard() {
Admin Dashboard
</h1>
<p className="text-muted-foreground mt-2">
Overview of your store performance and management tools
Manage orders, contacts, conversations, and calls
</p>
</div>
<div className="flex gap-2">
@ -226,6 +243,28 @@ export default async function AdminDashboard() {
</div>
</div>
<Card>
<CardHeader>
<CardTitle>CRM Sync Status</CardTitle>
<CardDescription>
{!sync.ghlConfigured
? "Connect GHL to load contacts and conversations."
: "Customer data is mirrored here from GHL and your call flows."}
</CardDescription>
</CardHeader>
<CardContent className="flex flex-wrap gap-3 text-sm text-muted-foreground">
<Badge variant="outline">{sync.overallStatus}</Badge>
<span>Last sync: {formatTimestamp(sync.latestSyncAt)}</span>
{!sync.ghlConfigured ? <span>GHL is not connected.</span> : null}
{!sync.syncTokenConfigured ? (
<span>Manual sync endpoint is not configured yet.</span>
) : null}
{!sync.livekitConfigured ? (
<span>LiveKit recordings are not connected yet.</span>
) : null}
</CardContent>
</Card>
{/* Main Stats */}
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-6">
{dashboardCards.map((card, index) => {

View file

@ -0,0 +1,39 @@
import { NextResponse } from "next/server"
import { fetchAction } from "convex/nextjs"
import { api } from "@/convex/_generated/api"
import { requireAdminToken } from "@/lib/server/admin-auth"
export async function POST(request: Request) {
const authError = requireAdminToken(request)
if (authError) {
return authError
}
try {
const body = await request.json().catch(() => ({}))
const result = await fetchAction(api.crm.runGhlMirror, {
reason: "admin",
forceFullBackfill: Boolean(body.forceFullBackfill),
maxPagesPerRun:
typeof body.maxPagesPerRun === "number" ? body.maxPagesPerRun : undefined,
contactsLimit:
typeof body.contactsLimit === "number" ? body.contactsLimit : undefined,
messagesLimit:
typeof body.messagesLimit === "number" ? body.messagesLimit : undefined,
recordingsPageSize:
typeof body.recordingsPageSize === "number"
? body.recordingsPageSize
: undefined,
})
return NextResponse.json(result)
} catch (error) {
console.error("Failed to run admin GHL sync:", error)
return NextResponse.json(
{
error: error instanceof Error ? error.message : "Failed to run GHL sync",
},
{ status: 500 }
)
}
}

View file

@ -0,0 +1,39 @@
import { NextResponse } from "next/server"
import { fetchAction } from "convex/nextjs"
import { api } from "@/convex/_generated/api"
import { requireGhlSyncAuth } from "@/app/api/internal/ghl/shared"
export async function POST(request: Request) {
const authError = await requireGhlSyncAuth(request)
if (authError) {
return authError
}
try {
const body = await request.json().catch(() => ({}))
const result = await fetchAction(api.crm.runGhlMirror, {
reason: body.reason ? String(body.reason) : "internal",
forceFullBackfill: Boolean(body.forceFullBackfill),
maxPagesPerRun:
typeof body.maxPagesPerRun === "number" ? body.maxPagesPerRun : undefined,
contactsLimit:
typeof body.contactsLimit === "number" ? body.contactsLimit : undefined,
messagesLimit:
typeof body.messagesLimit === "number" ? body.messagesLimit : undefined,
recordingsPageSize:
typeof body.recordingsPageSize === "number"
? body.recordingsPageSize
: undefined,
})
return NextResponse.json(result)
} catch (error) {
console.error("Failed to run GHL sync:", error)
return NextResponse.json(
{
error: error instanceof Error ? error.message : "Failed to run GHL sync",
},
{ status: 500 }
)
}
}

View file

@ -27,7 +27,7 @@ export default async function SignInPage({ searchParams }: PageProps) {
params.error === "invalid"
? "That email or password was not accepted."
: params.error === "config"
? "Admin sign-in is not fully configured yet."
? "Admin access is not available right now."
: ""
return (
@ -58,7 +58,6 @@ export default async function SignInPage({ searchParams }: PageProps) {
type="email"
autoComplete="email"
className="flex h-11 w-full rounded-md border border-input bg-background px-3 py-2 text-sm"
placeholder="matt@rockymountainvending.com"
required
/>
</div>
@ -88,8 +87,7 @@ export default async function SignInPage({ searchParams }: PageProps) {
Admin sign-in is not configured
</h2>
<p className="mt-3 text-sm text-muted-foreground">
Enable admin UI, Convex, and staging credentials before using
this area.
Admin access is not available right now.
</p>
</>
)}

View file

@ -1,6 +1,7 @@
// @ts-nocheck
import { mutation, query } from "./_generated/server"
import { action, mutation, query } from "./_generated/server"
import { v } from "convex/values"
import { api } from "./_generated/api"
import {
ensureConversationParticipant,
normalizeEmail,
@ -11,6 +12,128 @@ import {
upsertExternalSyncState,
upsertMessageRecord,
} from "./crmModel"
import {
fetchGhlCallLogsPage,
fetchGhlContactsPage,
fetchGhlMessagesPage,
readGhlMirrorConfig,
} from "./ghlMirror"
const GHL_SYNC_PROVIDER = "ghl"
const GHL_SYNC_STAGES = [
"contacts",
"conversations",
"messages",
"recordings",
"reconcile",
] as const
function safeJsonParse(value?: string) {
if (!value) {
return null
}
try {
return JSON.parse(value)
} catch {
return null
}
}
async function getSyncStateRecord(ctx, entityType) {
return await ctx.db
.query("externalSyncState")
.withIndex("by_provider_entityType_entityId", (q) =>
q
.eq("provider", GHL_SYNC_PROVIDER)
.eq("entityType", entityType)
.eq("entityId", entityType)
)
.unique()
}
async function markSyncStage(ctx, args) {
return await upsertExternalSyncState(ctx, {
provider: GHL_SYNC_PROVIDER,
entityType: args.entityType,
entityId: args.entityType,
cursor: args.cursor,
checksum: args.checksum,
status: args.status,
error: args.error,
metadata: args.metadata,
lastAttemptAt: Date.now(),
lastSyncedAt:
args.status === "synced" || args.status === "reconciled"
? Date.now()
: undefined,
})
}
function formatSyncStageSummary(state) {
const metadata = safeJsonParse(state?.metadata)
return {
status: state?.status || "pending",
lastAttemptAt: state?.lastAttemptAt || null,
lastSyncedAt: state?.lastSyncedAt || null,
error: state?.error || null,
cursor: state?.cursor || null,
metadata,
}
}
async function buildAdminSyncOverview(ctx) {
const stages = {
contacts: formatSyncStageSummary(await getSyncStateRecord(ctx, "contacts")),
conversations: formatSyncStageSummary(
await getSyncStateRecord(ctx, "conversations")
),
messages: formatSyncStageSummary(await getSyncStateRecord(ctx, "messages")),
recordings: formatSyncStageSummary(await getSyncStateRecord(ctx, "recordings")),
reconcile: formatSyncStageSummary(await getSyncStateRecord(ctx, "reconcile")),
}
const ghlConfigured = Boolean(
String(
process.env.GHL_PRIVATE_INTEGRATION_TOKEN || process.env.GHL_API_TOKEN || ""
).trim() && String(process.env.GHL_LOCATION_ID || "").trim()
)
const syncTokenConfigured = Boolean(
String(process.env.GHL_SYNC_CRON_TOKEN || "").trim()
)
const livekitConfigured = Boolean(
String(process.env.LIVEKIT_URL || "").trim() &&
String(process.env.LIVEKIT_API_KEY || "").trim() &&
String(process.env.LIVEKIT_API_SECRET || "").trim()
)
const latestSyncAt = Math.max(
...Object.values(stages).map((stage: any) => stage.lastSyncedAt || 0),
0
)
const hasFailures = Object.values(stages).some(
(stage: any) =>
stage.status === "failed" || stage.status === "missing_config"
)
const hasRunning = Object.values(stages).some(
(stage: any) => stage.status === "running"
)
return {
ghlConfigured,
syncTokenConfigured,
livekitConfigured,
latestSyncAt: latestSyncAt || null,
overallStatus: hasRunning
? "running"
: hasFailures
? "attention"
: latestSyncAt
? "healthy"
: "idle",
stages,
}
}
function matchesSearch(values: Array<string | undefined>, search: string) {
if (!search) {
@ -548,9 +671,11 @@ export const updateSyncCheckpoint = mutation({
checksum: v.optional(v.string()),
status: v.optional(
v.union(
v.literal("running"),
v.literal("pending"),
v.literal("synced"),
v.literal("failed"),
v.literal("missing_config"),
v.literal("reconciled"),
v.literal("mismatch")
)
@ -629,6 +754,376 @@ export const reconcileExternalState = mutation({
},
})
export const runGhlMirror = action({
args: {
reason: v.optional(v.string()),
forceFullBackfill: v.optional(v.boolean()),
maxPagesPerRun: v.optional(v.number()),
contactsLimit: v.optional(v.number()),
messagesLimit: v.optional(v.number()),
recordingsPageSize: v.optional(v.number()),
},
handler: async (ctx, args) => {
const config = readGhlMirrorConfig()
const now = Date.now()
const maxPagesPerRun = Math.min(250, Math.max(1, args.maxPagesPerRun || 25))
const contactsLimit = Math.min(100, Math.max(1, args.contactsLimit || 100))
const messagesLimit = Math.min(100, Math.max(1, args.messagesLimit || 100))
const recordingsPageSize = Math.min(
50,
Math.max(1, args.recordingsPageSize || 50)
)
if (!config) {
for (const stage of GHL_SYNC_STAGES) {
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType: stage,
entityId: stage,
status: "missing_config",
error: "GHL credentials are not configured.",
metadata: JSON.stringify({
reason: args.reason || "cron",
checkedAt: now,
}),
})
}
return {
ok: false,
status: "missing_config",
message: "GHL credentials are not configured.",
}
}
const summary = {
ok: true,
status: "synced",
reason: args.reason || "cron",
contacts: 0,
conversations: 0,
messages: 0,
recordings: 0,
mismatches: [] as string[],
}
const updateRunning = async (entityType: string, metadata?: Record<string, any>) => {
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType,
entityId: entityType,
status: "running",
error: undefined,
metadata: JSON.stringify({
...(metadata || {}),
reason: args.reason || "cron",
startedAt: now,
}),
})
}
const failStage = async (entityType: string, error: unknown, metadata?: Record<string, any>) => {
const message =
error instanceof Error ? error.message : `Failed to sync ${entityType}`
summary.ok = false
summary.status = "failed"
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType,
entityId: entityType,
status: "failed",
error: message,
metadata: JSON.stringify({
...(metadata || {}),
reason: args.reason || "cron",
failedAt: Date.now(),
}),
})
}
try {
await updateRunning("contacts")
const contactsState = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
let contactsCursor =
!args.forceFullBackfill &&
contactsState.stages.contacts.metadata?.nextCursor
? String(contactsState.stages.contacts.metadata.nextCursor)
: undefined
let contactsPages = 0
while (contactsPages < maxPagesPerRun) {
const fetched = await fetchGhlContactsPage(config, {
limit: contactsLimit,
cursor: contactsCursor,
})
if (!fetched.items.length) {
break
}
for (const item of fetched.items) {
await ctx.runMutation(api.crm.importContact, {
provider: GHL_SYNC_PROVIDER,
entityId: String(item.id || ""),
payload: item,
})
summary.contacts += 1
}
contactsPages += 1
contactsCursor = fetched.nextCursor
if (!fetched.nextCursor) {
break
}
}
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType: "contacts",
entityId: "contacts",
cursor: contactsCursor,
status: "synced",
error: undefined,
metadata: JSON.stringify({
imported: summary.contacts,
pages: contactsPages,
nextCursor: contactsCursor,
completedAt: Date.now(),
reason: args.reason || "cron",
}),
})
} catch (error) {
await failStage("contacts", error)
return summary
}
try {
await updateRunning("conversations")
const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
const conversationCursors = {
SMS:
!args.forceFullBackfill &&
previous.stages.conversations.metadata?.cursors?.SMS
? String(previous.stages.conversations.metadata.cursors.SMS)
: undefined,
Call:
!args.forceFullBackfill &&
previous.stages.conversations.metadata?.cursors?.Call
? String(previous.stages.conversations.metadata.cursors.Call)
: undefined,
}
for (const channel of ["SMS", "Call"] as const) {
let pages = 0
while (pages < maxPagesPerRun) {
const fetched = await fetchGhlMessagesPage(config, {
limit: messagesLimit,
cursor: conversationCursors[channel],
channel,
})
if (!fetched.items.length) {
break
}
const grouped = new Map<string, any>()
for (const item of fetched.items) {
const conversationId = String(item.conversationId || item.id || "")
if (!conversationId || grouped.has(conversationId)) {
continue
}
grouped.set(conversationId, item)
}
for (const [entityId, item] of grouped.entries()) {
await ctx.runMutation(api.crm.importConversation, {
provider: GHL_SYNC_PROVIDER,
entityId,
payload: item,
})
summary.conversations += 1
}
pages += 1
conversationCursors[channel] = fetched.nextCursor
if (!fetched.nextCursor) {
break
}
}
}
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType: "conversations",
entityId: "conversations",
cursor: conversationCursors.Call || conversationCursors.SMS,
status: "synced",
error: undefined,
metadata: JSON.stringify({
imported: summary.conversations,
cursors: conversationCursors,
completedAt: Date.now(),
reason: args.reason || "cron",
}),
})
} catch (error) {
await failStage("conversations", error)
return summary
}
try {
await updateRunning("messages")
const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
const messageCursors = {
SMS:
!args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.SMS
? String(previous.stages.messages.metadata.cursors.SMS)
: undefined,
Call:
!args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.Call
? String(previous.stages.messages.metadata.cursors.Call)
: undefined,
}
for (const channel of ["SMS", "Call"] as const) {
let pages = 0
while (pages < maxPagesPerRun) {
const fetched = await fetchGhlMessagesPage(config, {
limit: messagesLimit,
cursor: messageCursors[channel],
channel,
})
if (!fetched.items.length) {
break
}
for (const item of fetched.items) {
await ctx.runMutation(api.crm.importMessage, {
provider: GHL_SYNC_PROVIDER,
entityId: String(item.id || ""),
payload: item,
})
summary.messages += 1
}
pages += 1
messageCursors[channel] = fetched.nextCursor
if (!fetched.nextCursor) {
break
}
}
}
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType: "messages",
entityId: "messages",
cursor: messageCursors.Call || messageCursors.SMS,
status: "synced",
error: undefined,
metadata: JSON.stringify({
imported: summary.messages,
cursors: messageCursors,
completedAt: Date.now(),
reason: args.reason || "cron",
}),
})
} catch (error) {
await failStage("messages", error)
return summary
}
try {
await updateRunning("recordings")
const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
let nextPage =
!args.forceFullBackfill && previous.stages.recordings.metadata?.nextPage
? Number(previous.stages.recordings.metadata.nextPage)
: 1
let pages = 0
while (pages < maxPagesPerRun) {
const fetched = await fetchGhlCallLogsPage(config, {
page: nextPage,
pageSize: recordingsPageSize,
})
if (!fetched.items.length) {
break
}
for (const item of fetched.items) {
await ctx.runMutation(api.crm.importRecording, {
provider: GHL_SYNC_PROVIDER,
entityId: String(item.id || item.messageId || ""),
payload: {
...item,
recordingId: item.messageId || item.id,
transcript: item.transcript,
recordingUrl: item.recordingUrl,
recordingStatus: item.transcript ? "completed" : "pending",
},
})
summary.recordings += 1
}
pages += 1
const exhausted = fetched.page * fetched.pageSize >= fetched.total
nextPage = exhausted ? 1 : fetched.page + 1
if (exhausted) {
break
}
}
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType: "recordings",
entityId: "recordings",
cursor: String(nextPage),
status: "synced",
error: undefined,
metadata: JSON.stringify({
imported: summary.recordings,
nextPage,
completedAt: Date.now(),
reason: args.reason || "cron",
}),
})
} catch (error) {
await failStage("recordings", error)
return summary
}
try {
await updateRunning("reconcile")
const reconcile = await ctx.runMutation(api.crm.reconcileExternalState, {
provider: GHL_SYNC_PROVIDER,
})
summary.mismatches = reconcile.mismatches || []
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType: "reconcile",
entityId: "reconcile",
status: reconcile.mismatches?.length ? "mismatch" : "reconciled",
error: undefined,
metadata: JSON.stringify({
checked: reconcile.checked,
mismatches: reconcile.mismatches || [],
completedAt: Date.now(),
reason: args.reason || "cron",
}),
})
} catch (error) {
await failStage("reconcile", error)
return summary
}
return summary
},
})
export const listAdminContacts = query({
args: {
search: v.optional(v.string()),
@ -692,6 +1187,7 @@ export const listAdminContacts = query({
return {
items,
sync: await buildAdminSyncOverview(ctx),
pagination: {
page,
limit,
@ -757,6 +1253,7 @@ export const getAdminContactDetail = query({
recordingReady: Boolean(conversation.livekitRoomName || conversation.voiceSessionId),
})),
timeline,
sync: await buildAdminSyncOverview(ctx),
}
},
})
@ -870,6 +1367,7 @@ export const listAdminConversations = query({
return {
items,
sync: await buildAdminSyncOverview(ctx),
pagination: {
page,
limit,
@ -977,6 +1475,14 @@ export const getAdminConversationDetail = query({
intent: lead.intent,
createdAt: lead.createdAt,
})),
sync: await buildAdminSyncOverview(ctx),
}
},
})
export const getAdminSyncOverview = query({
args: {},
handler: async (ctx) => {
return await buildAdminSyncOverview(ctx)
},
})

View file

@ -10,4 +10,11 @@ crons.interval(
{ reason: "cron" }
)
crons.interval(
"ghl-crm-mirror-sync",
{ hours: 1 },
api.crm.runGhlMirror,
{ reason: "cron" }
)
export default crons

160
convex/ghlMirror.ts Normal file
View file

@ -0,0 +1,160 @@
// @ts-nocheck
type GhlMirrorConfig = {
token: string
locationId: string
baseUrl: string
version: string
}
function normalizeBaseUrl(value?: string) {
return String(value || "https://services.leadconnectorhq.com").replace(
/\/+$/,
""
)
}
export function readGhlMirrorConfig() {
const token = String(
process.env.GHL_PRIVATE_INTEGRATION_TOKEN || process.env.GHL_API_TOKEN || ""
).trim()
const locationId = String(process.env.GHL_LOCATION_ID || "").trim()
const baseUrl = normalizeBaseUrl(process.env.GHL_API_BASE_URL)
const version = String(process.env.GHL_API_VERSION || "2021-07-28").trim()
if (!token || !locationId) {
return null
}
return {
token,
locationId,
baseUrl,
version,
} satisfies GhlMirrorConfig
}
export async function fetchGhlMirrorJson(
config: GhlMirrorConfig,
pathname: string,
init?: RequestInit
) {
const response = await fetch(`${config.baseUrl}${pathname}`, {
...init,
headers: {
Authorization: `Bearer ${config.token}`,
Version: config.version,
Accept: "application/json",
"Content-Type": "application/json",
...(init?.headers || {}),
},
cache: "no-store",
})
const text = await response.text()
let body: any = null
if (text) {
try {
body = JSON.parse(text)
} catch {
body = null
}
}
if (!response.ok) {
throw new Error(
`GHL request failed (${response.status}) for ${pathname}: ${body?.message || text || "Unknown error"}`
)
}
return body
}
export async function fetchGhlContactsPage(
config: GhlMirrorConfig,
args?: {
limit?: number
cursor?: string
}
) {
const searchParams = new URLSearchParams({
locationId: config.locationId,
limit: String(Math.min(100, Math.max(1, args?.limit || 100))),
})
if (args?.cursor) {
searchParams.set("startAfterId", args.cursor)
}
const payload = await fetchGhlMirrorJson(
config,
`/contacts/?${searchParams.toString()}`
)
const contacts = Array.isArray(payload?.contacts)
? payload.contacts
: Array.isArray(payload?.data?.contacts)
? payload.data.contacts
: []
const nextCursor =
contacts.length > 0 ? String(contacts[contacts.length - 1]?.id || "") : ""
return {
items: contacts,
nextCursor: nextCursor || undefined,
}
}
export async function fetchGhlMessagesPage(
config: GhlMirrorConfig,
args?: {
limit?: number
cursor?: string
channel?: "Call" | "SMS"
}
) {
const url = new URL(`${config.baseUrl}/conversations/messages/export`)
url.searchParams.set("locationId", config.locationId)
url.searchParams.set("limit", String(Math.min(100, Math.max(1, args?.limit || 100))))
url.searchParams.set("channel", args?.channel || "SMS")
if (args?.cursor) {
url.searchParams.set("cursor", args.cursor)
}
const payload = await fetchGhlMirrorJson(config, url.pathname + url.search)
return {
items: Array.isArray(payload?.messages) ? payload.messages : [],
nextCursor:
typeof payload?.nextCursor === "string" && payload.nextCursor
? payload.nextCursor
: undefined,
}
}
export async function fetchGhlCallLogsPage(
config: GhlMirrorConfig,
args?: {
page?: number
pageSize?: number
}
) {
const url = new URL(`${config.baseUrl}/voice-ai/dashboard/call-logs`)
url.searchParams.set("locationId", config.locationId)
url.searchParams.set("page", String(Math.max(1, args?.page || 1)))
url.searchParams.set(
"pageSize",
String(Math.min(50, Math.max(1, args?.pageSize || 50)))
)
const payload = await fetchGhlMirrorJson(config, url.pathname + url.search)
return {
items: Array.isArray(payload?.callLogs) ? payload.callLogs : [],
page: Number(payload?.page || args?.page || 1),
total: Number(payload?.total || 0),
pageSize: Number(payload?.pageSize || args?.pageSize || 50),
}
}

View file

@ -407,9 +407,11 @@ export default defineSchema({
checksum: v.optional(v.string()),
status: v.optional(
v.union(
v.literal("running"),
v.literal("pending"),
v.literal("synced"),
v.literal("failed"),
v.literal("missing_config"),
v.literal("reconciled"),
v.literal("mismatch")
)