Rocky_Mountain_Vending/convex/crm.ts

1697 lines
48 KiB
TypeScript

// @ts-nocheck
import { action, mutation, query } from "./_generated/server"
import { v } from "convex/values"
import { api } from "./_generated/api"
import {
ensureConversationParticipant,
normalizeEmail,
normalizePhone,
sanitizeContactNameParts,
upsertCallArtifactRecord,
upsertContactRecord,
upsertConversationRecord,
upsertExternalSyncState,
upsertMessageRecord,
} from "./crmModel"
import {
fetchGhlCallLogsPage,
fetchGhlContactsPage,
fetchGhlMessagesPage,
readGhlMirrorConfig,
} from "./ghlMirror"
/** Provider key stored on every GoHighLevel sync-state row written by this module. */
const GHL_SYNC_PROVIDER = "ghl"

/** Names of the mirror stages; each one is tracked by its own checkpoint row. */
const GHL_SYNC_STAGES = ["contacts", "conversations", "messages", "recordings", "reconcile"] as const
/**
 * Parses a JSON string without ever throwing.
 *
 * @param value - Raw JSON text; may be missing or empty.
 * @returns The parsed value, or `null` when the input is falsy or not valid JSON.
 */
function safeJsonParse(value?: string) {
  if (value) {
    try {
      return JSON.parse(value)
    } catch {
      // Malformed JSON is treated the same as "no value".
      return null
    }
  }
  return null
}
/**
 * Loads the stage-level checkpoint row for one GHL sync stage.
 *
 * Stage rows use the stage name for both `entityType` and `entityId`.
 *
 * @param ctx - Convex context exposing `db`.
 * @param entityType - Stage name, e.g. "contacts".
 * @returns Whatever `.unique()` yields for the matching row.
 */
async function getSyncStateRecord(ctx, entityType) {
  const stageRows = ctx.db
    .query("externalSyncState")
    .withIndex("by_provider_entityType_entityId", (range) =>
      range
        .eq("provider", GHL_SYNC_PROVIDER)
        .eq("entityType", entityType)
        .eq("entityId", entityType)
    )
  return await stageRows.unique()
}
/**
 * Writes the checkpoint row for a GHL sync stage via `upsertExternalSyncState`.
 *
 * `lastAttemptAt` is always stamped; `lastSyncedAt` is stamped only when the
 * status is "synced" or "reconciled" and is otherwise passed as `undefined`.
 *
 * @param ctx - Convex mutation context.
 * @param args - `entityType` plus optional `cursor`, `checksum`, `status`, `error`, `metadata`.
 * @returns The result of `upsertExternalSyncState`.
 */
async function markSyncStage(ctx, args) {
  const { entityType, cursor, checksum, status, error, metadata } = args
  const completed = status === "synced" || status === "reconciled"
  return await upsertExternalSyncState(ctx, {
    provider: GHL_SYNC_PROVIDER,
    entityType,
    entityId: entityType,
    cursor,
    checksum,
    status,
    error,
    metadata,
    lastAttemptAt: Date.now(),
    lastSyncedAt: completed ? Date.now() : undefined,
  })
}
/**
 * Converts a sync-state row (possibly missing) into the summary shape used by
 * the admin overview.
 *
 * @param state - Sync-state row, or null/undefined when the stage has never run.
 * @returns Summary with "pending"/null defaults and the row's metadata JSON parsed.
 */
function formatSyncStageSummary(state) {
  const metadata = safeJsonParse(state?.metadata)
  const row = state ?? {}
  return {
    status: row.status || "pending",
    lastAttemptAt: row.lastAttemptAt || null,
    lastSyncedAt: row.lastSyncedAt || null,
    error: row.error || null,
    cursor: row.cursor || null,
    metadata,
  }
}
/**
 * Assembles the sync-health overview attached to admin responses.
 *
 * Reads the checkpoint row of every stage, reports which integrations have
 * their environment variables set, and derives a single overall status:
 * "running" if any stage is running, else "attention" if any stage is failed
 * or missing configuration, else "healthy" once any stage has synced, else "idle".
 *
 * @param ctx - Convex context exposing `db`.
 */
async function buildAdminSyncOverview(ctx) {
  // Stage rows are read one at a time, in the order the stages are declared.
  const stages: Record<string, any> = {}
  for (const stageName of GHL_SYNC_STAGES) {
    stages[stageName] = formatSyncStageSummary(
      await getSyncStateRecord(ctx, stageName)
    )
  }

  const hasValue = (raw?: string) => Boolean(String(raw || "").trim())
  const env = process.env

  const ghlConfigured =
    hasValue(env.GHL_PRIVATE_INTEGRATION_TOKEN || env.GHL_API_TOKEN) &&
    hasValue(env.GHL_LOCATION_ID)
  const syncTokenConfigured = hasValue(env.GHL_SYNC_CRON_TOKEN)
  const livekitConfigured =
    hasValue(env.LIVEKIT_URL) &&
    hasValue(env.LIVEKIT_API_KEY) &&
    hasValue(env.LIVEKIT_API_SECRET)

  const summaries = Object.values(stages)
  let latestSyncAt = 0
  let hasFailures = false
  let hasRunning = false
  for (const stage of summaries) {
    latestSyncAt = Math.max(latestSyncAt, stage.lastSyncedAt || 0)
    if (stage.status === "failed" || stage.status === "missing_config") {
      hasFailures = true
    }
    if (stage.status === "running") {
      hasRunning = true
    }
  }

  let overallStatus = "idle"
  if (hasRunning) {
    overallStatus = "running"
  } else if (hasFailures) {
    overallStatus = "attention"
  } else if (latestSyncAt) {
    overallStatus = "healthy"
  }

  return {
    ghlConfigured,
    syncTokenConfigured,
    livekitConfigured,
    latestSyncAt: latestSyncAt || null,
    overallStatus,
    stages,
  }
}
/**
 * Substring search over a set of optional text fields.
 *
 * Fields are lower-cased and joined with newlines before matching; the search
 * term itself is used as given, so callers lower-case it beforehand.
 *
 * @param values - Candidate fields; missing entries count as empty strings.
 * @param search - Term to look for; an empty term matches everything.
 */
function matchesSearch(values: Array<string | undefined>, search: string) {
  return (
    !search ||
    values
      .map((field) => String(field || "").toLowerCase())
      .join("\n")
      .includes(search)
  )
}
/**
 * Maps a free-form external conversation status onto "open" | "closed" | "archived".
 *
 * Matching is case-insensitive and ignores surrounding whitespace; anything
 * unrecognised (including an empty value) is treated as "open".
 */
function normalizeConversationStatus(value?: string) {
  const key = String(value || "").trim().toLowerCase()
  switch (key) {
    case "closed":
    case "completed":
    case "complete":
    case "ended":
    case "resolved":
    case "done":
      return "closed"
    case "archived":
    case "spam":
    case "blocked":
    case "blacklisted":
    case "do_not_contact":
    case "dnd":
      return "archived"
    default:
      return "open"
  }
}
/**
 * Normalises a conversation direction to "inbound", "outbound" or "mixed".
 *
 * Comparison is case-insensitive after trimming; every other value is "mixed".
 */
function normalizeConversationDirection(value?: string) {
  const direction = String(value || "").trim().toLowerCase()
  return direction === "inbound" || direction === "outbound"
    ? direction
    : "mixed"
}
/**
 * Maps an external recording status onto the local set:
 * "pending" | "starting" | "recording" | "completed" | "failed".
 *
 * Matching is case-insensitive after trimming; empty or unknown values are "pending".
 */
