Rocky_Mountain_Vending/convex/crm.ts

1976 lines
56 KiB
TypeScript

// @ts-nocheck
import { action, mutation, query } from "./_generated/server"
import { v } from "convex/values"
import { api } from "./_generated/api"
import {
ensureConversationParticipant,
normalizeEmail,
normalizePhone,
sanitizeContactNameParts,
upsertCallArtifactRecord,
upsertContactRecord,
upsertConversationRecord,
upsertExternalSyncState,
upsertMessageRecord,
} from "./crmModel"
import {
fetchGhlCallLogsPage,
fetchGhlConversationsPage,
fetchGhlConversationMessages,
fetchGhlContactsPage,
fetchGhlMessagesPage,
readGhlMirrorConfig,
sendGhlConversationMessage,
} from "./ghlMirror"
// Provider key written to every sync-state row this module creates for GHL.
const GHL_SYNC_PROVIDER = "ghl"
// Ordered stage names of the GHL mirror. Each name is used as both the
// entityType and the entityId of that stage's checkpoint row.
const GHL_SYNC_STAGES = [
"contacts",
"conversations",
"messages",
"recordings",
"reconcile",
] as const
/**
 * Parses a JSON string without ever throwing.
 *
 * @param value - Raw JSON text; may be omitted or empty.
 * @returns The parsed value, or `null` when the input is empty/undefined or
 *   is not valid JSON.
 */
function safeJsonParse(value?: string) {
  if (value) {
    try {
      return JSON.parse(value)
    } catch {
      // Malformed JSON is reported the same way as "no value".
    }
  }
  return null
}
/**
 * Loads the stage-level sync checkpoint row for the GHL provider.
 *
 * Stage rows use the stage name as both `entityType` and `entityId`, so the
 * same value is matched against both index fields.
 *
 * @param ctx - Convex context exposing `db`.
 * @param entityType - Stage name (for example "contacts").
 * @returns Whatever `.unique()` yields for that lookup.
 */
async function getSyncStateRecord(ctx, entityType) {
  const stageLookup = ctx.db
    .query("externalSyncState")
    .withIndex("by_provider_entityType_entityId", (q) => {
      const byProvider = q.eq("provider", GHL_SYNC_PROVIDER)
      const byEntityType = byProvider.eq("entityType", entityType)
      return byEntityType.eq("entityId", entityType)
    })
  return await stageLookup.unique()
}
/**
 * Writes the checkpoint row of one GHL sync stage.
 *
 * The stage name doubles as `entityId`. `lastAttemptAt` is always stamped;
 * `lastSyncedAt` is stamped only when the status is "synced" or "reconciled".
 *
 * @param ctx - Convex mutation context.
 * @param args - `entityType`, plus optional `cursor`, `checksum`, `status`,
 *   `error` and `metadata` forwarded to the sync-state upsert.
 * @returns The result of `upsertExternalSyncState`.
 */
async function markSyncStage(ctx, args) {
  const { entityType, cursor, checksum, status, error, metadata } = args
  const countsAsSynced = status === "synced" || status === "reconciled"
  return await upsertExternalSyncState(ctx, {
    provider: GHL_SYNC_PROVIDER,
    entityType,
    entityId: entityType,
    cursor,
    checksum,
    status,
    error,
    metadata,
    lastAttemptAt: Date.now(),
    lastSyncedAt: countsAsSynced ? Date.now() : undefined,
  })
}
/**
 * Converts a sync-state row (or a missing row) into the summary shape used
 * by the admin overview.
 *
 * Missing or falsy fields become `null`, except `status`, which defaults to
 * "pending". `metadata` is the row's JSON string parsed leniently.
 *
 * @param state - Sync-state row, or null/undefined when none exists.
 */
function formatSyncStageSummary(state) {
  const row = state ?? {}
  return {
    status: row.status || "pending",
    lastAttemptAt: row.lastAttemptAt || null,
    lastSyncedAt: row.lastSyncedAt || null,
    error: row.error || null,
    cursor: row.cursor || null,
    metadata: safeJsonParse(row.metadata),
  }
}
/**
 * Builds the admin-facing sync overview: per-stage summaries, configuration
 * flags derived from environment variables, and an overall status.
 *
 * Overall status precedence: "running" > "attention" (any stage failed or
 * is missing config) > "healthy" (at least one stage has synced) > "idle".
 *
 * @param ctx - Convex context exposing `db`.
 */
async function buildAdminSyncOverview(ctx) {
  const loadStage = async (stageName) =>
    formatSyncStageSummary(await getSyncStateRecord(ctx, stageName))

  // Stage rows are read one after another, in display order.
  const stages = {
    contacts: await loadStage("contacts"),
    conversations: await loadStage("conversations"),
    messages: await loadStage("messages"),
    recordings: await loadStage("recordings"),
    reconcile: await loadStage("reconcile"),
  }

  const env = process.env
  const ghlToken = String(
    env.GHL_PRIVATE_INTEGRATION_TOKEN || env.GHL_API_TOKEN || ""
  ).trim()
  const ghlLocationId = String(env.GHL_LOCATION_ID || "").trim()
  const ghlConfigured = Boolean(ghlToken && ghlLocationId)

  const syncTokenConfigured = Boolean(
    String(env.GHL_SYNC_CRON_TOKEN || "").trim()
  )

  const livekitUrl = String(env.LIVEKIT_URL || "").trim()
  const livekitKey = String(env.LIVEKIT_API_KEY || "").trim()
  const livekitSecret = String(env.LIVEKIT_API_SECRET || "").trim()
  const livekitConfigured = Boolean(livekitUrl && livekitKey && livekitSecret)

  let latestSyncAt = 0
  let hasFailures = false
  let hasRunning = false
  for (const stage of Object.values(stages) as any[]) {
    latestSyncAt = Math.max(latestSyncAt, stage.lastSyncedAt || 0)
    if (stage.status === "failed" || stage.status === "missing_config") {
      hasFailures = true
    }
    if (stage.status === "running") {
      hasRunning = true
    }
  }

  let overallStatus = "idle"
  if (hasRunning) {
    overallStatus = "running"
  } else if (hasFailures) {
    overallStatus = "attention"
  } else if (latestSyncAt) {
    overallStatus = "healthy"
  }

  return {
    ghlConfigured,
    syncTokenConfigured,
    livekitConfigured,
    latestSyncAt: latestSyncAt || null,
    overallStatus,
    stages,
  }
}
/**
 * Pulls the message array out of a GHL response, whichever of the known
 * envelope shapes it arrived in.
 *
 * Locations are probed in priority order; the first one that holds an array
 * wins. Returns an empty array when none matches.
 *
 * @param payload - Response body (object, array, or nullish).
 */
function extractGhlMessages(payload: any) {
  const candidates = [
    payload?.items,
    payload?.messages?.messages,
    payload?.data?.messages?.messages,
    payload?.messages,
    payload?.data?.messages,
    payload,
  ]
  for (const candidate of candidates) {
    if (Array.isArray(candidate)) {
      return candidate
    }
  }
  return []
}
/**
 * Coerces a timestamp-like value into a number.
 *
 * - Finite numbers are returned unchanged.
 * - Non-blank strings are tried first as a positive numeric literal, then as
 *   a date string; only positive results are accepted.
 * - Everything else yields `undefined`.
 *
 * @param value - Number, string, or anything else.
 */
function parseGhlTimestamp(value: unknown) {
  if (typeof value === "number") {
    return Number.isFinite(value) ? value : undefined
  }
  if (typeof value !== "string" || !value.trim()) {
    return undefined
  }
  const asNumber = Number(value)
  if (Number.isFinite(asNumber) && asNumber > 0) {
    return asNumber
  }
  const asDate = Date.parse(value)
  return Number.isFinite(asDate) && asDate > 0 ? asDate : undefined
}
/**
 * Maps a free-form GHL channel/type label onto one of the local channels
 * "sms", "call", "chat" or "unknown".
 *
 * Matching is by case-insensitive substring, checked in that order, so a
 * label that mentions both a message keyword and a call keyword is "sms".
 *
 * @param value - Raw label; falsy values are treated as an empty label.
 */
function normalizeGhlChannel(value?: unknown) {
  const label = String(value || "")
    .trim()
    .toLowerCase()
  const mentionsAny = (keywords: string[]) =>
    keywords.some((keyword) => label.includes(keyword))

  if (mentionsAny(["sms", "whatsapp", "message"])) {
    return "sms"
  }
  if (mentionsAny(["call", "phone", "voicemail"])) {
    return "call"
  }
  return label.includes("chat") ? "chat" : "unknown"
}
/**
 * Derives the local channel for a GHL conversation/message payload.
 *
 * The payload fields `channel`, `messageType`, `lastMessageType` and `type`
 * are consulted in that order. A field is skipped when it is absent OR when
 * it normalizes to "unknown", so a later, more specific field can still
 * decide the channel. (Previously the first present field always won, even
 * when it normalized to "unknown", because "unknown" is a truthy string.)
 *
 * @param payload - GHL payload; nullish is tolerated.
 * @returns "sms", "call", "chat", or "unknown" when no field is recognised.
 */
function deriveGhlConversationChannel(payload: any) {
  const hints = [
    payload?.channel,
    payload?.messageType,
    payload?.lastMessageType,
    payload?.type,
  ]
  for (const hint of hints) {
    if (!hint) {
      continue
    }
    const channel = normalizeGhlChannel(hint)
    if (channel !== "unknown") {
      return channel
    }
  }
  return "unknown"
}
/**
 * Derives the local conversation status from a GHL payload.
 *
 * An explicit `status` is normalized and wins. Otherwise a boolean `inbox`
 * flag decides between "open" (true) and "closed" (false). With neither,
 * the conversation is considered "open".
 *
 * @param payload - GHL conversation payload.
 */
function deriveConversationStatus(payload: any) {
  if (payload.status) {
    return normalizeConversationStatus(payload.status)
  }
  if (typeof payload.inbox !== "boolean") {
    return "open"
  }
  if (payload.inbox) {
    return "open"
  }
  return "closed"
}
/**
 * Tests whether a search term occurs in any of the given values.
 *
 * Values are lower-cased and joined with newlines before the substring
 * check; `search` itself is used as given (callers are expected to pass it
 * already lower-cased). An empty search matches everything.
 *
 * @param values - Candidate strings; undefined entries count as empty.
 * @param search - Term to look for.
 */
function matchesSearch(values: Array<string | undefined>, search: string) {
  if (!search) {
    return true
  }
  const lowered: string[] = []
  for (const value of values) {
    lowered.push(String(value || "").toLowerCase())
  }
  return lowered.join("\n").includes(search)
}
/**
 * Normalizes a free-form status label to "open", "closed" or "archived".
 *
 * The label is trimmed and lower-cased, then matched exactly against the
 * known aliases. Empty and unrecognised labels are "open".
 *
 * @param value - Raw status label.
 */
function normalizeConversationStatus(value?: string) {
  const label = String(value || "")
    .trim()
    .toLowerCase()
  switch (label) {
    case "closed":
    case "completed":
    case "complete":
    case "ended":
    case "resolved":
    case "done":
      return "closed"
    case "archived":
    case "spam":
    case "blocked":
    case "blacklisted":
    case "do_not_contact":
    case "dnd":
      return "archived"
    default:
      return "open"
  }
}
/**
 * Normalizes a direction label to "inbound", "outbound" or "mixed".
 *
 * Comparison is exact after trimming and lower-casing; anything other than
 * the two known directions (including empty input) is "mixed".
 *
 * @param value - Raw direction label.
 */
function normalizeConversationDirection(value?: string) {
  const label = String(value || "")
    .trim()
    .toLowerCase()
  switch (label) {
    case "inbound":
      return "inbound"
    case "outbound":
      return "outbound"
    default:
      return "mixed"
  }
}
/**
 * Normalizes a free-form recording status to one of "pending", "starting",
 * "recording", "completed" or "failed".
 *
 * The label is trimmed and lower-cased and looked up among the known
 * aliases; empty and unrecognised labels are "pending".
 *
 * @param value - Raw recording status label.
 */
function normalizeRecordingStatus(value?: string) {
  type RecordingStatus =
    | "pending"
    | "starting"
    | "recording"
    | "completed"
    | "failed"
  // A Map (not a plain object) so labels such as "constructor" cannot hit
  // inherited properties.
  const aliases = new Map<string, RecordingStatus>([
    ["completed", "completed"],
    ["complete", "completed"],
    ["ready", "completed"],
    ["available", "completed"],
    ["starting", "starting"],
    ["queued", "starting"],
    ["recording", "recording"],
    ["in_progress", "recording"],
    ["processing", "recording"],
    ["failed", "failed"],
    ["error", "failed"],
  ])
  const label = String(value || "")
    .trim()
    .toLowerCase()
  return aliases.get(label) ?? "pending"
}
/**
 * Chooses the primary and secondary display lines for a contact.
 *
 * - `displayName`: full name, else phone, else email, else "Unknown Contact".
 * - `secondaryLine`: when a name exists, the phone (preferred) or email;
 *   when no name exists, the email (preferred) or phone. `undefined` when
 *   nothing is available.
 *
 * @param contact - Contact fields; may be omitted entirely.
 */
function buildContactDisplay(contact?: {
  firstName?: string
  lastName?: string
  email?: string
  phone?: string
}) {
  const first = String(contact?.firstName || "").trim()
  const last = String(contact?.lastName || "").trim()
  // Both parts are already trimmed, so trimming the joined string only
  // removes the separator left behind by an empty part.
  const fullName = `${first} ${last}`.trim()
  const phone = contact?.phone
  const email = contact?.email

  const displayName = fullName || phone || email || "Unknown Contact"

  let secondaryLine: string | undefined
  if (fullName) {
    secondaryLine = phone || email || undefined
  } else {
    secondaryLine = email || phone || undefined
  }

  return { displayName, secondaryLine }
}
/**
 * Picks the title shown for a conversation.
 *
 * The conversation's own title is used unless it is blank or the
 * placeholder "unknown contact" (case-insensitive); in that case the
 * contact's display name is used instead.
 *
 * @param conversation - Object carrying the optional stored title.
 * @param contact - Linked contact, if any.
 */
function buildConversationDisplayTitle(
  conversation: { title?: string },
  contact?: {
    firstName?: string
    lastName?: string
    email?: string
    phone?: string
  } | null
) {
  const storedTitle = String(conversation.title || "").trim()
  const isPlaceholder = storedTitle.toLowerCase() === "unknown contact"
  if (!storedTitle || isPlaceholder) {
    return buildContactDisplay(contact || undefined).displayName
  }
  return storedTitle
}
/**
 * Builds a single, newest-first activity timeline for a contact out of its
 * conversations, messages, call artifacts and lead submissions.
 *
 * Every entry has the shape `{ id, type, timestamp, title, body, status }`.
 * Entries whose timestamp is missing or not a finite number are ordered as
 * if their timestamp were 0, i.e. they sink to the end. (Subtracting an
 * undefined timestamp used to produce NaN from the comparator, which makes
 * the resulting order unreliable.)
 *
 * @param ctx - Convex context exposing `db`.
 * @param contactId - Id of the contact whose activity is collected.
 * @returns The merged timeline array, sorted by descending timestamp.
 */
async function buildContactTimeline(ctx, contactId) {
  const conversations = await ctx.db
    .query("conversations")
    .withIndex("by_contactId", (q) => q.eq("contactId", contactId))
    .collect()
  const messages = await ctx.db
    .query("messages")
    .withIndex("by_contactId", (q) => q.eq("contactId", contactId))
    .collect()
  const callArtifacts = await ctx.db
    .query("callArtifacts")
    .withIndex("by_contactId", (q) => q.eq("contactId", contactId))
    .collect()
  // NOTE(review): lead submissions are read in full and filtered in memory;
  // switch to an index lookup if the table has one on contactId.
  const leads = (await ctx.db.query("leadSubmissions").collect()).filter(
    (lead) => lead.contactId === contactId
  )

  const timeline = [
    ...conversations.map((item) => ({
      id: item._id,
      type: "conversation",
      timestamp: item.lastMessageAt || item.startedAt || item.updatedAt,
      title: item.title || item.channel,
      body: item.lastMessagePreview || item.summaryText || "",
      status: item.status,
    })),
    ...messages.map((item) => ({
      id: item._id,
      type: "message",
      timestamp: item.sentAt,
      title: `${item.channel.toUpperCase()} ${item.direction || "system"}`,
      body: item.body,
      status: item.status,
    })),
    ...callArtifacts.map((item) => ({
      id: item._id,
      type: "recording",
      timestamp: item.endedAt || item.startedAt || item.updatedAt,
      title: item.recordingStatus || "recording",
      body: item.recordingUrl || item.transcriptionText || "",
      status: item.recordingStatus,
    })),
    ...leads.map((item) => ({
      id: item._id,
      type: "lead",
      timestamp: item.createdAt,
      title: item.type,
      body: item.message || item.intent || "",
      status: item.status,
    })),
  ]

  // Keep the comparator total: anything that is not a finite number sorts as 0.
  const sortKey = (entry: { timestamp?: unknown }) =>
    typeof entry.timestamp === "number" && Number.isFinite(entry.timestamp)
      ? entry.timestamp
      : 0
  timeline.sort((a, b) => sortKey(b) - sortKey(a))
  return timeline
}
/**
 * Public mutation that validates contact fields and hands them, unchanged,
 * to `upsertContactRecord`. Returns whatever that helper returns.
 */
export const upsertContact = mutation({
args: {
// Both name parts are required strings; every other field is optional.
firstName: v.string(),
lastName: v.string(),
email: v.optional(v.string()),
phone: v.optional(v.string()),
company: v.optional(v.string()),
tags: v.optional(v.array(v.string())),
status: v.optional(
v.union(
v.literal("active"),
v.literal("lead"),
v.literal("customer"),
v.literal("inactive")
)
),
source: v.optional(v.string()),
notes: v.optional(v.string()),
ghlContactId: v.optional(v.string()),
livekitIdentity: v.optional(v.string()),
lastActivityAt: v.optional(v.number()),
},
handler: async (ctx, args) => {
return await upsertContactRecord(ctx, args)
},
})
/**
 * Public mutation that validates conversation fields and hands them,
 * unchanged, to `upsertConversationRecord`. Returns that helper's result.
 */