function normalizeRecordingStatus(value?: string) {
  const key = String(value || "").trim().toLowerCase()
  switch (key) {
    case "completed":
    case "complete":
    case "ready":
    case "available":
      return "completed"
    case "starting":
    case "queued":
      return "starting"
    case "recording":
    case "in_progress":
    case "processing":
      return "recording"
    case "failed":
    case "error":
      return "failed"
    default:
      return "pending"
  }
}
/**
 * Derives the two display lines for a contact.
 *
 * - `displayName`: trimmed full name, else phone, else email, else "Unknown Contact".
 * - `secondaryLine`: with a name present, phone then email; without a name,
 *   email then phone; `undefined` when nothing is available.
 */
function buildContactDisplay(contact?: {
  firstName?: string
  lastName?: string
  email?: string
  phone?: string
}) {
  const nameParts = [
    String(contact?.firstName || "").trim(),
    String(contact?.lastName || "").trim(),
  ]
  const fullName = nameParts.filter(Boolean).join(" ").trim()
  const phone = contact?.phone
  const email = contact?.email

  const displayName = fullName || phone || email || "Unknown Contact"

  let secondaryLine: string | undefined
  if (fullName) {
    secondaryLine = phone || email || undefined
  } else {
    secondaryLine = email || phone || undefined
  }

  return { displayName, secondaryLine }
}
/**
 * Picks the title to show for a conversation.
 *
 * A stored title wins unless it is blank or the placeholder "Unknown Contact"
 * (case-insensitive); otherwise the contact's display name is used.
 */
function buildConversationDisplayTitle(
  conversation: { title?: string },
  contact?: {
    firstName?: string
    lastName?: string
    email?: string
    phone?: string
  } | null
) {
  const storedTitle = String(conversation.title || "").trim()
  const isPlaceholder = storedTitle.toLowerCase() === "unknown contact"
  return storedTitle && !isPlaceholder
    ? storedTitle
    : buildContactDisplay(contact || undefined).displayName
}
/**
 * Builds a newest-first activity timeline for one contact from its
 * conversations, messages, call artifacts and lead submissions.
 *
 * Entries whose timestamp is missing or not a finite number sort to the end
 * instead of poisoning the comparator with NaN.
 *
 * @param ctx - Convex context exposing `db`.
 * @param contactId - Id of the contact whose activity is collected.
 * @returns Array of `{ id, type, timestamp, title, body, status }` entries.
 */
async function buildContactTimeline(ctx, contactId) {
  const byContact = (table) =>
    ctx.db
      .query(table)
      .withIndex("by_contactId", (q) => q.eq("contactId", contactId))
      .collect()

  // The four reads are independent, so they are issued together.
  // NOTE(review): leadSubmissions is scanned in full and filtered in memory
  // because no contact index on that table is visible from this file.
  const [conversations, messages, callArtifacts, allLeads] = await Promise.all([
    byContact("conversations"),
    byContact("messages"),
    byContact("callArtifacts"),
    ctx.db.query("leadSubmissions").collect(),
  ])
  const leads = allLeads.filter((lead) => lead.contactId === contactId)

  const timeline = [
    ...conversations.map((item) => ({
      id: item._id,
      type: "conversation",
      timestamp: item.lastMessageAt || item.startedAt || item.updatedAt,
      title: item.title || item.channel,
      body: item.lastMessagePreview || item.summaryText || "",
      status: item.status,
    })),
    ...messages.map((item) => ({
      id: item._id,
      type: "message",
      timestamp: item.sentAt,
      title: `${item.channel.toUpperCase()} ${item.direction || "system"}`,
      body: item.body,
      status: item.status,
    })),
    ...callArtifacts.map((item) => ({
      id: item._id,
      type: "recording",
      timestamp: item.endedAt || item.startedAt || item.updatedAt,
      title: item.recordingStatus || "recording",
      body: item.recordingUrl || item.transcriptionText || "",
      status: item.recordingStatus,
    })),
    ...leads.map((item) => ({
      id: item._id,
      type: "lead",
      timestamp: item.createdAt,
      title: item.type,
      body: item.message || item.intent || "",
      status: item.status,
    })),
  ]

  // Missing/NaN timestamps are ranked as 0 so the comparator is always consistent.
  const sortKey = (entry) =>
    typeof entry.timestamp === "number" && Number.isFinite(entry.timestamp)
      ? entry.timestamp
      : 0
  timeline.sort((a, b) => sortKey(b) - sortKey(a))
  return timeline
}
/**
 * Mutation that forwards validated contact fields, unchanged, to
 * `upsertContactRecord` and returns its result.
 */
export const upsertContact = mutation({
  args: {
    firstName: v.string(),
    lastName: v.string(),
    email: v.optional(v.string()),
    phone: v.optional(v.string()),
    company: v.optional(v.string()),
    tags: v.optional(v.array(v.string())),
    status: v.optional(
      v.union(
        v.literal("active"),
        v.literal("lead"),
        v.literal("customer"),
        v.literal("inactive")
      )
    ),
    source: v.optional(v.string()),
    notes: v.optional(v.string()),
    ghlContactId: v.optional(v.string()),
    livekitIdentity: v.optional(v.string()),
    lastActivityAt: v.optional(v.number()),
  },
  handler: async (ctx, args) => upsertContactRecord(ctx, args),
})
/**
 * Mutation that forwards validated conversation fields, unchanged, to
 * `upsertConversationRecord` and returns its result.
 */
export const upsertConversation = mutation({
  args: {
    contactId: v.optional(v.id("contacts")),
    title: v.optional(v.string()),
    channel: v.union(
      v.literal("call"),
      v.literal("sms"),
      v.literal("chat"),
      v.literal("unknown")
    ),
    source: v.optional(v.string()),
    status: v.optional(
      v.union(v.literal("open"), v.literal("closed"), v.literal("archived"))
    ),
    direction: v.optional(
      v.union(v.literal("inbound"), v.literal("outbound"), v.literal("mixed"))
    ),
    startedAt: v.optional(v.number()),
    endedAt: v.optional(v.number()),
    lastMessageAt: v.optional(v.number()),
    lastMessagePreview: v.optional(v.string()),
    unreadCount: v.optional(v.number()),
    summaryText: v.optional(v.string()),
    ghlConversationId: v.optional(v.string()),
    livekitRoomName: v.optional(v.string()),
    voiceSessionId: v.optional(v.id("voiceSessions")),
  },
  handler: async (ctx, args) => upsertConversationRecord(ctx, args),
})
/**
 * Mutation that forwards validated message fields, unchanged, to
 * `upsertMessageRecord` and returns its result.
 */
export const upsertMessage = mutation({
  args: {
    conversationId: v.id("conversations"),
    contactId: v.optional(v.id("contacts")),
    direction: v.optional(
      v.union(v.literal("inbound"), v.literal("outbound"), v.literal("system"))
    ),
    channel: v.union(
      v.literal("call"),
      v.literal("sms"),
      v.literal("chat"),
      v.literal("unknown")
    ),
    source: v.optional(v.string()),
    messageType: v.optional(v.string()),
    body: v.string(),
    status: v.optional(v.string()),
    sentAt: v.optional(v.number()),
    ghlMessageId: v.optional(v.string()),
    voiceTranscriptTurnId: v.optional(v.id("voiceTranscriptTurns")),
    voiceSessionId: v.optional(v.id("voiceSessions")),
    livekitRoomName: v.optional(v.string()),
    metadata: v.optional(v.string()),
  },
  handler: async (ctx, args) => upsertMessageRecord(ctx, args),
})
/**
 * Mutation that forwards validated call-recording fields, unchanged, to
 * `upsertCallArtifactRecord` and returns its result.
 */