export const upsertConversation = mutation({
args: {
contactId: v.optional(v.id("contacts")),
title: v.optional(v.string()),
// `channel` is the only required field.
channel: v.union(
v.literal("call"),
v.literal("sms"),
v.literal("chat"),
v.literal("unknown")
),
source: v.optional(v.string()),
status: v.optional(
v.union(v.literal("open"), v.literal("closed"), v.literal("archived"))
),
direction: v.optional(
v.union(v.literal("inbound"), v.literal("outbound"), v.literal("mixed"))
),
startedAt: v.optional(v.number()),
endedAt: v.optional(v.number()),
lastMessageAt: v.optional(v.number()),
lastMessagePreview: v.optional(v.string()),
unreadCount: v.optional(v.number()),
summaryText: v.optional(v.string()),
ghlConversationId: v.optional(v.string()),
livekitRoomName: v.optional(v.string()),
voiceSessionId: v.optional(v.id("voiceSessions")),
},
handler: async (ctx, args) => {
return await upsertConversationRecord(ctx, args)
},
})
/**
 * Public mutation that validates message fields and hands them, unchanged,
 * to `upsertMessageRecord`. Returns that helper's result.
 */
export const upsertMessage = mutation({
args: {
// Required: the owning conversation, the channel and the body text.
conversationId: v.id("conversations"),
contactId: v.optional(v.id("contacts")),
direction: v.optional(
v.union(v.literal("inbound"), v.literal("outbound"), v.literal("system"))
),
channel: v.union(
v.literal("call"),
v.literal("sms"),
v.literal("chat"),
v.literal("unknown")
),
source: v.optional(v.string()),
messageType: v.optional(v.string()),
body: v.string(),
status: v.optional(v.string()),
sentAt: v.optional(v.number()),
ghlMessageId: v.optional(v.string()),
voiceTranscriptTurnId: v.optional(v.id("voiceTranscriptTurns")),
voiceSessionId: v.optional(v.id("voiceSessions")),
livekitRoomName: v.optional(v.string()),
// Free-form string; other code in this file stores JSON text here.
metadata: v.optional(v.string()),
},
handler: async (ctx, args) => {
return await upsertMessageRecord(ctx, args)
},
})
/**
 * Public mutation that validates call-artifact (recording/transcript)
 * fields and hands them, unchanged, to `upsertCallArtifactRecord`.
 * Returns that helper's result.
 */
export const upsertCallArtifact = mutation({
args: {
// Only the owning conversation is required.
conversationId: v.id("conversations"),
contactId: v.optional(v.id("contacts")),
source: v.optional(v.string()),
recordingId: v.optional(v.string()),
recordingUrl: v.optional(v.string()),
recordingStatus: v.optional(
v.union(
v.literal("pending"),
v.literal("starting"),
v.literal("recording"),
v.literal("completed"),
v.literal("failed")
)
),
transcriptionText: v.optional(v.string()),
durationMs: v.optional(v.number()),
startedAt: v.optional(v.number()),
endedAt: v.optional(v.number()),
ghlMessageId: v.optional(v.string()),
voiceSessionId: v.optional(v.id("voiceSessions")),
livekitRoomName: v.optional(v.string()),
// Free-form string; other code in this file stores JSON text here.
metadata: v.optional(v.string()),
},
handler: async (ctx, args) => {
return await upsertCallArtifactRecord(ctx, args)
},
})
/**
 * Updates the editable fields of an existing contact.
 *
 * Any field left out of the arguments keeps its stored value. The
 * normalized email/phone columns are recomputed from the effective values
 * and `updatedAt` is refreshed.
 *
 * @throws Error("Contact not found") when the id does not resolve.
 * @returns The contact document re-read after the patch.
 */
export const updateContact = mutation({
  args: {
    contactId: v.id("contacts"),
    firstName: v.optional(v.string()),
    lastName: v.optional(v.string()),
    email: v.optional(v.string()),
    phone: v.optional(v.string()),
    company: v.optional(v.string()),
    status: v.optional(
      v.union(
        v.literal("active"),
        v.literal("lead"),
        v.literal("customer"),
        v.literal("inactive")
      )
    ),
    tags: v.optional(v.array(v.string())),
    notes: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const { contactId } = args
    const current = await ctx.db.get(contactId)
    if (!current) {
      throw new Error("Contact not found")
    }

    // Effective values: the argument when provided, else what is stored.
    const email = args.email ?? current.email
    const phone = args.phone ?? current.phone
    const changes = {
      firstName: args.firstName ?? current.firstName,
      lastName: args.lastName ?? current.lastName,
      email,
      normalizedEmail: normalizeEmail(email),
      phone,
      normalizedPhone: normalizePhone(phone),
      company: args.company ?? current.company,
      status: args.status ?? current.status,
      tags: args.tags ?? current.tags,
      notes: args.notes ?? current.notes,
      updatedAt: Date.now(),
    }

    await ctx.db.patch(contactId, changes)
    return await ctx.db.get(contactId)
  },
})
/**
 * Links a lead submission to a contact and/or conversation and refreshes
 * its `updatedAt`.
 *
 * NOTE(review): both link fields are always included in the patch, so an
 * omitted argument is written as `undefined` — confirm whether clearing the
 * existing link in that case is intended.
 *
 * @returns The lead submission re-read after the patch.
 */
export const linkLeadSubmission = mutation({
  args: {
    leadId: v.id("leadSubmissions"),
    contactId: v.optional(v.id("contacts")),
    conversationId: v.optional(v.id("conversations")),
  },
  handler: async (ctx, args) => {
    const { leadId, contactId, conversationId } = args
    const linkFields = {
      contactId,
      conversationId,
      updatedAt: Date.now(),
    }
    await ctx.db.patch(leadId, linkFields)
    return await ctx.db.get(leadId)
  },
})
/**
 * Imports one contact payload mirrored from an external provider.
 *
 * Upserts the contact through `upsertContactRecord`, then records a
 * per-entity sync-state row (entityType "contact") with status "synced"
 * whose metadata holds the local contact id.
 *
 * @returns The value returned by `upsertContactRecord`.
 */
export const importContact = mutation({
args: {
provider: v.string(),
entityId: v.string(),
// Raw provider payload; not validated beyond being present.
payload: v.any(),
},
handler: async (ctx, args) => {
const payload = args.payload || {}
// Accept both camelCase and snake_case name fields from the payload.
const name = sanitizeContactNameParts({
firstName: payload.firstName || payload.first_name,
lastName: payload.lastName || payload.last_name,
fullName: payload.name,
})
const contact = await upsertContactRecord(ctx, {
firstName: name.firstName,
lastName: name.lastName,
fullName: payload.name,
email: payload.email,
phone: payload.phone,
company: payload.company || payload.companyName,
tags: Array.isArray(payload.tags) ? payload.tags : [],
// Mirrored contacts are always submitted with status "lead".
status: "lead",
source: `${args.provider}:mirror`,
// Prefer the id inside the payload; fall back to the caller's entityId.
ghlContactId: payload.id || args.entityId,
// Last activity: payload update time, else last message time, else now.
lastActivityAt:
parseGhlTimestamp(payload.dateUpdated) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
})
await upsertExternalSyncState(ctx, {
provider: args.provider,
entityType: "contact",
entityId: args.entityId,
status: "synced",
lastAttemptAt: Date.now(),
lastSyncedAt: Date.now(),
metadata: JSON.stringify({
contactId: contact?._id,
}),
})
return contact
},
})
/**
 * Imports one conversation payload mirrored from an external provider.
 *
 * Steps, in order: upsert the contact named in the payload, upsert the
 * conversation linked to it, ensure the contact is a participant of the
 * conversation, then record a per-entity sync-state row (entityType
 * "conversation") with status "synced".
 *
 * @returns The value returned by `upsertConversationRecord`.
 */