export const upsertCallArtifact = mutation({
  args: {
    conversationId: v.id("conversations"),
    contactId: v.optional(v.id("contacts")),
    source: v.optional(v.string()),
    recordingId: v.optional(v.string()),
    recordingUrl: v.optional(v.string()),
    recordingStatus: v.optional(
      v.union(
        v.literal("pending"),
        v.literal("starting"),
        v.literal("recording"),
        v.literal("completed"),
        v.literal("failed")
      )
    ),
    transcriptionText: v.optional(v.string()),
    durationMs: v.optional(v.number()),
    startedAt: v.optional(v.number()),
    endedAt: v.optional(v.number()),
    ghlMessageId: v.optional(v.string()),
    voiceSessionId: v.optional(v.id("voiceSessions")),
    livekitRoomName: v.optional(v.string()),
    metadata: v.optional(v.string()),
  },
  handler: async (ctx, args) => upsertCallArtifactRecord(ctx, args),
})
/**
 * Mutation that applies a partial edit to an existing contact.
 *
 * Omitted fields keep their stored value; the normalised email/phone columns
 * are recomputed from the resulting values and `updatedAt` is refreshed.
 *
 * @throws Error("Contact not found") when the id does not resolve.
 * @returns The contact document as re-read after the patch.
 */
export const updateContact = mutation({
  args: {
    contactId: v.id("contacts"),
    firstName: v.optional(v.string()),
    lastName: v.optional(v.string()),
    email: v.optional(v.string()),
    phone: v.optional(v.string()),
    company: v.optional(v.string()),
    status: v.optional(
      v.union(
        v.literal("active"),
        v.literal("lead"),
        v.literal("customer"),
        v.literal("inactive")
      )
    ),
    tags: v.optional(v.array(v.string())),
    notes: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const { contactId, ...changes } = args
    const current = await ctx.db.get(contactId)
    if (!current) {
      throw new Error("Contact not found")
    }

    const email = changes.email ?? current.email
    const phone = changes.phone ?? current.phone

    await ctx.db.patch(contactId, {
      firstName: changes.firstName ?? current.firstName,
      lastName: changes.lastName ?? current.lastName,
      email,
      normalizedEmail: normalizeEmail(email),
      phone,
      normalizedPhone: normalizePhone(phone),
      company: changes.company ?? current.company,
      status: changes.status ?? current.status,
      tags: changes.tags ?? current.tags,
      notes: changes.notes ?? current.notes,
      updatedAt: Date.now(),
    })
    return await ctx.db.get(contactId)
  },
})
/**
 * Mutation that sets the contact and conversation references on a lead
 * submission and refreshes its `updatedAt`.
 *
 * Both reference fields are written on every call, including when the
 * corresponding argument is omitted (it is then passed as `undefined`).
 *
 * @returns The lead document as re-read after the patch.
 */
export const linkLeadSubmission = mutation({
  args: {
    leadId: v.id("leadSubmissions"),
    contactId: v.optional(v.id("contacts")),
    conversationId: v.optional(v.id("conversations")),
  },
  handler: async (ctx, { leadId, contactId, conversationId }) => {
    const links = { contactId, conversationId, updatedAt: Date.now() }
    await ctx.db.patch(leadId, links)
    return await ctx.db.get(leadId)
  },
})
/**
 * Mutation that mirrors one external contact payload into the local CRM and
 * records a per-entity "synced" row in `externalSyncState`.
 *
 * `lastActivityAt` is taken from `payload.dateUpdated` when that is a string
 * that parses to a valid date; otherwise the current time is used, so an
 * unparseable date can no longer store NaN.
 *
 * @param provider - External system name; also used to build `source`.
 * @param entityId - External id of the contact (fallback for `payload.id`).
 * @param payload - Raw external contact object; shape is not validated.
 * @returns The value returned by `upsertContactRecord`.
 */
export const importContact = mutation({
  args: {
    provider: v.string(),
    entityId: v.string(),
    payload: v.any(),
  },
  handler: async (ctx, args) => {
    const payload = args.payload || {}
    const now = Date.now()

    // Accept only strings that parse to a real date; everything else is "unknown".
    const parseTimestamp = (value: unknown) => {
      if (typeof value !== "string") {
        return undefined
      }
      const ms = new Date(value).getTime()
      return Number.isFinite(ms) ? ms : undefined
    }

    const name = sanitizeContactNameParts({
      firstName: payload.firstName || payload.first_name,
      lastName: payload.lastName || payload.last_name,
      fullName: payload.name,
    })
    const contact = await upsertContactRecord(ctx, {
      firstName: name.firstName,
      lastName: name.lastName,
      fullName: payload.name,
      email: payload.email,
      phone: payload.phone,
      company: payload.company || payload.companyName,
      tags: Array.isArray(payload.tags) ? payload.tags : [],
      status: "lead",
      source: `${args.provider}:mirror`,
      ghlContactId: payload.id || args.entityId,
      lastActivityAt: parseTimestamp(payload.dateUpdated) ?? now,
    })
    await upsertExternalSyncState(ctx, {
      provider: args.provider,
      entityType: "contact",
      entityId: args.entityId,
      status: "synced",
      lastAttemptAt: now,
      lastSyncedAt: now,
      metadata: JSON.stringify({
        contactId: contact?._id,
      }),
    })
    return contact
  },
})
/**
 * Mutation that mirrors one external conversation into the local CRM:
 * upserts the contact, the conversation, the contact participant, and a
 * per-entity "synced" row in `externalSyncState`.
 *
 * The conversation is keyed by `payload.conversationId`, then `entityId`,
 * then `payload.id`. The sync action in this file passes a *message* item as
 * the payload, whose `id` is the message id; preferring `payload.id` stored
 * the wrong key and disagreed with both `importMessage` and the reconcile
 * lookup, which search by the conversation id.
 *
 * Date strings that fail to parse are treated as absent rather than stored as NaN.
 *
 * @param provider - External system name; also used to build `source`.
 * @param entityId - External conversation id.
 * @param payload - Raw external object; shape is not validated.
 * @returns The value returned by `upsertConversationRecord`.
 */
export const importConversation = mutation({
  args: {
    provider: v.string(),
    entityId: v.string(),
    payload: v.any(),
  },
  handler: async (ctx, args) => {
    const payload = args.payload || {}
    const now = Date.now()
    const source = `${args.provider}:mirror`

    // Accept only strings that parse to a real date; everything else is "unknown".
    const parseTimestamp = (value: unknown) => {
      if (typeof value !== "string") {
        return undefined
      }
      const ms = new Date(value).getTime()
      return Number.isFinite(ms) ? ms : undefined
    }

    const fullName = payload.contactName || payload.fullName || payload.name
    const name = sanitizeContactNameParts({
      firstName: payload.firstName,
      lastName: payload.lastName,
      fullName,
    })
    const contact = await upsertContactRecord(ctx, {
      firstName: name.firstName,
      lastName: name.lastName,
      fullName,
      email: payload.email,
      phone: payload.phone || payload.contactPhone,
      source,
      ghlContactId: payload.contactId,
      status: "lead",
      lastActivityAt: parseTimestamp(payload.dateUpdated) ?? now,
    })
    const conversation = await upsertConversationRecord(ctx, {
      contactId: contact?._id,
      title: payload.fullName || payload.title || payload.contactName,
      channel:
        payload.channel === "SMS" || payload.type === "sms" ? "sms" : "call",
      source,
      status: normalizeConversationStatus(payload.status),
      direction: normalizeConversationDirection(payload.direction),
      startedAt: parseTimestamp(payload.dateAdded) ?? now,
      endedAt: parseTimestamp(payload.dateEnded),
      lastMessageAt: parseTimestamp(payload.lastMessageAt),
      lastMessagePreview: payload.lastMessageBody || payload.snippet,
      summaryText: payload.summary,
      // Conversation id first: payload.id may be a message id (see doc comment).
      ghlConversationId:
        payload.conversationId || args.entityId || payload.id,
    })
    await ensureConversationParticipant(ctx, {
      conversationId: conversation._id,
      contactId: contact?._id,
      role: "contact",
      displayName:
        [contact?.firstName, contact?.lastName].filter(Boolean).join(" ") ||
        payload.contactName,
      phone: contact?.phone,
      email: contact?.email,
      externalContactId: payload.contactId,
    })
    await upsertExternalSyncState(ctx, {
      provider: args.provider,
      entityType: "conversation",
      entityId: args.entityId,
      status: "synced",
      lastAttemptAt: now,
      lastSyncedAt: now,
      metadata: JSON.stringify({
        contactId: contact?._id,
        conversationId: conversation?._id,
      }),
    })
    return conversation
  },
})
/**
 * Mutation that mirrors one external message into the local CRM: upserts the
 * contact, the owning conversation, the contact participant, the message
 * itself, and a per-entity "synced" row in `externalSyncState`.
 *
 * Changes from the earlier version:
 * - an unparseable `dateAdded` falls back to the current time instead of NaN;
 * - message direction is matched case-insensitively (as the conversation
 *   direction already is), so "Inbound" is no longer recorded as "system";
 * - channel and timestamp are derived once and reused.
 *
 * @param provider - External system name; also used to build `source`.
 * @param entityId - External message id (fallback for `payload.id`).
 * @param payload - Raw external message object; shape is not validated.
 * @returns The value returned by `upsertMessageRecord`.
 */
export const importMessage = mutation({
  args: {
    provider: v.string(),
    entityId: v.string(),
    payload: v.any(),
  },
  handler: async (ctx, args) => {
    const payload = args.payload || {}
    const now = Date.now()
    const source = `${args.provider}:mirror`

    // Accept only strings that parse to a real date; everything else is "unknown".
    const parseTimestamp = (value: unknown) => {
      if (typeof value !== "string") {
        return undefined
      }
      const ms = new Date(value).getTime()
      return Number.isFinite(ms) ? ms : undefined
    }

    const sentAt = parseTimestamp(payload.dateAdded) ?? now
    const channel =
      payload.channel === "SMS" || payload.messageType === "SMS"
        ? "sms"
        : payload.channel === "Call"
          ? "call"
          : "unknown"
    const rawDirection = String(payload.direction || "")
      .trim()
      .toLowerCase()
    const messageDirection =
      rawDirection === "inbound" || rawDirection === "outbound"
        ? rawDirection
        : "system"

    const fullName = payload.contactName || payload.fullName || payload.name
    const name = sanitizeContactNameParts({
      firstName: payload.firstName,
      lastName: payload.lastName,
      fullName,
    })
    const contact = await upsertContactRecord(ctx, {
      firstName: name.firstName,
      lastName: name.lastName,
      fullName,
      email: payload.email,
      phone: payload.phone,
      source,
      ghlContactId: payload.contactId,
      status: "lead",
    })
    const conversation = await upsertConversationRecord(ctx, {
      contactId: contact?._id,
      title: payload.contactName,
      channel,
      source,
      status: normalizeConversationStatus(payload.conversationStatus),
      direction: normalizeConversationDirection(payload.direction),
      startedAt: sentAt,
      lastMessageAt: sentAt,
      lastMessagePreview: payload.body || payload.message,
      ghlConversationId: payload.conversationId,
    })
    await ensureConversationParticipant(ctx, {
      conversationId: conversation._id,
      contactId: contact?._id,
      role: "contact",
      displayName: payload.contactName,
      phone: contact?.phone,
      email: contact?.email,
      externalContactId: payload.contactId,
    })
    const message = await upsertMessageRecord(ctx, {
      conversationId: conversation._id,
      contactId: contact?._id,
      direction: messageDirection,
      channel,
      source,
      messageType: payload.messageType || payload.type,
      body: payload.body || payload.message || payload.transcript || "",
      status: payload.status,
      sentAt,
      ghlMessageId: payload.id || args.entityId,
      metadata: JSON.stringify(payload),
    })
    await upsertExternalSyncState(ctx, {
      provider: args.provider,
      entityType: "message",
      entityId: args.entityId,
      status: "synced",
      lastAttemptAt: now,
      lastSyncedAt: now,
      metadata: JSON.stringify({
        conversationId: conversation?._id,
        messageId: message?._id,
      }),
    })
    return message
  },
})
/**
 * Mutation that mirrors one external call recording: upserts a closed "call"
 * conversation, the call artifact, and a per-entity "synced" row in
 * `externalSyncState`.
 *
 * `createdAt` / `endedAt` strings that fail to parse are treated as absent
 * instead of being stored as NaN. `payload.duration` is multiplied by 1000
 * when `durationMs` is not a number — presumably it is in seconds; confirm
 * against the upstream API.
 *
 * @param provider - External system name; also used to build `source`.
 * @param entityId - External recording id (last fallback for `recordingId`).
 * @param payload - Raw external call-log object; shape is not validated.
 * @returns The value returned by `upsertCallArtifactRecord`.
 */
export const importRecording = mutation({
  args: {
    provider: v.string(),
    entityId: v.string(),
    payload: v.any(),
  },
  handler: async (ctx, args) => {
    const payload = args.payload || {}
    const now = Date.now()
    const source = `${args.provider}:mirror`

    // Accept only strings that parse to a real date; everything else is "unknown".
    const parseTimestamp = (value: unknown) => {
      if (typeof value !== "string") {
        return undefined
      }
      const ms = new Date(value).getTime()
      return Number.isFinite(ms) ? ms : undefined
    }

    const createdAt = parseTimestamp(payload.createdAt)

    let durationMs: number | undefined
    if (typeof payload.durationMs === "number") {
      durationMs = payload.durationMs
    } else if (typeof payload.duration === "number") {
      durationMs = payload.duration * 1000
    }

    const conversation = await upsertConversationRecord(ctx, {
      channel: "call",
      source,
      status: "closed",
      direction: normalizeConversationDirection(payload.direction),
      startedAt: createdAt ?? now,
      lastMessageAt: createdAt ?? now,
      lastMessagePreview: payload.summary || payload.transcript,
      ghlConversationId: payload.conversationId,
      livekitRoomName: payload.livekitRoomName,
    })
    const artifact = await upsertCallArtifactRecord(ctx, {
      conversationId: conversation._id,
      source,
      recordingId: payload.recordingId || payload.id || args.entityId,
      recordingUrl: payload.recordingUrl,
      recordingStatus: normalizeRecordingStatus(payload.recordingStatus),
      transcriptionText: payload.transcript,
      durationMs,
      startedAt: createdAt,
      endedAt: parseTimestamp(payload.endedAt),
      ghlMessageId: payload.messageId,
      livekitRoomName: payload.livekitRoomName,
      metadata: JSON.stringify(payload),
    })
    await upsertExternalSyncState(ctx, {
      provider: args.provider,
      entityType: "recording",
      entityId: args.entityId,
      status: "synced",
      lastAttemptAt: now,
      lastSyncedAt: now,
      metadata: JSON.stringify({
        conversationId: conversation?._id,
        callArtifactId: artifact?._id,
      }),
    })
    return artifact
  },
})
/**
 * Mutation that writes a sync checkpoint row through `upsertExternalSyncState`.
 *
 * `lastAttemptAt` is always stamped. `lastSyncedAt` is stamped when the status
 * is "synced" or "reconciled" — the same rule `markSyncStage` applies — so the
 * reconcile stage now reports a last-synced time as the other stages do.
 * For any other status `lastSyncedAt` is passed as `undefined`.
 *
 * @returns The value returned by `upsertExternalSyncState`.
 */