export const importConversation = mutation({
args: {
provider: v.string(),
entityId: v.string(),
// Raw provider payload; not validated beyond being present.
payload: v.any(),
},
handler: async (ctx, args) => {
const payload = args.payload || {}
const name = sanitizeContactNameParts({
firstName: payload.firstName,
lastName: payload.lastName,
fullName: payload.contactName || payload.fullName || payload.name,
})
const contact = await upsertContactRecord(ctx, {
firstName: name.firstName,
lastName: name.lastName,
fullName: payload.contactName || payload.fullName || payload.name,
email: payload.email,
phone: payload.phone || payload.contactPhone,
source: `${args.provider}:mirror`,
ghlContactId: payload.contactId,
status: "lead",
// Last activity: last message time, else payload update time, else now.
lastActivityAt:
parseGhlTimestamp(payload.lastMessageDate) ??
parseGhlTimestamp(payload.dateUpdated) ??
Date.now(),
})
const conversation = await upsertConversationRecord(ctx, {
contactId: contact?._id,
// Title: first non-empty of full name, title, contact name, phone, email.
title:
payload.fullName ||
payload.title ||
payload.contactName ||
payload.phone ||
payload.email,
channel: deriveGhlConversationChannel(payload),
source: `${args.provider}:mirror`,
status: deriveConversationStatus(payload),
direction: normalizeConversationDirection(
payload.lastMessageDirection || payload.direction
),
startedAt:
parseGhlTimestamp(payload.dateAdded) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
endedAt: parseGhlTimestamp(payload.dateEnded),
// Unlike startedAt, lastMessageAt has no "now" fallback and may be undefined.
lastMessageAt:
parseGhlTimestamp(payload.lastMessageDate) ??
parseGhlTimestamp(payload.lastMessageAt) ??
parseGhlTimestamp(payload.dateUpdated),
lastMessagePreview: payload.lastMessageBody || payload.snippet,
// Only numeric unread counts are forwarded.
unreadCount:
typeof payload.unreadCount === "number" ? payload.unreadCount : undefined,
summaryText: payload.summary,
ghlConversationId:
payload.conversationId || args.entityId || payload.id,
})
await ensureConversationParticipant(ctx, {
conversationId: conversation._id,
contactId: contact?._id,
role: "contact",
// Display name: stored first/last name when present, else payload contactName.
displayName:
[contact?.firstName, contact?.lastName].filter(Boolean).join(" ") ||
payload.contactName,
phone: contact?.phone,
email: contact?.email,
externalContactId: payload.contactId,
})
await upsertExternalSyncState(ctx, {
provider: args.provider,
entityType: "conversation",
entityId: args.entityId,
status: "synced",
lastAttemptAt: Date.now(),
lastSyncedAt: Date.now(),
metadata: JSON.stringify({
contactId: contact?._id,
conversationId: conversation?._id,
}),
})
return conversation
},
})
/**
 * Imports one message payload mirrored from an external provider.
 *
 * Steps, in order: upsert the contact, upsert the conversation the message
 * belongs to, ensure the contact is a participant, upsert the message
 * itself (with the raw payload serialized into `metadata`), then record a
 * per-entity sync-state row (entityType "message") with status "synced".
 *
 * @returns The value returned by `upsertMessageRecord`.
 */
export const importMessage = mutation({
args: {
provider: v.string(),
entityId: v.string(),
// Raw provider payload; not validated beyond being present.
payload: v.any(),
},
handler: async (ctx, args) => {
const payload = args.payload || {}
const name = sanitizeContactNameParts({
firstName: payload.firstName,
lastName: payload.lastName,
fullName: payload.contactName || payload.fullName || payload.name,
})
const contact = await upsertContactRecord(ctx, {
firstName: name.firstName,
lastName: name.lastName,
fullName: payload.contactName || payload.fullName || payload.name,
email: payload.email,
phone: payload.phone,
source: `${args.provider}:mirror`,
ghlContactId: payload.contactId,
status: "lead",
})
const conversation = await upsertConversationRecord(ctx, {
contactId: contact?._id,
title:
payload.contactName || payload.fullName || payload.name || payload.phone,
channel: deriveGhlConversationChannel(payload),
source: `${args.provider}:mirror`,
// NOTE(review): a payload without conversationStatus normalizes to "open";
// whether that reopens an existing closed conversation depends on
// upsertConversationRecord — confirm.
status: normalizeConversationStatus(payload.conversationStatus),
direction: normalizeConversationDirection(payload.direction),
startedAt:
parseGhlTimestamp(payload.dateAdded) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
lastMessageAt:
parseGhlTimestamp(payload.dateAdded) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
lastMessagePreview: payload.body || payload.message,
ghlConversationId: payload.conversationId,
})
await ensureConversationParticipant(ctx, {
conversationId: conversation._id,
contactId: contact?._id,
role: "contact",
displayName: payload.contactName,
phone: contact?.phone,
email: contact?.email,
externalContactId: payload.contactId,
})
const message = await upsertMessageRecord(ctx, {
conversationId: conversation._id,
contactId: contact?._id,
// Exact, case-sensitive match on the two known directions; anything else
// is stored as "system".
direction:
payload.direction === "inbound"
? "inbound"
: payload.direction === "outbound"
? "outbound"
: "system",
channel: deriveGhlConversationChannel(payload),
source: `${args.provider}:mirror`,
messageType: payload.messageType || payload.type,
// Body: first non-empty of body, message, transcript; else empty string.
body: payload.body || payload.message || payload.transcript || "",
status: payload.status,
sentAt:
parseGhlTimestamp(payload.dateAdded) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
ghlMessageId: payload.id || args.entityId,
metadata: JSON.stringify(payload),
})
await upsertExternalSyncState(ctx, {
provider: args.provider,
entityType: "message",
entityId: args.entityId,
status: "synced",
lastAttemptAt: Date.now(),
lastSyncedAt: Date.now(),
metadata: JSON.stringify({
conversationId: conversation?._id,
messageId: message?._id,
}),
})
return message
},
})
/**
 * Imports one call-recording payload mirrored from an external provider.
 *
 * Upserts a "call" conversation (always with status "closed"), upserts the
 * call artifact attached to it (with the raw payload serialized into
 * `metadata`), then records a per-entity sync-state row (entityType
 * "recording") with status "synced". No contact is resolved here.
 *
 * @returns The value returned by `upsertCallArtifactRecord`.
 */
export const importRecording = mutation({
args: {
provider: v.string(),
entityId: v.string(),
// Raw provider payload; not validated beyond being present.
payload: v.any(),
},
handler: async (ctx, args) => {
const payload = args.payload || {}
const conversation = await upsertConversationRecord(ctx, {
channel: "call",
source: `${args.provider}:mirror`,
status: "closed",
direction: normalizeConversationDirection(payload.direction),
startedAt:
parseGhlTimestamp(payload.createdAt) ?? Date.now(),
lastMessageAt:
parseGhlTimestamp(payload.createdAt) ?? Date.now(),
lastMessagePreview: payload.summary || payload.transcript,
ghlConversationId: payload.conversationId,
livekitRoomName: payload.livekitRoomName,
})
const artifact = await upsertCallArtifactRecord(ctx, {
conversationId: conversation._id,
source: `${args.provider}:mirror`,
recordingId: payload.recordingId || payload.id || args.entityId,
recordingUrl: payload.recordingUrl,
recordingStatus: normalizeRecordingStatus(payload.recordingStatus),
transcriptionText: payload.transcript,
// Prefer an explicit millisecond duration; otherwise scale `duration`
// by 1000 (presumably seconds — confirm against the provider payload).
durationMs:
typeof payload.durationMs === "number"
? payload.durationMs
: typeof payload.duration === "number"
? payload.duration * 1000
: undefined,
// Unlike the conversation above, the artifact's startedAt has no "now" fallback.
startedAt:
parseGhlTimestamp(payload.createdAt),
endedAt: parseGhlTimestamp(payload.endedAt),
ghlMessageId: payload.messageId,
livekitRoomName: payload.livekitRoomName,
metadata: JSON.stringify(payload),
})
await upsertExternalSyncState(ctx, {
provider: args.provider,
entityType: "recording",
entityId: args.entityId,
status: "synced",
lastAttemptAt: Date.now(),
lastSyncedAt: Date.now(),
metadata: JSON.stringify({
conversationId: conversation?._id,
callArtifactId: artifact?._id,
}),
})
return artifact
},
})
/**
 * Writes a sync checkpoint row for an arbitrary provider/entity.
 *
 * All arguments are forwarded to `upsertExternalSyncState`. `lastAttemptAt`
 * is always stamped; `lastSyncedAt` is stamped when the status is "synced"
 * or "reconciled" — the same rule `markSyncStage` applies — so a completed
 * reconcile stage also records when it last succeeded. (Previously only
 * "synced" counted here.)
 *
 * @returns The result of `upsertExternalSyncState`.
 */
export const updateSyncCheckpoint = mutation({
  args: {
    provider: v.string(),
    entityType: v.string(),
    entityId: v.string(),
    cursor: v.optional(v.string()),
    checksum: v.optional(v.string()),
    status: v.optional(
      v.union(
        v.literal("running"),
        v.literal("pending"),
        v.literal("synced"),
        v.literal("failed"),
        v.literal("missing_config"),
        v.literal("reconciled"),
        v.literal("mismatch")
      )
    ),
    error: v.optional(v.string()),
    metadata: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const completedSuccessfully =
      args.status === "synced" || args.status === "reconciled"
    return await upsertExternalSyncState(ctx, {
      ...args,
      lastAttemptAt: Date.now(),
      lastSyncedAt: completedSuccessfully ? Date.now() : undefined,
    })
  },
})
/**
 * Verifies that every per-entity sync-state row of a provider still has its
 * mirrored local record, and updates each row accordingly.
 *
 * Only rows whose entityType is one of the mirrored record types
 * ("contact", "conversation", "message", "recording") are examined. Rows
 * with any other entityType — notably the stage-level checkpoint rows such
 * as "contacts" or "reconcile" — are left untouched, so this pass no longer
 * overwrites a stage's own status and error.
 *
 * The existence check uses `.first()`, so duplicate local rows for the same
 * external id count as "present" instead of aborting the whole pass.
 *
 * @returns `{ provider, checked, mismatches }` where `checked` is the number
 *   of rows actually verified and `mismatches` lists the entity ids whose
 *   local record is missing.
 */