export const updateSyncCheckpoint = mutation({
  args: {
    provider: v.string(),
    entityType: v.string(),
    entityId: v.string(),
    cursor: v.optional(v.string()),
    checksum: v.optional(v.string()),
    status: v.optional(
      v.union(
        v.literal("running"),
        v.literal("pending"),
        v.literal("synced"),
        v.literal("failed"),
        v.literal("missing_config"),
        v.literal("reconciled"),
        v.literal("mismatch")
      )
    ),
    error: v.optional(v.string()),
    metadata: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const now = Date.now()
    const completed = args.status === "synced" || args.status === "reconciled"
    return await upsertExternalSyncState(ctx, {
      ...args,
      lastAttemptAt: now,
      lastSyncedAt: completed ? now : undefined,
    })
  },
})
/**
 * Mutation that checks every per-entity sync row of a provider against the
 * locally mirrored record it refers to.
 *
 * Rows whose record is missing are marked "mismatch"; the rest are marked
 * "reconciled". Two behaviours differ from the earlier version:
 * - existence is tested with `.first()`, so duplicate mirrored rows no longer
 *   make the whole reconcile throw;
 * - rows whose `entityType` is not one of contact / conversation / message /
 *   recording (e.g. the stage-level checkpoint rows) are left untouched
 *   instead of being overwritten as "reconciled", and are not counted.
 *
 * @param provider - Provider whose sync rows are examined.
 * @returns `{ provider, checked, mismatches }` where `mismatches` lists entity ids.
 */
export const reconcileExternalState = mutation({
  args: {
    provider: v.string(),
  },
  handler: async (ctx, args) => {
    // entityType -> where the mirrored record lives and how to look it up.
    const lookups = new Map([
      ["contact", { table: "contacts", index: "by_ghlContactId", field: "ghlContactId" }],
      [
        "conversation",
        { table: "conversations", index: "by_ghlConversationId", field: "ghlConversationId" },
      ],
      ["message", { table: "messages", index: "by_ghlMessageId", field: "ghlMessageId" }],
      ["recording", { table: "callArtifacts", index: "by_recordingId", field: "recordingId" }],
    ])

    const states = await ctx.db
      .query("externalSyncState")
      .withIndex("by_provider_entityType", (q) => q.eq("provider", args.provider))
      .collect()

    const mismatches = []
    let checked = 0
    for (const state of states) {
      const lookup = lookups.get(state.entityType)
      if (!lookup) {
        // Not a mirrored-record row; nothing to verify.
        continue
      }
      checked += 1

      const local = await ctx.db
        .query(lookup.table)
        .withIndex(lookup.index, (q) => q.eq(lookup.field, state.entityId))
        .first()

      if (!local) {
        await ctx.db.patch(state._id, {
          status: "mismatch",
          error: "Referenced mirrored record is missing locally.",
          updatedAt: Date.now(),
        })
        mismatches.push(state.entityId)
      } else {
        await ctx.db.patch(state._id, {
          status: "reconciled",
          error: undefined,
          lastSyncedAt: state.lastSyncedAt ?? Date.now(),
          updatedAt: Date.now(),
        })
      }
    }
    return {
      provider: args.provider,
      checked,
      mismatches,
    }
  },
})
/**
 * Action that mirrors GoHighLevel data into the local CRM in five stages:
 * contacts, conversations, messages, recordings, reconcile. Each stage keeps
 * its own checkpoint row, and the run stops at the first stage that fails.
 *
 * Resume handling: each stage now reads its saved cursor BEFORE marking itself
 * "running". Previously the "running" write replaced the stage metadata first,
 * so the saved cursor was never found and every run restarted from the
 * beginning. The cursor is also carried in the "running" and "failed"
 * metadata so an interrupted run can pick up where it stopped.
 *
 * @param reason - Free-text label stored in checkpoint metadata (default "cron").
 * @param forceFullBackfill - Ignore saved cursors and start every stage afresh.
 * @param maxPagesPerRun - Page cap per stage/channel, clamped to 1..250 (default 25).
 * @param contactsLimit - Contacts page size, clamped to 1..100 (default 100).
 * @param messagesLimit - Messages page size, clamped to 1..100 (default 100).
 * @param recordingsPageSize - Call-log page size, clamped to 1..50 (default 50).
 * @returns Summary with per-stage import counts, mismatches and ok/status,
 *   or a `missing_config` result when GHL credentials are absent.
 */