export const reconcileExternalState = mutation({
  args: {
    provider: v.string(),
  },
  handler: async (ctx, args) => {
    // entityType -> where the mirrored record lives and how to find it.
    const mirroredLookups = new Map([
      ["contact", { table: "contacts", index: "by_ghlContactId", field: "ghlContactId" }],
      [
        "conversation",
        {
          table: "conversations",
          index: "by_ghlConversationId",
          field: "ghlConversationId",
        },
      ],
      ["message", { table: "messages", index: "by_ghlMessageId", field: "ghlMessageId" }],
      ["recording", { table: "callArtifacts", index: "by_recordingId", field: "recordingId" }],
    ])

    const states = await ctx.db
      .query("externalSyncState")
      .withIndex("by_provider_entityType", (q) => q.eq("provider", args.provider))
      .collect()

    const mismatches: string[] = []
    let checked = 0
    for (const state of states) {
      const lookup = mirroredLookups.get(state.entityType)
      if (!lookup) {
        // Not a mirrored record (e.g. a stage checkpoint row): leave it alone.
        continue
      }
      checked += 1

      const mirrored = await ctx.db
        .query(lookup.table)
        .withIndex(lookup.index, (q) => q.eq(lookup.field, state.entityId))
        .first()

      if (!mirrored) {
        await ctx.db.patch(state._id, {
          status: "mismatch",
          error: "Referenced mirrored record is missing locally.",
          updatedAt: Date.now(),
        })
        mismatches.push(state.entityId)
        continue
      }

      await ctx.db.patch(state._id, {
        status: "reconciled",
        error: undefined,
        // Keep the original sync time when there is one.
        lastSyncedAt: state.lastSyncedAt ?? Date.now(),
        updatedAt: Date.now(),
      })
    }

    return {
      provider: args.provider,
      checked,
      mismatches,
    }
  },
})
/**
 * Lists the most recently active conversations that carry a GHL
 * conversation id, as candidates for message-history hydration.
 *
 * `limit` defaults to 25 and is clamped to the range 1..100. Recency is
 * `lastMessageAt`, falling back to `updatedAt`, falling back to 0.
 *
 * @returns Up to `limit` entries of
 *   `{ id, ghlConversationId, channel, lastMessageAt }`, newest first.
 */
export const listConversationHistoryHydrationCandidates = query({
  args: {
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const requested = args.limit ?? 25
    const limit = Math.min(100, Math.max(1, requested))
    const recencyOf = (conversation) =>
      conversation.lastMessageAt || conversation.updatedAt || 0

    const everyConversation = await ctx.db.query("conversations").collect()
    const mirrored = everyConversation.filter(
      (conversation) => conversation.ghlConversationId
    )
    mirrored.sort((a, b) => recencyOf(b) - recencyOf(a))

    return mirrored.slice(0, limit).map((conversation) => ({
      id: conversation._id,
      ghlConversationId: conversation.ghlConversationId,
      channel: conversation.channel,
      lastMessageAt: recencyOf(conversation),
    }))
  },
})
/**
 * Runs one pass of the GHL mirror.
 *
 * Stages run in order — contacts, conversations, messages (followed by
 * per-conversation history hydration), recordings, reconcile — and each
 * stage writes its own checkpoint row through `updateSyncCheckpoint`. The
 * first stage that throws is marked "failed" and the run stops there,
 * returning the summary collected so far with `ok: false`.
 *
 * Resume behaviour: contacts, messages and recordings continue from the
 * cursor/page stored in their checkpoint metadata unless
 * `forceFullBackfill` is set. That stored metadata is read BEFORE the stage
 * is marked "running", because marking it running sends fresh metadata for
 * the same checkpoint row. (Previously the stage was marked running first.)
 *
 * Arguments (all optional): `reason` (recorded in checkpoint metadata,
 * default "cron"), `forceFullBackfill`, `maxPagesPerRun` (1..250, default
 * 25), `contactsLimit` / `messagesLimit` (1..100, default 100),
 * `recordingsPageSize` (1..50, default 50).
 *
 * @returns A summary with per-stage import counts and reconcile mismatches,
 *   or `{ ok: false, status: "missing_config", message }` when GHL
 *   credentials are not configured.
 */