export const runGhlMirror = action({
  args: {
    reason: v.optional(v.string()),
    forceFullBackfill: v.optional(v.boolean()),
    maxPagesPerRun: v.optional(v.number()),
    contactsLimit: v.optional(v.number()),
    messagesLimit: v.optional(v.number()),
    recordingsPageSize: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const config = readGhlMirrorConfig()
    const now = Date.now()
    const reason = args.reason || "cron"
    const resume = !args.forceFullBackfill
    const maxPagesPerRun = Math.min(250, Math.max(1, args.maxPagesPerRun || 25))
    const contactsLimit = Math.min(100, Math.max(1, args.contactsLimit || 100))
    const messagesLimit = Math.min(100, Math.max(1, args.messagesLimit || 100))
    const recordingsPageSize = Math.min(
      50,
      Math.max(1, args.recordingsPageSize || 50)
    )

    if (!config) {
      // Flag every stage so the admin overview shows why nothing ran.
      for (const stage of GHL_SYNC_STAGES) {
        await ctx.runMutation(api.crm.updateSyncCheckpoint, {
          provider: GHL_SYNC_PROVIDER,
          entityType: stage,
          entityId: stage,
          status: "missing_config",
          error: "GHL credentials are not configured.",
          metadata: JSON.stringify({
            reason,
            checkedAt: now,
          }),
        })
      }
      return {
        ok: false,
        status: "missing_config",
        message: "GHL credentials are not configured.",
      }
    }

    const summary = {
      ok: true,
      status: "synced",
      reason,
      contacts: 0,
      conversations: 0,
      messages: 0,
      recordings: 0,
      mismatches: [] as string[],
    }

    // A saved cursor is honoured only when resuming and when it is non-empty.
    const savedCursor = (value: unknown) =>
      resume && value ? String(value) : undefined

    const updateRunning = async (entityType: string, metadata?: Record<string, any>) => {
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType,
        entityId: entityType,
        status: "running",
        error: undefined,
        metadata: JSON.stringify({
          ...(metadata || {}),
          reason,
          startedAt: now,
        }),
      })
    }

    const failStage = async (entityType: string, error: unknown, metadata?: Record<string, any>) => {
      const message =
        error instanceof Error ? error.message : `Failed to sync ${entityType}`
      summary.ok = false
      summary.status = "failed"
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType,
        entityId: entityType,
        status: "failed",
        error: message,
        metadata: JSON.stringify({
          ...(metadata || {}),
          reason,
          failedAt: Date.now(),
        }),
      })
    }

    // ---- Stage 1: contacts -------------------------------------------------
    let contactsCursor: string | undefined
    try {
      // Read the saved cursor before the "running" write replaces the metadata.
      const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
      contactsCursor = savedCursor(previous.stages.contacts.metadata?.nextCursor)
      await updateRunning("contacts", { nextCursor: contactsCursor })

      let contactsPages = 0
      while (contactsPages < maxPagesPerRun) {
        const fetched = await fetchGhlContactsPage(config, {
          limit: contactsLimit,
          cursor: contactsCursor,
        })
        if (!fetched.items.length) {
          break
        }
        for (const item of fetched.items) {
          await ctx.runMutation(api.crm.importContact, {
            provider: GHL_SYNC_PROVIDER,
            entityId: String(item.id || ""),
            payload: item,
          })
          summary.contacts += 1
        }
        contactsPages += 1
        // Advance only after the whole page was imported.
        contactsCursor = fetched.nextCursor
        if (!fetched.nextCursor) {
          break
        }
      }
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "contacts",
        entityId: "contacts",
        cursor: contactsCursor,
        status: "synced",
        error: "",
        metadata: JSON.stringify({
          imported: summary.contacts,
          pages: contactsPages,
          nextCursor: contactsCursor,
          completedAt: Date.now(),
          reason,
        }),
      })
    } catch (error) {
      await failStage("contacts", error, { nextCursor: contactsCursor })
      return summary
    }

    // ---- Stage 2: conversations (derived from message pages) ---------------
    const conversationCursors: Record<"SMS" | "Call", string | undefined> = {
      SMS: undefined,
      Call: undefined,
    }
    try {
      const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
      const saved = previous.stages.conversations.metadata?.cursors
      conversationCursors.SMS = savedCursor(saved?.SMS)
      conversationCursors.Call = savedCursor(saved?.Call)
      await updateRunning("conversations", { cursors: conversationCursors })

      for (const channel of ["SMS", "Call"] as const) {
        let pages = 0
        while (pages < maxPagesPerRun) {
          const fetched = await fetchGhlMessagesPage(config, {
            limit: messagesLimit,
            cursor: conversationCursors[channel],
            channel,
          })
          if (!fetched.items.length) {
            break
          }
          // One import per conversation per page: keep the first item seen.
          const grouped = new Map<string, any>()
          for (const item of fetched.items) {
            const conversationId = String(item.conversationId || item.id || "")
            if (!conversationId || grouped.has(conversationId)) {
              continue
            }
            grouped.set(conversationId, item)
          }
          for (const [entityId, item] of grouped.entries()) {
            await ctx.runMutation(api.crm.importConversation, {
              provider: GHL_SYNC_PROVIDER,
              entityId,
              payload: item,
            })
            summary.conversations += 1
          }
          pages += 1
          conversationCursors[channel] = fetched.nextCursor
          if (!fetched.nextCursor) {
            break
          }
        }
      }
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "conversations",
        entityId: "conversations",
        cursor: conversationCursors.Call || conversationCursors.SMS,
        status: "synced",
        error: "",
        metadata: JSON.stringify({
          imported: summary.conversations,
          cursors: conversationCursors,
          completedAt: Date.now(),
          reason,
        }),
      })
    } catch (error) {
      await failStage("conversations", error, { cursors: conversationCursors })
      return summary
    }

    // ---- Stage 3: messages -------------------------------------------------
    const messageCursors: Record<"SMS" | "Call", string | undefined> = {
      SMS: undefined,
      Call: undefined,
    }
    try {
      const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
      const saved = previous.stages.messages.metadata?.cursors
      messageCursors.SMS = savedCursor(saved?.SMS)
      messageCursors.Call = savedCursor(saved?.Call)
      await updateRunning("messages", { cursors: messageCursors })

      for (const channel of ["SMS", "Call"] as const) {
        let pages = 0
        while (pages < maxPagesPerRun) {
          const fetched = await fetchGhlMessagesPage(config, {
            limit: messagesLimit,
            cursor: messageCursors[channel],
            channel,
          })
          if (!fetched.items.length) {
            break
          }
          for (const item of fetched.items) {
            await ctx.runMutation(api.crm.importMessage, {
              provider: GHL_SYNC_PROVIDER,
              entityId: String(item.id || ""),
              payload: item,
            })
            summary.messages += 1
          }
          pages += 1
          messageCursors[channel] = fetched.nextCursor
          if (!fetched.nextCursor) {
            break
          }
        }
      }
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "messages",
        entityId: "messages",
        cursor: messageCursors.Call || messageCursors.SMS,
        status: "synced",
        error: "",
        metadata: JSON.stringify({
          imported: summary.messages,
          cursors: messageCursors,
          completedAt: Date.now(),
          reason,
        }),
      })
    } catch (error) {
      await failStage("messages", error, { cursors: messageCursors })
      return summary
    }

    // ---- Stage 4: recordings (page-number pagination) ----------------------
    let nextPage = 1
    try {
      const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
      const savedPage = resume
        ? Number(previous.stages.recordings.metadata?.nextPage)
        : NaN
      // Anything that is not a usable page number restarts from page 1.
      nextPage = Number.isFinite(savedPage) && savedPage >= 1 ? savedPage : 1
      await updateRunning("recordings", { nextPage })

      let pages = 0
      while (pages < maxPagesPerRun) {
        const fetched = await fetchGhlCallLogsPage(config, {
          page: nextPage,
          pageSize: recordingsPageSize,
        })
        if (!fetched.items.length) {
          break
        }
        for (const item of fetched.items) {
          await ctx.runMutation(api.crm.importRecording, {
            provider: GHL_SYNC_PROVIDER,
            entityId: String(item.id || item.messageId || ""),
            payload: {
              ...item,
              recordingId: item.messageId || item.id,
              transcript: item.transcript,
              recordingUrl: item.recordingUrl,
              recordingStatus: item.transcript ? "completed" : "pending",
            },
          })
          summary.recordings += 1
        }
        pages += 1
        // After the last page, wrap around so the next run starts at page 1.
        const exhausted = fetched.page * fetched.pageSize >= fetched.total
        nextPage = exhausted ? 1 : fetched.page + 1
        if (exhausted) {
          break
        }
      }
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "recordings",
        entityId: "recordings",
        cursor: String(nextPage),
        status: "synced",
        error: "",
        metadata: JSON.stringify({
          imported: summary.recordings,
          nextPage,
          completedAt: Date.now(),
          reason,
        }),
      })
    } catch (error) {
      await failStage("recordings", error, { nextPage })
      return summary
    }

    // ---- Stage 5: reconcile ------------------------------------------------
    try {
      await updateRunning("reconcile")
      const reconcile = await ctx.runMutation(api.crm.reconcileExternalState, {
        provider: GHL_SYNC_PROVIDER,
      })
      const mismatches = reconcile.mismatches || []
      summary.mismatches = mismatches
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "reconcile",
        entityId: "reconcile",
        status: mismatches.length ? "mismatch" : "reconciled",
        error: mismatches.length
          ? "Some mirrored records are missing locally."
          : "",
        metadata: JSON.stringify({
          checked: reconcile.checked,
          mismatches,
          completedAt: Date.now(),
          reason,
        }),
      })
    } catch (error) {
      await failStage("reconcile", error)
      return summary
    }

    return summary
  },
})
/**
 * Action that re-imports GHL contacts from the first page onward, without
 * reading or writing the stage checkpoints.
 *
 * @param reason - Label echoed in the result (default "manual-repair").
 * @param maxPages - Page cap, clamped to 1..250 (default 50).
 * @param limit - Page size, clamped to 1..100 (default 100).
 * @returns `{ ok, repaired, pages, cursor, reason }`, or a not-configured
 *   result when GHL credentials are absent.
 */
export const repairMirroredContacts = action({
  args: {
    reason: v.optional(v.string()),
    maxPages: v.optional(v.number()),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const config = readGhlMirrorConfig()
    if (!config) {
      return {
        ok: false,
        repaired: 0,
        message: "GHL credentials are not configured.",
      }
    }

    const maxPages = Math.min(250, Math.max(1, args.maxPages || 50))
    const limit = Math.min(100, Math.max(1, args.limit || 100))

    let cursor: string | undefined
    let pages = 0
    let repaired = 0

    // maxPages is at least 1, so the first page is always requested.
    do {
      const batch = await fetchGhlContactsPage(config, { limit, cursor })
      if (!batch.items.length) {
        break
      }
      for (const ghlContact of batch.items) {
        await ctx.runMutation(api.crm.importContact, {
          provider: GHL_SYNC_PROVIDER,
          entityId: String(ghlContact.id || ""),
          payload: ghlContact,
        })
        repaired += 1
      }
      pages += 1
      cursor = batch.nextCursor
    } while (cursor && pages < maxPages)

    return {
      ok: true,
      repaired,
      pages,
      cursor: cursor || null,
      reason: args.reason || "manual-repair",
    }
  },
})
/**
 * Query that returns one page of contacts for the admin UI, newest activity
 * first, together with the sync overview and pagination info.
 *
 * Lead counts are computed from a single read of `leadSubmissions` per
 * request; previously the whole table was collected once for every contact
 * on the page.
 *
 * @param search - Optional case-insensitive substring matched against name,
 *   email, phone, company and tags.
 * @param page - 1-based page number (minimum 1, default 1).
 * @param limit - Page size, clamped to 1..100 (default 25).
 */
export const listAdminContacts = query({
  args: {
    search: v.optional(v.string()),
    page: v.optional(v.number()),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const page = Math.max(1, args.page ?? 1)
    const limit = Math.min(100, Math.max(1, args.limit ?? 25))
    const search = String(args.search || "").trim().toLowerCase()

    const contacts = await ctx.db.query("contacts").collect()
    const filtered = contacts.filter((contact) =>
      matchesSearch(
        [
          `${contact.firstName} ${contact.lastName}`,
          contact.email,
          contact.phone,
          contact.company,
          ...(contact.tags || []),
        ],
        search
      )
    )
    filtered.sort(
      (a, b) =>
        (b.lastActivityAt || b.updatedAt || 0) - (a.lastActivityAt || a.updatedAt || 0)
    )
    const paged = filtered.slice((page - 1) * limit, page * limit)

    // Tally leads per contact once; skipped entirely when the page is empty.
    const leadCounts = new Map<string, number>()
    if (paged.length > 0) {
      const leads = await ctx.db.query("leadSubmissions").collect()
      for (const lead of leads) {
        if (!lead.contactId) {
          continue
        }
        leadCounts.set(lead.contactId, (leadCounts.get(lead.contactId) || 0) + 1)
      }
    }

    const items = await Promise.all(
      paged.map(async (contact) => {
        const display = buildContactDisplay(contact)
        const conversations = await ctx.db
          .query("conversations")
          .withIndex("by_contactId", (q) => q.eq("contactId", contact._id))
          .collect()
        return {
          id: contact._id,
          firstName: contact.firstName,
          lastName: contact.lastName,
          displayName: display.displayName,
          secondaryLine: display.secondaryLine,
          email: contact.email,
          phone: contact.phone,
          company: contact.company,
          tags: contact.tags || [],
          status: contact.status || "lead",
          source: contact.source,
          ghlContactId: contact.ghlContactId,
          lastActivityAt: contact.lastActivityAt,
          conversationCount: conversations.length,
          leadCount: leadCounts.get(contact._id) || 0,
          updatedAt: contact.updatedAt,
        }
      })
    )
    return {
      items,
      sync: await buildAdminSyncOverview(ctx),
      pagination: {
        page,
        limit,
        total: filtered.length,
        totalPages: Math.max(1, Math.ceil(filtered.length / limit)),
      },
    }
  },
})
/**
 * Query wrapper around `buildContactTimeline`: takes a `contacts` document id
 * and returns whatever that helper produces for it.
 */
export const getContactTimeline = query({
  args: { contactId: v.id("contacts") },
  handler: async (ctx, { contactId }) => {
    const timeline = await buildContactTimeline(ctx, contactId)
    return timeline
  },
})
/**
 * Detail view of a single contact for the admin UI.
 *
 * Args:
 * - `contactId`: contact id as a plain string.
 *
 * Returns null when the string is not a valid `contacts` id or no such
 * document exists. Otherwise returns `{ contact, conversations, timeline, sync }`,
 * with conversations ordered by `lastMessageAt` (falling back to `updatedAt`)
 * descending.
 */
export const getAdminContactDetail = query({
  args: {
    contactId: v.string(),
  },
  handler: async (ctx, args) => {
    // The argument is an unvalidated string. normalizeId returns null for
    // malformed ids and for ids belonging to another table, so neither can
    // throw or surface a foreign document as a contact.
    const contactId = ctx.db.normalizeId("contacts", args.contactId)
    if (!contactId) {
      return null
    }
    const contact = await ctx.db.get(contactId)
    if (!contact) {
      return null
    }

    const conversations = await ctx.db
      .query("conversations")
      .withIndex("by_contactId", (q) => q.eq("contactId", contact._id))
      .collect()
    // Fall back to 0 so a conversation lacking both timestamps cannot turn
    // the comparator result into NaN.
    conversations.sort(
      (a, b) =>
        (b.lastMessageAt || b.updatedAt || 0) -
        (a.lastMessageAt || a.updatedAt || 0)
    )

    const timeline = await buildContactTimeline(ctx, contact._id)
    const display = buildContactDisplay(contact)

    return {
      contact: {
        id: contact._id,
        firstName: contact.firstName,
        lastName: contact.lastName,
        displayName: display.displayName,
        secondaryLine: display.secondaryLine,
        email: contact.email,
        phone: contact.phone,
        company: contact.company,
        tags: contact.tags || [],
        status: contact.status || "lead",
        source: contact.source,
        notes: contact.notes,
        ghlContactId: contact.ghlContactId,
        lastActivityAt: contact.lastActivityAt,
        updatedAt: contact.updatedAt,
      },
      conversations: conversations.map((conversation) => ({
        id: conversation._id,
        channel: conversation.channel,
        status: conversation.status || "open",
        title: buildConversationDisplayTitle(conversation, contact),
        lastMessageAt: conversation.lastMessageAt,
        lastMessagePreview: conversation.lastMessagePreview,
        // True when either a LiveKit room name or a voice session id is set.
        recordingReady: Boolean(
          conversation.livekitRoomName || conversation.voiceSessionId
        ),
      })),
      timeline,
      sync: await buildAdminSyncOverview(ctx),
    }
  },
})
/**
 * Paginated, filterable conversation list for the admin UI.
 *
 * Args:
 * - `search`: optional text; trimmed and lower-cased before being handed to `matchesSearch`.
 * - `channel`: optional exact channel filter ("call" | "sms" | "chat" | "unknown").
 * - `status`: optional status filter; conversations with no status count as "open".
 * - `page`: 1-based page number (defaults to 1, minimum 1).
 * - `limit`: page size (defaults to 25, clamped to 1..100).
 *
 * Returns `{ items, sync, pagination }`. Items are ordered by `lastMessageAt`
 * (falling back to `updatedAt`) descending, and each carries message and
 * recording counts for that conversation.
 */