export const runGhlMirror = action({
  args: {
    reason: v.optional(v.string()),
    forceFullBackfill: v.optional(v.boolean()),
    maxPagesPerRun: v.optional(v.number()),
    contactsLimit: v.optional(v.number()),
    messagesLimit: v.optional(v.number()),
    recordingsPageSize: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const config = readGhlMirrorConfig()
    const now = Date.now()
    const maxPagesPerRun = Math.min(250, Math.max(1, args.maxPagesPerRun || 25))
    const contactsLimit = Math.min(100, Math.max(1, args.contactsLimit || 100))
    const messagesLimit = Math.min(100, Math.max(1, args.messagesLimit || 100))
    const recordingsPageSize = Math.min(
      50,
      Math.max(1, args.recordingsPageSize || 50)
    )

    // Without credentials, flag every stage and stop.
    if (!config) {
      for (const stage of GHL_SYNC_STAGES) {
        await ctx.runMutation(api.crm.updateSyncCheckpoint, {
          provider: GHL_SYNC_PROVIDER,
          entityType: stage,
          entityId: stage,
          status: "missing_config",
          error: "GHL credentials are not configured.",
          metadata: JSON.stringify({
            reason: args.reason || "cron",
            checkedAt: now,
          }),
        })
      }
      return {
        ok: false,
        status: "missing_config",
        message: "GHL credentials are not configured.",
      }
    }

    const summary = {
      ok: true,
      status: "synced",
      reason: args.reason || "cron",
      contacts: 0,
      conversations: 0,
      messages: 0,
      recordings: 0,
      hydrated: 0,
      mismatches: [] as string[],
    }
    // GHL conversation id -> channel hint, filled by the conversation and
    // message stages and consumed by the hydration step.
    const hydrationTargets = new Map<string, string>()

    // Marks a stage as running in its checkpoint row.
    const updateRunning = async (entityType: string, metadata?: Record<string, any>) => {
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType,
        entityId: entityType,
        status: "running",
        error: undefined,
        metadata: JSON.stringify({
          ...(metadata || {}),
          reason: args.reason || "cron",
          startedAt: now,
        }),
      })
    }

    // Marks a stage as failed and flips the run summary to failed.
    const failStage = async (entityType: string, error: unknown, metadata?: Record<string, any>) => {
      const message =
        error instanceof Error ? error.message : `Failed to sync ${entityType}`
      summary.ok = false
      summary.status = "failed"
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType,
        entityId: entityType,
        status: "failed",
        error: message,
        metadata: JSON.stringify({
          ...(metadata || {}),
          reason: args.reason || "cron",
          failedAt: Date.now(),
        }),
      })
    }

    // ---- Stage 1: contacts (cursor-paged) ----
    try {
      // Read the stored cursor first; updateRunning() below sends new
      // metadata for this same checkpoint row.
      const contactsState = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
      let contactsCursor =
        !args.forceFullBackfill &&
        contactsState.stages.contacts.metadata?.nextCursor
          ? String(contactsState.stages.contacts.metadata.nextCursor)
          : undefined
      await updateRunning("contacts")
      let contactsPages = 0
      while (contactsPages < maxPagesPerRun) {
        const fetched = await fetchGhlContactsPage(config, {
          limit: contactsLimit,
          cursor: contactsCursor,
        })
        if (!fetched.items.length) {
          break
        }
        for (const item of fetched.items) {
          await ctx.runMutation(api.crm.importContact, {
            provider: GHL_SYNC_PROVIDER,
            entityId: String(item.id || ""),
            payload: item,
          })
          summary.contacts += 1
        }
        contactsPages += 1
        contactsCursor = fetched.nextCursor
        if (!fetched.nextCursor) {
          break
        }
      }
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "contacts",
        entityId: "contacts",
        cursor: contactsCursor,
        status: "synced",
        error: "",
        metadata: JSON.stringify({
          imported: summary.contacts,
          pages: contactsPages,
          nextCursor: contactsCursor,
          completedAt: Date.now(),
          reason: args.reason || "cron",
        }),
      })
    } catch (error) {
      await failStage("contacts", error)
      return summary
    }

    // ---- Stage 2: conversations (single page of up to 100) ----
    try {
      await updateRunning("conversations")
      const fetched = await fetchGhlConversationsPage(config, {
        limit: 100,
      })
      for (const item of fetched.items) {
        const entityId = String(item.id || "")
        if (!entityId) {
          continue
        }
        await ctx.runMutation(api.crm.importConversation, {
          provider: GHL_SYNC_PROVIDER,
          entityId,
          payload: item,
        })
        hydrationTargets.set(entityId, item.lastMessageType || item.type || "")
        summary.conversations += 1
      }
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "conversations",
        entityId: "conversations",
        cursor: undefined,
        status: "synced",
        error: "",
        metadata: JSON.stringify({
          imported: summary.conversations,
          total: fetched.total,
          source: "search",
          completedAt: Date.now(),
          reason: args.reason || "cron",
        }),
      })
    } catch (error) {
      await failStage("conversations", error)
      return summary
    }

    // ---- Stage 3: messages (cursor-paged per channel) ----
    try {
      // Read the stored per-channel cursors first; updateRunning() below
      // sends new metadata for this same checkpoint row.
      const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
      const messageCursors = {
        SMS:
          !args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.SMS
            ? String(previous.stages.messages.metadata.cursors.SMS)
            : undefined,
        Call:
          !args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.Call
            ? String(previous.stages.messages.metadata.cursors.Call)
            : undefined,
      }
      await updateRunning("messages")
      for (const channel of ["SMS", "Call"] as const) {
        let pages = 0
        while (pages < maxPagesPerRun) {
          const fetched = await fetchGhlMessagesPage(config, {
            limit: messagesLimit,
            cursor: messageCursors[channel],
            channel,
          })
          if (!fetched.items.length) {
            break
          }
          for (const item of fetched.items) {
            await ctx.runMutation(api.crm.importMessage, {
              provider: GHL_SYNC_PROVIDER,
              entityId: String(item.id || ""),
              payload: item,
            })
            if (item.conversationId) {
              hydrationTargets.set(String(item.conversationId), item.channel || "")
            }
            summary.messages += 1
          }
          pages += 1
          messageCursors[channel] = fetched.nextCursor
          if (!fetched.nextCursor) {
            break
          }
        }
      }
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "messages",
        entityId: "messages",
        cursor: messageCursors.Call || messageCursors.SMS,
        status: "synced",
        error: "",
        metadata: JSON.stringify({
          imported: summary.messages,
          cursors: messageCursors,
          completedAt: Date.now(),
          reason: args.reason || "cron",
        }),
      })
    } catch (error) {
      await failStage("messages", error)
      return summary
    }

    // ---- Stage 3b: hydrate full history for up to 25 conversations ----
    // Failures here are reported against the "messages" stage.
    try {
      const fallbackCandidates = await ctx.runQuery(
        api.crm.listConversationHistoryHydrationCandidates,
        { limit: 25 }
      )
      for (const candidate of fallbackCandidates) {
        if (candidate.ghlConversationId) {
          hydrationTargets.set(
            String(candidate.ghlConversationId),
            candidate.channel || ""
          )
        }
      }
      for (const [ghlConversationId, channelHint] of Array.from(
        hydrationTargets.entries()
      ).slice(0, 25)) {
        const fetched = await fetchGhlConversationMessages(config, {
          conversationId: ghlConversationId,
        })
        const items = extractGhlMessages(fetched).filter(Boolean)
        if (!items.length) {
          continue
        }
        for (const item of items) {
          await ctx.runMutation(api.crm.importMessage, {
            provider: GHL_SYNC_PROVIDER,
            entityId: String(item.id || item.messageId || ""),
            payload: {
              ...item,
              // Fill in what the per-conversation endpoint may leave out.
              conversationId: item.conversationId || ghlConversationId,
              channel: item.channel || channelHint,
            },
          })
          summary.hydrated += 1
        }
      }
    } catch (error) {
      await failStage("messages", error, {
        hydrated: summary.hydrated,
      })
      return summary
    }

    // ---- Stage 4: recordings (page-numbered call logs) ----
    try {
      // Read the stored page number first; updateRunning() below sends new
      // metadata for this same checkpoint row.
      const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
      let nextPage =
        !args.forceFullBackfill && previous.stages.recordings.metadata?.nextPage
          ? Number(previous.stages.recordings.metadata.nextPage)
          : 1
      await updateRunning("recordings")
      let pages = 0
      while (pages < maxPagesPerRun) {
        const fetched = await fetchGhlCallLogsPage(config, {
          page: nextPage,
          pageSize: recordingsPageSize,
        })
        if (!fetched.items.length) {
          break
        }
        for (const item of fetched.items) {
          await ctx.runMutation(api.crm.importRecording, {
            provider: GHL_SYNC_PROVIDER,
            entityId: String(item.id || item.messageId || ""),
            payload: {
              ...item,
              recordingId: item.messageId || item.id,
              transcript: item.transcript,
              recordingUrl: item.recordingUrl,
              // A call log with a transcript is treated as a finished recording.
              recordingStatus: item.transcript ? "completed" : "pending",
            },
          })
          summary.recordings += 1
        }
        pages += 1
        // Once the last page has been read, the next run starts over at page 1.
        const exhausted = fetched.page * fetched.pageSize >= fetched.total
        nextPage = exhausted ? 1 : fetched.page + 1
        if (exhausted) {
          break
        }
      }
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "recordings",
        entityId: "recordings",
        cursor: String(nextPage),
        status: "synced",
        error: "",
        metadata: JSON.stringify({
          imported: summary.recordings,
          nextPage,
          completedAt: Date.now(),
          reason: args.reason || "cron",
        }),
      })
    } catch (error) {
      await failStage("recordings", error)
      return summary
    }

    // ---- Stage 5: reconcile sync-state rows against local records ----
    try {
      await updateRunning("reconcile")
      const reconcile = await ctx.runMutation(api.crm.reconcileExternalState, {
        provider: GHL_SYNC_PROVIDER,
      })
      summary.mismatches = reconcile.mismatches || []
      await ctx.runMutation(api.crm.updateSyncCheckpoint, {
        provider: GHL_SYNC_PROVIDER,
        entityType: "reconcile",
        entityId: "reconcile",
        status: reconcile.mismatches?.length ? "mismatch" : "reconciled",
        error: reconcile.mismatches?.length
          ? "Some mirrored records are missing locally."
          : "",
        metadata: JSON.stringify({
          checked: reconcile.checked,
          mismatches: reconcile.mismatches || [],
          completedAt: Date.now(),
          reason: args.reason || "cron",
        }),
      })
    } catch (error) {
      await failStage("reconcile", error)
      return summary
    }

    return summary
  },
})
/**
 * Re-imports GHL contacts page by page, running `importContact` once per
 * fetched item.
 *
 * Args:
 * - `reason`: free-form label echoed back in the result (defaults to
 *   "manual-repair").
 * - `maxPages`: page budget for this run, clamped to 1..250 (default 50).
 * - `limit`: page size passed to the fetcher, clamped to 1..100 (default 100).
 *
 * Returns `{ ok: false, repaired: 0, message }` when GHL credentials are
 * missing; otherwise `{ ok: true, repaired, pages, cursor, reason }`, where
 * `cursor` is the last `nextCursor` received or `null`.
 */
export const repairMirroredContacts = action({
  args: {
    reason: v.optional(v.string()),
    maxPages: v.optional(v.number()),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const ghlConfig = readGhlMirrorConfig()
    if (!ghlConfig) {
      return {
        ok: false,
        repaired: 0,
        message: "GHL credentials are not configured.",
      }
    }

    const clamp = (value: number, low: number, high: number) =>
      Math.min(high, Math.max(low, value))
    const pageBudget = clamp(args.maxPages || 50, 1, 250)
    const pageSize = clamp(args.limit || 100, 1, 100)

    let cursor: string | undefined
    let pages = 0
    let repaired = 0

    // The budget is always at least 1, so the first page is always requested.
    do {
      const batch = await fetchGhlContactsPage(ghlConfig, {
        limit: pageSize,
        cursor,
      })
      // An empty page ends the run without counting as a processed page.
      if (!batch.items.length) {
        break
      }
      for (const contact of batch.items) {
        await ctx.runMutation(api.crm.importContact, {
          provider: GHL_SYNC_PROVIDER,
          entityId: String(contact.id || ""),
          payload: contact,
        })
        repaired += 1
      }
      pages += 1
      cursor = batch.nextCursor
    } while (cursor && pages < pageBudget)

    return {
      ok: true,
      repaired,
      pages,
      cursor: cursor || null,
      reason: args.reason || "manual-repair",
    }
  },
})
/**
 * Fetches the GHL message history for one local conversation and imports every
 * message through `importMessage`.
 *
 * Args:
 * - `conversationId`: local conversation id (string), resolved through
 *   `getAdminConversationDetail`.
 *
 * Returns `{ ok, imported, ghlConversationId?, message? }`. The action does
 * not throw for fetch/import failures; it reports them as `ok: false` with the
 * error message. `imported` is the number of messages whose import call
 * completed, including on the failure path.
 */
export const hydrateConversationHistory = action({
  args: {
    conversationId: v.string(),
  },
  handler: async (ctx, args) => {
    const config = readGhlMirrorConfig()
    if (!config) {
      return {
        ok: false,
        imported: 0,
        message: "GHL credentials are not configured.",
      }
    }
    const detail = await ctx.runQuery(api.crm.getAdminConversationDetail, {
      conversationId: args.conversationId,
    })
    const ghlConversationId = detail?.conversation?.ghlConversationId
    if (!ghlConversationId) {
      return {
        ok: false,
        imported: 0,
        message: "This conversation is not linked to GHL.",
      }
    }

    // Declared outside the try block so a failure part-way through the loop
    // still reports the imports that already completed instead of 0.
    let imported = 0
    try {
      const fetched = await fetchGhlConversationMessages(config, {
        conversationId: ghlConversationId,
      })
      const items = extractGhlMessages(fetched).filter(Boolean)
      for (const item of items) {
        await ctx.runMutation(api.crm.importMessage, {
          provider: GHL_SYNC_PROVIDER,
          entityId: String(item.id || item.messageId || ""),
          payload: {
            ...item,
            // Fall back to the conversation's own values when the message
            // payload omits them.
            conversationId: item.conversationId || ghlConversationId,
            channel: item.channel || detail.conversation.channel,
          },
        })
        imported += 1
      }
      return {
        ok: true,
        imported,
        ghlConversationId,
      }
    } catch (error) {
      return {
        ok: false,
        imported,
        ghlConversationId,
        message:
          error instanceof Error ? error.message : "Failed to hydrate history.",
      }
    }
  },
})
/**
 * Sends an outbound SMS for a conversation through GHL and records the
 * message locally.
 *
 * Args:
 * - `conversationId`: local conversation id (string).
 * - `body`: message text; trimmed before use.
 *
 * Returns `{ ok: true, response }` where `response` is whatever
 * `sendGhlConversationMessage` returned.
 *
 * Throws when the body is empty, GHL credentials are missing, the
 * conversation cannot be found, or the conversation has neither a GHL
 * conversation id nor a GHL contact id to address the message to. Errors from
 * the GHL call or the follow-up mutation propagate unchanged.
 */