export const listAdminConversations = query({
  args: {
    search: v.optional(v.string()),
    channel: v.optional(
      v.union(
        v.literal("call"),
        v.literal("sms"),
        v.literal("chat"),
        v.literal("unknown")
      )
    ),
    status: v.optional(
      v.union(v.literal("open"), v.literal("closed"), v.literal("archived"))
    ),
    page: v.optional(v.number()),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const page = Math.max(1, args.page ?? 1)
    const limit = Math.min(100, Math.max(1, args.limit ?? 25))
    const search = String(args.search || "").trim().toLowerCase()

    const conversations = await ctx.db.query("conversations").collect()

    // Several conversations can point at the same contact; remember each
    // lookup (including misses) so every contact is read at most once.
    const contactById = new Map()
    const loadContact = async (contactId) => {
      if (!contactId) {
        return null
      }
      if (!contactById.has(contactId)) {
        contactById.set(contactId, await ctx.db.get(contactId))
      }
      return contactById.get(contactId)
    }

    const filtered = []
    for (const conversation of conversations) {
      if (args.channel && conversation.channel !== args.channel) {
        continue
      }
      if (args.status && (conversation.status || "open") !== args.status) {
        continue
      }
      const contact = await loadContact(conversation.contactId)
      // Join only the name parts that are present, so a missing first or last
      // name never shows up in the searchable text as the literal "undefined".
      const contactName = contact
        ? [contact.firstName, contact.lastName].filter(Boolean).join(" ") ||
          undefined
        : undefined
      if (
        !matchesSearch(
          [
            conversation.title,
            conversation.lastMessagePreview,
            conversation.ghlConversationId,
            contactName,
            contact?.email,
            contact?.phone,
          ],
          search
        )
      ) {
        continue
      }
      filtered.push({ conversation, contact })
    }

    // Fall back to 0 so an entry lacking both timestamps cannot turn the
    // comparator result into NaN.
    filtered.sort(
      (a, b) =>
        (b.conversation.lastMessageAt || b.conversation.updatedAt || 0) -
        (a.conversation.lastMessageAt || a.conversation.updatedAt || 0)
    )
    const paged = filtered.slice((page - 1) * limit, page * limit)

    const items = await Promise.all(
      paged.map(async ({ conversation, contact }) => {
        const display = buildContactDisplay(contact || undefined)
        const recordings = await ctx.db
          .query("callArtifacts")
          .withIndex("by_conversationId", (q) =>
            q.eq("conversationId", conversation._id)
          )
          .collect()
        const messages = await ctx.db
          .query("messages")
          .withIndex("by_conversationId", (q) =>
            q.eq("conversationId", conversation._id)
          )
          .collect()
        return {
          id: conversation._id,
          title: buildConversationDisplayTitle(conversation, contact),
          channel: conversation.channel,
          status: conversation.status || "open",
          direction: conversation.direction || "mixed",
          source: conversation.source,
          startedAt: conversation.startedAt,
          lastMessageAt: conversation.lastMessageAt,
          lastMessagePreview: conversation.lastMessagePreview,
          displayName: display.displayName,
          secondaryLine: display.secondaryLine,
          contact: contact
            ? {
                id: contact._id,
                name: display.displayName,
                email: contact.email,
                phone: contact.phone,
                secondaryLine: display.secondaryLine,
              }
            : null,
          messageCount: messages.length,
          recordingCount: recordings.length,
        }
      })
    )

    return {
      items,
      sync: await buildAdminSyncOverview(ctx),
      pagination: {
        page,
        limit,
        total: filtered.length,
        // Always report at least one page, even when nothing matched.
        totalPages: Math.max(1, Math.ceil(filtered.length / limit)),
      },
    }
  },
})
/**
 * Detail view of a single conversation for the admin UI.
 *
 * Args:
 * - `conversationId`: conversation id as a plain string.
 *
 * Returns null when the string is not a valid `conversations` id or no such
 * document exists. Otherwise returns the conversation, its contact (or null),
 * participants, messages ordered by `sentAt` ascending, recordings, related
 * leads, and the sync overview.
 */
export const getAdminConversationDetail = query({
  args: {
    conversationId: v.string(),
  },
  handler: async (ctx, args) => {
    // The argument is an unvalidated string. normalizeId returns null for
    // malformed ids and for ids belonging to another table, so neither can
    // throw or surface a foreign document as a conversation.
    const conversationId = ctx.db.normalizeId(
      "conversations",
      args.conversationId
    )
    if (!conversationId) {
      return null
    }
    const conversation = await ctx.db.get(conversationId)
    if (!conversation) {
      return null
    }

    const contact = conversation.contactId
      ? await ctx.db.get(conversation.contactId)
      : null
    const display = contact ? buildContactDisplay(contact) : null

    const participants = await ctx.db
      .query("conversationParticipants")
      .withIndex("by_conversationId", (q) =>
        q.eq("conversationId", conversation._id)
      )
      .collect()

    const messages = await ctx.db
      .query("messages")
      .withIndex("by_conversationId", (q) =>
        q.eq("conversationId", conversation._id)
      )
      .collect()
    // Oldest first; fall back to 0 so a message without sentAt cannot turn
    // the comparator result into NaN.
    messages.sort((a, b) => (a.sentAt || 0) - (b.sentAt || 0))

    const recordings = await ctx.db
      .query("callArtifacts")
      .withIndex("by_conversationId", (q) =>
        q.eq("conversationId", conversation._id)
      )
      .collect()

    // A lead is related when it references this conversation directly or
    // belongs to the conversation's contact.
    const allLeads = await ctx.db.query("leadSubmissions").collect()
    const leads = allLeads.filter(
      (lead) =>
        lead.conversationId === conversation._id ||
        (contact && lead.contactId === contact._id)
    )

    return {
      conversation: {
        id: conversation._id,
        title: buildConversationDisplayTitle(conversation, contact),
        channel: conversation.channel,
        status: conversation.status || "open",
        direction: conversation.direction || "mixed",
        source: conversation.source,
        startedAt: conversation.startedAt,
        endedAt: conversation.endedAt,
        lastMessageAt: conversation.lastMessageAt,
        lastMessagePreview: conversation.lastMessagePreview,
        summaryText: conversation.summaryText,
        ghlConversationId: conversation.ghlConversationId,
        livekitRoomName: conversation.livekitRoomName,
      },
      contact: contact
        ? {
            id: contact._id,
            name: display.displayName,
            email: contact.email,
            phone: contact.phone,
            company: contact.company,
            secondaryLine: display.secondaryLine,
          }
        : null,
      participants: participants.map((participant) => ({
        id: participant._id,
        role: participant.role || "unknown",
        displayName: participant.displayName,
        email: participant.email,
        phone: participant.phone,
      })),
      messages: messages.map((message) => ({
        id: message._id,
        direction: message.direction || "system",
        channel: message.channel,
        source: message.source,
        body: message.body,
        status: message.status,
        sentAt: message.sentAt,
      })),
      recordings: recordings.map((recording) => ({
        id: recording._id,
        recordingId: recording.recordingId,
        recordingUrl: recording.recordingUrl,
        recordingStatus: recording.recordingStatus,
        transcriptionText: recording.transcriptionText,
        durationMs: recording.durationMs,
        startedAt: recording.startedAt,
        endedAt: recording.endedAt,
      })),
      leads: leads.map((lead) => ({
        id: lead._id,
        type: lead.type,
        status: lead.status,
        message: lead.message,
        intent: lead.intent,
        createdAt: lead.createdAt,
      })),
      sync: await buildAdminSyncOverview(ctx),
    }
  },
})
/**
 * Argument-less query that returns the result of `buildAdminSyncOverview`
 * for the current database context.
 */
export const getAdminSyncOverview = query({
  args: {},
  handler: async (ctx) => {
    const overview = await buildAdminSyncOverview(ctx)
    return overview
  },
})