export const sendAdminConversationMessage = action({
  args: {
    conversationId: v.string(),
    body: v.string(),
  },
  handler: async (ctx, args) => {
    const messageBody = String(args.body || "").trim()
    if (!messageBody) {
      throw new Error("Message body is required.")
    }
    const config = readGhlMirrorConfig()
    if (!config) {
      throw new Error("GHL credentials are not configured.")
    }
    const detail = await ctx.runQuery(api.crm.getAdminConversationDetail, {
      conversationId: args.conversationId,
    })
    if (!detail) {
      throw new Error("Conversation not found.")
    }

    const ghlConversationId = detail.conversation.ghlConversationId || undefined
    const ghlContactId = detail.contact?.ghlContactId || undefined
    // Without either identifier there is nothing to address the message to;
    // fail before calling GHL or recording a "sent" message locally.
    if (!ghlConversationId && !ghlContactId) {
      throw new Error("This conversation is not linked to GHL.")
    }

    const response = await sendGhlConversationMessage(config, {
      conversationId: ghlConversationId,
      contactId: ghlContactId,
      message: messageBody,
      type: "SMS",
    })

    // Take the first candidate that is an actual object. A bare string or
    // other primitive cannot be treated as a message record (spreading it
    // below would produce a garbage payload), so such values are skipped.
    const responseMessage =
      [
        response?.message,
        response?.data?.message,
        response?.messages?.[0],
        response?.data?.messages?.[0],
      ].find((candidate) => candidate && typeof candidate === "object") || null

    if (responseMessage) {
      await ctx.runMutation(api.crm.importMessage, {
        provider: GHL_SYNC_PROVIDER,
        entityId: String(
          responseMessage.id || responseMessage.messageId || Date.now()
        ),
        payload: {
          ...responseMessage,
          // Fill in anything the response left out from what we already know.
          conversationId:
            responseMessage.conversationId ||
            detail.conversation.ghlConversationId,
          contactId: responseMessage.contactId || detail.contact?.ghlContactId,
          channel:
            responseMessage.channel || detail.conversation.channel || "SMS",
          direction: responseMessage.direction || "outbound",
          body: responseMessage.body || responseMessage.message || messageBody,
          status: responseMessage.status || "sent",
        },
      })
    } else {
      // No message record in the response: store a local record and keep the
      // raw response as metadata.
      await ctx.runMutation(api.crm.upsertMessage, {
        conversationId: args.conversationId as any,
        contactId: detail.contact?.id as any,
        direction: "outbound",
        channel: detail.conversation.channel === "sms" ? "sms" : "unknown",
        source: "ghl:send",
        body: messageBody,
        status: "sent",
        sentAt: Date.now(),
        metadata: JSON.stringify(response || {}),
      })
    }
    return {
      ok: true,
      response,
    }
  },
})
/**
 * Lists contacts for the admin UI with search, sorting and pagination.
 *
 * Args:
 * - `search`: optional text, trimmed and lower-cased, matched via
 *   `matchesSearch` against name, email, phone, company and tags.
 * - `page`: 1-based page number (minimum 1, default 1).
 * - `limit`: page size, clamped to 1..100 (default 25).
 *
 * Returns `{ items, sync, pagination }`. Items are ordered by
 * `lastActivityAt` (falling back to `updatedAt`) descending and carry
 * per-contact conversation and lead counts.
 */
export const listAdminContacts = query({
  args: {
    search: v.optional(v.string()),
    page: v.optional(v.number()),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const page = Math.max(1, args.page ?? 1)
    const limit = Math.min(100, Math.max(1, args.limit ?? 25))
    const search = String(args.search || "").trim().toLowerCase()
    const contacts = await ctx.db.query("contacts").collect()
    const filtered = contacts.filter((contact) =>
      matchesSearch(
        [
          // Join only the name parts that exist so a missing first/last name
          // does not put the literal text "undefined" into the search haystack.
          [contact.firstName, contact.lastName].filter(Boolean).join(" "),
          contact.email,
          contact.phone,
          contact.company,
          ...(contact.tags || []),
        ],
        search
      )
    )
    filtered.sort(
      (a, b) =>
        (b.lastActivityAt || b.updatedAt || 0) -
        (a.lastActivityAt || a.updatedAt || 0)
    )
    const paged = filtered.slice((page - 1) * limit, page * limit)

    // Read leadSubmissions once and tally per contact, instead of collecting
    // the whole table again for every contact on the page. Skipped entirely
    // when the page is empty.
    const leadCountByContact = new Map<string, number>()
    if (paged.length) {
      const leads = await ctx.db.query("leadSubmissions").collect()
      for (const lead of leads) {
        if (!lead.contactId) {
          continue
        }
        leadCountByContact.set(
          lead.contactId,
          (leadCountByContact.get(lead.contactId) ?? 0) + 1
        )
      }
    }

    const items = await Promise.all(
      paged.map(async (contact) => {
        const display = buildContactDisplay(contact)
        const conversations = await ctx.db
          .query("conversations")
          .withIndex("by_contactId", (q) => q.eq("contactId", contact._id))
          .collect()
        return {
          id: contact._id,
          firstName: contact.firstName,
          lastName: contact.lastName,
          displayName: display.displayName,
          secondaryLine: display.secondaryLine,
          email: contact.email,
          phone: contact.phone,
          company: contact.company,
          tags: contact.tags || [],
          status: contact.status || "lead",
          source: contact.source,
          ghlContactId: contact.ghlContactId,
          lastActivityAt: contact.lastActivityAt,
          conversationCount: conversations.length,
          leadCount: leadCountByContact.get(contact._id) ?? 0,
          updatedAt: contact.updatedAt,
        }
      })
    )
    return {
      items,
      sync: await buildAdminSyncOverview(ctx),
      pagination: {
        page,
        limit,
        total: filtered.length,
        totalPages: Math.max(1, Math.ceil(filtered.length / limit)),
      },
    }
  },
})
/**
 * Returns the timeline for one contact by delegating to
 * `buildContactTimeline`.
 *
 * Args:
 * - `contactId`: id of a document in the `contacts` table.
 */
export const getContactTimeline = query({
  args: {
    contactId: v.id("contacts"),
  },
  handler: async (ctx, { contactId }) => {
    const timeline = await buildContactTimeline(ctx, contactId)
    return timeline
  },
})
/**
 * Loads one contact with its conversations, timeline and the sync overview
 * for the admin detail view.
 *
 * Args:
 * - `contactId`: contact id as a plain string.
 *
 * Returns `null` when the string is not an id of the `contacts` table or the
 * contact does not exist; otherwise `{ contact, conversations, timeline,
 * sync }` with conversations ordered by most recent activity first.
 */
export const getAdminContactDetail = query({
  args: {
    contactId: v.string(),
  },
  handler: async (ctx, args) => {
    // The argument is an unvalidated string. normalizeId yields null unless it
    // is an id of the `contacts` table, so a malformed id or an id belonging
    // to another table is reported as "not found" rather than being passed to
    // db.get directly.
    const contactId = ctx.db.normalizeId("contacts", args.contactId)
    if (!contactId) {
      return null
    }
    const contact = await ctx.db.get(contactId)
    if (!contact) {
      return null
    }
    const conversations = await ctx.db
      .query("conversations")
      .withIndex("by_contactId", (q) => q.eq("contactId", contact._id))
      .collect()
    // `|| 0` keeps the comparator numeric when a record has neither timestamp.
    conversations.sort(
      (a, b) =>
        (b.lastMessageAt || b.updatedAt || 0) -
        (a.lastMessageAt || a.updatedAt || 0)
    )
    const timeline = await buildContactTimeline(ctx, contact._id)
    const display = buildContactDisplay(contact)
    return {
      contact: {
        id: contact._id,
        firstName: contact.firstName,
        lastName: contact.lastName,
        displayName: display.displayName,
        secondaryLine: display.secondaryLine,
        email: contact.email,
        phone: contact.phone,
        company: contact.company,
        tags: contact.tags || [],
        status: contact.status || "lead",
        source: contact.source,
        notes: contact.notes,
        ghlContactId: contact.ghlContactId,
        lastActivityAt: contact.lastActivityAt,
        updatedAt: contact.updatedAt,
      },
      conversations: conversations.map((conversation) => ({
        id: conversation._id,
        channel: conversation.channel,
        status: conversation.status || "open",
        title: buildConversationDisplayTitle(conversation, contact),
        lastMessageAt: conversation.lastMessageAt,
        lastMessagePreview: conversation.lastMessagePreview,
        recordingReady: Boolean(
          conversation.livekitRoomName || conversation.voiceSessionId
        ),
      })),
      timeline,
      sync: await buildAdminSyncOverview(ctx),
    }
  },
})
/**
 * Lists conversations for the admin UI with channel/status filters, text
 * search, sorting and pagination.
 *
 * Args:
 * - `search`: optional text, trimmed and lower-cased, matched via
 *   `matchesSearch` against conversation and contact fields.
 * - `channel` / `status`: optional exact-match filters (a missing status is
 *   treated as "open").
 * - `page`: 1-based page number (minimum 1, default 1).
 * - `limit`: page size, clamped to 1..100 (default 25).
 *
 * Mirrored call conversations ("ghl:mirror") with an empty preview and no
 * activity in the last 30 days are hidden.
 *
 * Returns `{ items, sync, pagination }`, newest activity first, with
 * per-conversation message and recording counts.
 */
export const listAdminConversations = query({
  args: {
    search: v.optional(v.string()),
    channel: v.optional(
      v.union(
        v.literal("call"),
        v.literal("sms"),
        v.literal("chat"),
        v.literal("unknown")
      )
    ),
    status: v.optional(
      v.union(v.literal("open"), v.literal("closed"), v.literal("archived"))
    ),
    page: v.optional(v.number()),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const page = Math.max(1, args.page ?? 1)
    const limit = Math.min(100, Math.max(1, args.limit ?? 25))
    const search = String(args.search || "").trim().toLowerCase()
    const conversations = await ctx.db.query("conversations").collect()
    const filtered = []
    const staleShellCutoff = Date.now() - 30 * 24 * 60 * 60 * 1000

    // Single definition of "most recent activity", shared by the stale filter
    // and the sort so both fall back to 0 when no timestamp is present (the
    // sort comparator would otherwise evaluate to NaN).
    const activityAt = (conversation: {
      lastMessageAt?: number
      startedAt?: number
      updatedAt?: number
    }): number =>
      conversation.lastMessageAt ||
      conversation.startedAt ||
      conversation.updatedAt ||
      0

    // Conversations that share a contact reuse one lookup.
    const contactCache = new Map()

    for (const conversation of conversations) {
      if (args.channel && conversation.channel !== args.channel) {
        continue
      }
      if (args.status && (conversation.status || "open") !== args.status) {
        continue
      }
      // Checked before the contact lookup: a stale shell is dropped regardless
      // of its contact, so there is no need to read the contact for it.
      if (
        conversation.source === "ghl:mirror" &&
        conversation.channel === "call" &&
        !String(conversation.lastMessagePreview || "").trim() &&
        activityAt(conversation) < staleShellCutoff
      ) {
        continue
      }
      let contact = null
      if (conversation.contactId) {
        if (!contactCache.has(conversation.contactId)) {
          contactCache.set(
            conversation.contactId,
            await ctx.db.get(conversation.contactId)
          )
        }
        contact = contactCache.get(conversation.contactId)
      }
      if (
        !matchesSearch(
          [
            conversation.title,
            conversation.lastMessagePreview,
            conversation.ghlConversationId,
            // Join only the name parts that exist so a missing first/last
            // name does not add the literal text "undefined" to the haystack.
            contact
              ? [contact.firstName, contact.lastName].filter(Boolean).join(" ")
              : undefined,
            contact?.email,
            contact?.phone,
          ],
          search
        )
      ) {
        continue
      }
      filtered.push({ conversation, contact })
    }
    filtered.sort(
      (a, b) => activityAt(b.conversation) - activityAt(a.conversation)
    )
    const paged = filtered.slice((page - 1) * limit, page * limit)
    const items = await Promise.all(
      paged.map(async ({ conversation, contact }) => {
        const display = buildContactDisplay(contact || undefined)
        const recordings = await ctx.db
          .query("callArtifacts")
          .withIndex("by_conversationId", (q) =>
            q.eq("conversationId", conversation._id)
          )
          .collect()
        const messages = await ctx.db
          .query("messages")
          .withIndex("by_conversationId", (q) =>
            q.eq("conversationId", conversation._id)
          )
          .collect()
        return {
          id: conversation._id,
          title: buildConversationDisplayTitle(conversation, contact),
          channel: conversation.channel,
          status: conversation.status || "open",
          direction: conversation.direction || "mixed",
          source: conversation.source,
          startedAt: conversation.startedAt,
          lastMessageAt: conversation.lastMessageAt,
          lastMessagePreview: conversation.lastMessagePreview,
          displayName: display.displayName,
          secondaryLine: display.secondaryLine,
          contact: contact
            ? {
                id: contact._id,
                name: display.displayName,
                email: contact.email,
                phone: contact.phone,
                secondaryLine: display.secondaryLine,
              }
            : null,
          messageCount: messages.length,
          recordingCount: recordings.length,
        }
      })
    )
    return {
      items,
      sync: await buildAdminSyncOverview(ctx),
      pagination: {
        page,
        limit,
        total: filtered.length,
        totalPages: Math.max(1, Math.ceil(filtered.length / limit)),
      },
    }
  },
})
/**
 * Loads one conversation with its contact, participants, messages (oldest
 * first), recordings, related leads and the sync overview for the admin
 * detail view.
 *
 * Args:
 * - `conversationId`: conversation id as a plain string.
 *
 * Returns `null` when the string is not an id of the `conversations` table or
 * the conversation does not exist. Leads are included when they reference
 * either the conversation or its contact.
 */
export const getAdminConversationDetail = query({
  args: {
    conversationId: v.string(),
  },
  handler: async (ctx, args) => {
    // The argument is an unvalidated string. normalizeId yields null unless it
    // is an id of the `conversations` table, so a malformed id or an id from
    // another table is reported as "not found" rather than being passed to
    // db.get directly.
    const conversationId = ctx.db.normalizeId(
      "conversations",
      args.conversationId
    )
    if (!conversationId) {
      return null
    }
    const conversation = await ctx.db.get(conversationId)
    if (!conversation) {
      return null
    }
    const contact = conversation.contactId
      ? await ctx.db.get(conversation.contactId)
      : null
    const display = contact ? buildContactDisplay(contact) : null
    const participants = await ctx.db
      .query("conversationParticipants")
      .withIndex("by_conversationId", (q) =>
        q.eq("conversationId", conversation._id)
      )
      .collect()
    const messages = await ctx.db
      .query("messages")
      .withIndex("by_conversationId", (q) =>
        q.eq("conversationId", conversation._id)
      )
      .collect()
    // `|| 0` keeps the comparator numeric if a message has no sentAt.
    messages.sort((a, b) => (a.sentAt || 0) - (b.sentAt || 0))
    const recordings = await ctx.db
      .query("callArtifacts")
      .withIndex("by_conversationId", (q) =>
        q.eq("conversationId", conversation._id)
      )
      .collect()
    const leads = (await ctx.db.query("leadSubmissions").collect()).filter(
      (lead) =>
        lead.conversationId === conversation._id ||
        (contact && lead.contactId === contact._id)
    )
    return {
      conversation: {
        id: conversation._id,
        title: buildConversationDisplayTitle(conversation, contact),
        channel: conversation.channel,
        status: conversation.status || "open",
        direction: conversation.direction || "mixed",
        source: conversation.source,
        startedAt: conversation.startedAt,
        endedAt: conversation.endedAt,
        lastMessageAt: conversation.lastMessageAt,
        lastMessagePreview: conversation.lastMessagePreview,
        summaryText: conversation.summaryText,
        ghlConversationId: conversation.ghlConversationId,
        livekitRoomName: conversation.livekitRoomName,
      },
      contact:
        contact && display
          ? {
              id: contact._id,
              name: display.displayName,
              email: contact.email,
              phone: contact.phone,
              company: contact.company,
              ghlContactId: contact.ghlContactId,
              secondaryLine: display.secondaryLine,
            }
          : null,
      participants: participants.map((participant) => ({
        id: participant._id,
        role: participant.role || "unknown",
        displayName: participant.displayName,
        email: participant.email,
        phone: participant.phone,
      })),
      messages: messages.map((message) => ({
        id: message._id,
        direction: message.direction || "system",
        channel: message.channel,
        source: message.source,
        body: message.body,
        status: message.status,
        sentAt: message.sentAt,
      })),
      recordings: recordings.map((recording) => ({
        id: recording._id,
        recordingId: recording.recordingId,
        recordingUrl: recording.recordingUrl,
        recordingStatus: recording.recordingStatus,
        transcriptionText: recording.transcriptionText,
        durationMs: recording.durationMs,
        startedAt: recording.startedAt,
        endedAt: recording.endedAt,
      })),
      leads: leads.map((lead) => ({
        id: lead._id,
        type: lead.type,
        status: lead.status,
        message: lead.message,
        intent: lead.intent,
        createdAt: lead.createdAt,
      })),
      sync: await buildAdminSyncOverview(ctx),
    }
  },
})
/**
 * Exposes the sync overview produced by `buildAdminSyncOverview` as a query.
 * Takes no arguments.
 */
export const getAdminSyncOverview = query({
  args: {},
  handler: async (ctx) => {
    const overview = await buildAdminSyncOverview(ctx)
    return overview
  },
})