1976 lines
56 KiB
TypeScript
1976 lines
56 KiB
TypeScript
// @ts-nocheck
|
|
import { action, mutation, query } from "./_generated/server"
|
|
import { v } from "convex/values"
|
|
import { api } from "./_generated/api"
|
|
import {
|
|
ensureConversationParticipant,
|
|
normalizeEmail,
|
|
normalizePhone,
|
|
sanitizeContactNameParts,
|
|
upsertCallArtifactRecord,
|
|
upsertContactRecord,
|
|
upsertConversationRecord,
|
|
upsertExternalSyncState,
|
|
upsertMessageRecord,
|
|
} from "./crmModel"
|
|
import {
|
|
fetchGhlCallLogsPage,
|
|
fetchGhlConversationsPage,
|
|
fetchGhlConversationMessages,
|
|
fetchGhlContactsPage,
|
|
fetchGhlMessagesPage,
|
|
readGhlMirrorConfig,
|
|
sendGhlConversationMessage,
|
|
} from "./ghlMirror"
|
|
|
|
const GHL_SYNC_PROVIDER = "ghl"
|
|
const GHL_SYNC_STAGES = [
|
|
"contacts",
|
|
"conversations",
|
|
"messages",
|
|
"recordings",
|
|
"reconcile",
|
|
] as const
|
|
|
|
function safeJsonParse(value?: string) {
|
|
if (!value) {
|
|
return null
|
|
}
|
|
|
|
try {
|
|
return JSON.parse(value)
|
|
} catch {
|
|
return null
|
|
}
|
|
}
|
|
|
|
async function getSyncStateRecord(ctx, entityType) {
|
|
return await ctx.db
|
|
.query("externalSyncState")
|
|
.withIndex("by_provider_entityType_entityId", (q) =>
|
|
q
|
|
.eq("provider", GHL_SYNC_PROVIDER)
|
|
.eq("entityType", entityType)
|
|
.eq("entityId", entityType)
|
|
)
|
|
.unique()
|
|
}
|
|
|
|
async function markSyncStage(ctx, args) {
|
|
return await upsertExternalSyncState(ctx, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType: args.entityType,
|
|
entityId: args.entityType,
|
|
cursor: args.cursor,
|
|
checksum: args.checksum,
|
|
status: args.status,
|
|
error: args.error,
|
|
metadata: args.metadata,
|
|
lastAttemptAt: Date.now(),
|
|
lastSyncedAt:
|
|
args.status === "synced" || args.status === "reconciled"
|
|
? Date.now()
|
|
: undefined,
|
|
})
|
|
}
|
|
|
|
function formatSyncStageSummary(state) {
|
|
const metadata = safeJsonParse(state?.metadata)
|
|
return {
|
|
status: state?.status || "pending",
|
|
lastAttemptAt: state?.lastAttemptAt || null,
|
|
lastSyncedAt: state?.lastSyncedAt || null,
|
|
error: state?.error || null,
|
|
cursor: state?.cursor || null,
|
|
metadata,
|
|
}
|
|
}
|
|
|
|
async function buildAdminSyncOverview(ctx) {
|
|
const stages = {
|
|
contacts: formatSyncStageSummary(await getSyncStateRecord(ctx, "contacts")),
|
|
conversations: formatSyncStageSummary(
|
|
await getSyncStateRecord(ctx, "conversations")
|
|
),
|
|
messages: formatSyncStageSummary(await getSyncStateRecord(ctx, "messages")),
|
|
recordings: formatSyncStageSummary(await getSyncStateRecord(ctx, "recordings")),
|
|
reconcile: formatSyncStageSummary(await getSyncStateRecord(ctx, "reconcile")),
|
|
}
|
|
|
|
const ghlConfigured = Boolean(
|
|
String(
|
|
process.env.GHL_PRIVATE_INTEGRATION_TOKEN || process.env.GHL_API_TOKEN || ""
|
|
).trim() && String(process.env.GHL_LOCATION_ID || "").trim()
|
|
)
|
|
const syncTokenConfigured = Boolean(
|
|
String(process.env.GHL_SYNC_CRON_TOKEN || "").trim()
|
|
)
|
|
const livekitConfigured = Boolean(
|
|
String(process.env.LIVEKIT_URL || "").trim() &&
|
|
String(process.env.LIVEKIT_API_KEY || "").trim() &&
|
|
String(process.env.LIVEKIT_API_SECRET || "").trim()
|
|
)
|
|
|
|
const latestSyncAt = Math.max(
|
|
...Object.values(stages).map((stage: any) => stage.lastSyncedAt || 0),
|
|
0
|
|
)
|
|
const hasFailures = Object.values(stages).some(
|
|
(stage: any) =>
|
|
stage.status === "failed" || stage.status === "missing_config"
|
|
)
|
|
const hasRunning = Object.values(stages).some(
|
|
(stage: any) => stage.status === "running"
|
|
)
|
|
|
|
return {
|
|
ghlConfigured,
|
|
syncTokenConfigured,
|
|
livekitConfigured,
|
|
latestSyncAt: latestSyncAt || null,
|
|
overallStatus: hasRunning
|
|
? "running"
|
|
: hasFailures
|
|
? "attention"
|
|
: latestSyncAt
|
|
? "healthy"
|
|
: "idle",
|
|
stages,
|
|
}
|
|
}
|
|
|
|
function extractGhlMessages(payload: any) {
|
|
if (Array.isArray(payload?.items)) {
|
|
return payload.items
|
|
}
|
|
if (Array.isArray(payload?.messages?.messages)) {
|
|
return payload.messages.messages
|
|
}
|
|
if (Array.isArray(payload?.data?.messages?.messages)) {
|
|
return payload.data.messages.messages
|
|
}
|
|
return Array.isArray(payload?.messages)
|
|
? payload.messages
|
|
: Array.isArray(payload?.data?.messages)
|
|
? payload.data.messages
|
|
: Array.isArray(payload)
|
|
? payload
|
|
: []
|
|
}
|
|
|
|
function parseGhlTimestamp(value: unknown) {
|
|
if (typeof value === "number" && Number.isFinite(value)) {
|
|
return value
|
|
}
|
|
|
|
if (typeof value === "string" && value.trim()) {
|
|
const numeric = Number(value)
|
|
if (Number.isFinite(numeric) && numeric > 0) {
|
|
return numeric
|
|
}
|
|
|
|
const parsed = new Date(value).getTime()
|
|
if (Number.isFinite(parsed) && parsed > 0) {
|
|
return parsed
|
|
}
|
|
}
|
|
|
|
return undefined
|
|
}
|
|
|
|
function normalizeGhlChannel(value?: unknown) {
|
|
const normalized = String(value || "")
|
|
.trim()
|
|
.toLowerCase()
|
|
|
|
if (
|
|
normalized.includes("sms") ||
|
|
normalized.includes("whatsapp") ||
|
|
normalized.includes("message")
|
|
) {
|
|
return "sms"
|
|
}
|
|
|
|
if (
|
|
normalized.includes("call") ||
|
|
normalized.includes("phone") ||
|
|
normalized.includes("voicemail")
|
|
) {
|
|
return "call"
|
|
}
|
|
|
|
if (normalized.includes("chat")) {
|
|
return "chat"
|
|
}
|
|
|
|
return "unknown"
|
|
}
|
|
|
|
function deriveGhlConversationChannel(payload: any) {
|
|
return (
|
|
(payload.channel ? normalizeGhlChannel(payload.channel) : undefined) ||
|
|
(payload.messageType ? normalizeGhlChannel(payload.messageType) : undefined) ||
|
|
(payload.lastMessageType
|
|
? normalizeGhlChannel(payload.lastMessageType)
|
|
: undefined) ||
|
|
(payload.type ? normalizeGhlChannel(payload.type) : undefined) ||
|
|
"unknown"
|
|
)
|
|
}
|
|
|
|
function deriveConversationStatus(payload: any) {
|
|
if (payload.status) {
|
|
return normalizeConversationStatus(payload.status)
|
|
}
|
|
|
|
if (typeof payload.inbox === "boolean") {
|
|
return payload.inbox ? "open" : "closed"
|
|
}
|
|
|
|
return "open"
|
|
}
|
|
|
|
function matchesSearch(values: Array<string | undefined>, search: string) {
|
|
if (!search) {
|
|
return true
|
|
}
|
|
|
|
const haystack = values
|
|
.map((value) => String(value || "").toLowerCase())
|
|
.join("\n")
|
|
|
|
return haystack.includes(search)
|
|
}
|
|
|
|
function normalizeConversationStatus(value?: string) {
|
|
const normalized = String(value || "")
|
|
.trim()
|
|
.toLowerCase()
|
|
|
|
if (!normalized) {
|
|
return "open"
|
|
}
|
|
|
|
if (
|
|
[
|
|
"closed",
|
|
"completed",
|
|
"complete",
|
|
"ended",
|
|
"resolved",
|
|
"done",
|
|
].includes(normalized)
|
|
) {
|
|
return "closed"
|
|
}
|
|
|
|
if (
|
|
[
|
|
"archived",
|
|
"spam",
|
|
"blocked",
|
|
"blacklisted",
|
|
"do_not_contact",
|
|
"dnd",
|
|
].includes(normalized)
|
|
) {
|
|
return "archived"
|
|
}
|
|
|
|
return "open"
|
|
}
|
|
|
|
function normalizeConversationDirection(value?: string) {
|
|
const normalized = String(value || "")
|
|
.trim()
|
|
.toLowerCase()
|
|
|
|
if (normalized === "inbound") {
|
|
return "inbound"
|
|
}
|
|
|
|
if (normalized === "outbound") {
|
|
return "outbound"
|
|
}
|
|
|
|
return "mixed"
|
|
}
|
|
|
|
function normalizeRecordingStatus(value?: string) {
|
|
const normalized = String(value || "")
|
|
.trim()
|
|
.toLowerCase()
|
|
|
|
if (!normalized) {
|
|
return "pending"
|
|
}
|
|
|
|
if (["completed", "complete", "ready", "available"].includes(normalized)) {
|
|
return "completed"
|
|
}
|
|
|
|
if (["starting", "queued"].includes(normalized)) {
|
|
return "starting"
|
|
}
|
|
|
|
if (["recording", "in_progress", "processing"].includes(normalized)) {
|
|
return "recording"
|
|
}
|
|
|
|
if (["failed", "error"].includes(normalized)) {
|
|
return "failed"
|
|
}
|
|
|
|
return "pending"
|
|
}
|
|
|
|
function buildContactDisplay(contact?: {
|
|
firstName?: string
|
|
lastName?: string
|
|
email?: string
|
|
phone?: string
|
|
}) {
|
|
const firstName = String(contact?.firstName || "").trim()
|
|
const lastName = String(contact?.lastName || "").trim()
|
|
const fullName = [firstName, lastName].filter(Boolean).join(" ").trim()
|
|
const displayName =
|
|
fullName || contact?.phone || contact?.email || "Unknown Contact"
|
|
const secondaryLine =
|
|
fullName && contact?.phone
|
|
? contact.phone
|
|
: fullName && contact?.email
|
|
? contact.email
|
|
: fullName
|
|
? undefined
|
|
: contact?.email || contact?.phone || undefined
|
|
|
|
return {
|
|
displayName,
|
|
secondaryLine,
|
|
}
|
|
}
|
|
|
|
function buildConversationDisplayTitle(
|
|
conversation: { title?: string },
|
|
contact?: {
|
|
firstName?: string
|
|
lastName?: string
|
|
email?: string
|
|
phone?: string
|
|
} | null
|
|
) {
|
|
const title = String(conversation.title || "").trim()
|
|
if (title && title.toLowerCase() !== "unknown contact") {
|
|
return title
|
|
}
|
|
|
|
return buildContactDisplay(contact || undefined).displayName
|
|
}
|
|
|
|
async function buildContactTimeline(ctx, contactId) {
|
|
const conversations = await ctx.db
|
|
.query("conversations")
|
|
.withIndex("by_contactId", (q) => q.eq("contactId", contactId))
|
|
.collect()
|
|
const messages = await ctx.db
|
|
.query("messages")
|
|
.withIndex("by_contactId", (q) => q.eq("contactId", contactId))
|
|
.collect()
|
|
const callArtifacts = await ctx.db
|
|
.query("callArtifacts")
|
|
.withIndex("by_contactId", (q) => q.eq("contactId", contactId))
|
|
.collect()
|
|
const leads = (await ctx.db.query("leadSubmissions").collect()).filter(
|
|
(lead) => lead.contactId === contactId
|
|
)
|
|
|
|
const timeline = [
|
|
...conversations.map((item) => ({
|
|
id: item._id,
|
|
type: "conversation",
|
|
timestamp: item.lastMessageAt || item.startedAt || item.updatedAt,
|
|
title: item.title || item.channel,
|
|
body: item.lastMessagePreview || item.summaryText || "",
|
|
status: item.status,
|
|
})),
|
|
...messages.map((item) => ({
|
|
id: item._id,
|
|
type: "message",
|
|
timestamp: item.sentAt,
|
|
title: `${item.channel.toUpperCase()} ${item.direction || "system"}`,
|
|
body: item.body,
|
|
status: item.status,
|
|
})),
|
|
...callArtifacts.map((item) => ({
|
|
id: item._id,
|
|
type: "recording",
|
|
timestamp: item.endedAt || item.startedAt || item.updatedAt,
|
|
title: item.recordingStatus || "recording",
|
|
body: item.recordingUrl || item.transcriptionText || "",
|
|
status: item.recordingStatus,
|
|
})),
|
|
...leads.map((item) => ({
|
|
id: item._id,
|
|
type: "lead",
|
|
timestamp: item.createdAt,
|
|
title: item.type,
|
|
body: item.message || item.intent || "",
|
|
status: item.status,
|
|
})),
|
|
]
|
|
|
|
timeline.sort((a, b) => b.timestamp - a.timestamp)
|
|
return timeline
|
|
}
|
|
|
|
export const upsertContact = mutation({
|
|
args: {
|
|
firstName: v.string(),
|
|
lastName: v.string(),
|
|
email: v.optional(v.string()),
|
|
phone: v.optional(v.string()),
|
|
company: v.optional(v.string()),
|
|
tags: v.optional(v.array(v.string())),
|
|
status: v.optional(
|
|
v.union(
|
|
v.literal("active"),
|
|
v.literal("lead"),
|
|
v.literal("customer"),
|
|
v.literal("inactive")
|
|
)
|
|
),
|
|
source: v.optional(v.string()),
|
|
notes: v.optional(v.string()),
|
|
ghlContactId: v.optional(v.string()),
|
|
livekitIdentity: v.optional(v.string()),
|
|
lastActivityAt: v.optional(v.number()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
return await upsertContactRecord(ctx, args)
|
|
},
|
|
})
|
|
|
|
export const upsertConversation = mutation({
|
|
args: {
|
|
contactId: v.optional(v.id("contacts")),
|
|
title: v.optional(v.string()),
|
|
channel: v.union(
|
|
v.literal("call"),
|
|
v.literal("sms"),
|
|
v.literal("chat"),
|
|
v.literal("unknown")
|
|
),
|
|
source: v.optional(v.string()),
|
|
status: v.optional(
|
|
v.union(v.literal("open"), v.literal("closed"), v.literal("archived"))
|
|
),
|
|
direction: v.optional(
|
|
v.union(v.literal("inbound"), v.literal("outbound"), v.literal("mixed"))
|
|
),
|
|
startedAt: v.optional(v.number()),
|
|
endedAt: v.optional(v.number()),
|
|
lastMessageAt: v.optional(v.number()),
|
|
lastMessagePreview: v.optional(v.string()),
|
|
unreadCount: v.optional(v.number()),
|
|
summaryText: v.optional(v.string()),
|
|
ghlConversationId: v.optional(v.string()),
|
|
livekitRoomName: v.optional(v.string()),
|
|
voiceSessionId: v.optional(v.id("voiceSessions")),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
return await upsertConversationRecord(ctx, args)
|
|
},
|
|
})
|
|
|
|
export const upsertMessage = mutation({
|
|
args: {
|
|
conversationId: v.id("conversations"),
|
|
contactId: v.optional(v.id("contacts")),
|
|
direction: v.optional(
|
|
v.union(v.literal("inbound"), v.literal("outbound"), v.literal("system"))
|
|
),
|
|
channel: v.union(
|
|
v.literal("call"),
|
|
v.literal("sms"),
|
|
v.literal("chat"),
|
|
v.literal("unknown")
|
|
),
|
|
source: v.optional(v.string()),
|
|
messageType: v.optional(v.string()),
|
|
body: v.string(),
|
|
status: v.optional(v.string()),
|
|
sentAt: v.optional(v.number()),
|
|
ghlMessageId: v.optional(v.string()),
|
|
voiceTranscriptTurnId: v.optional(v.id("voiceTranscriptTurns")),
|
|
voiceSessionId: v.optional(v.id("voiceSessions")),
|
|
livekitRoomName: v.optional(v.string()),
|
|
metadata: v.optional(v.string()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
return await upsertMessageRecord(ctx, args)
|
|
},
|
|
})
|
|
|
|
export const upsertCallArtifact = mutation({
|
|
args: {
|
|
conversationId: v.id("conversations"),
|
|
contactId: v.optional(v.id("contacts")),
|
|
source: v.optional(v.string()),
|
|
recordingId: v.optional(v.string()),
|
|
recordingUrl: v.optional(v.string()),
|
|
recordingStatus: v.optional(
|
|
v.union(
|
|
v.literal("pending"),
|
|
v.literal("starting"),
|
|
v.literal("recording"),
|
|
v.literal("completed"),
|
|
v.literal("failed")
|
|
)
|
|
),
|
|
transcriptionText: v.optional(v.string()),
|
|
durationMs: v.optional(v.number()),
|
|
startedAt: v.optional(v.number()),
|
|
endedAt: v.optional(v.number()),
|
|
ghlMessageId: v.optional(v.string()),
|
|
voiceSessionId: v.optional(v.id("voiceSessions")),
|
|
livekitRoomName: v.optional(v.string()),
|
|
metadata: v.optional(v.string()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
return await upsertCallArtifactRecord(ctx, args)
|
|
},
|
|
})
|
|
|
|
export const updateContact = mutation({
|
|
args: {
|
|
contactId: v.id("contacts"),
|
|
firstName: v.optional(v.string()),
|
|
lastName: v.optional(v.string()),
|
|
email: v.optional(v.string()),
|
|
phone: v.optional(v.string()),
|
|
company: v.optional(v.string()),
|
|
status: v.optional(
|
|
v.union(
|
|
v.literal("active"),
|
|
v.literal("lead"),
|
|
v.literal("customer"),
|
|
v.literal("inactive")
|
|
)
|
|
),
|
|
tags: v.optional(v.array(v.string())),
|
|
notes: v.optional(v.string()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const existing = await ctx.db.get(args.contactId)
|
|
if (!existing) {
|
|
throw new Error("Contact not found")
|
|
}
|
|
|
|
await ctx.db.patch(args.contactId, {
|
|
firstName: args.firstName ?? existing.firstName,
|
|
lastName: args.lastName ?? existing.lastName,
|
|
email: args.email ?? existing.email,
|
|
normalizedEmail: normalizeEmail(args.email ?? existing.email),
|
|
phone: args.phone ?? existing.phone,
|
|
normalizedPhone: normalizePhone(args.phone ?? existing.phone),
|
|
company: args.company ?? existing.company,
|
|
status: args.status ?? existing.status,
|
|
tags: args.tags ?? existing.tags,
|
|
notes: args.notes ?? existing.notes,
|
|
updatedAt: Date.now(),
|
|
})
|
|
|
|
return await ctx.db.get(args.contactId)
|
|
},
|
|
})
|
|
|
|
export const linkLeadSubmission = mutation({
|
|
args: {
|
|
leadId: v.id("leadSubmissions"),
|
|
contactId: v.optional(v.id("contacts")),
|
|
conversationId: v.optional(v.id("conversations")),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
await ctx.db.patch(args.leadId, {
|
|
contactId: args.contactId,
|
|
conversationId: args.conversationId,
|
|
updatedAt: Date.now(),
|
|
})
|
|
return await ctx.db.get(args.leadId)
|
|
},
|
|
})
|
|
|
|
export const importContact = mutation({
|
|
args: {
|
|
provider: v.string(),
|
|
entityId: v.string(),
|
|
payload: v.any(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const payload = args.payload || {}
|
|
const name = sanitizeContactNameParts({
|
|
firstName: payload.firstName || payload.first_name,
|
|
lastName: payload.lastName || payload.last_name,
|
|
fullName: payload.name,
|
|
})
|
|
const contact = await upsertContactRecord(ctx, {
|
|
firstName: name.firstName,
|
|
lastName: name.lastName,
|
|
fullName: payload.name,
|
|
email: payload.email,
|
|
phone: payload.phone,
|
|
company: payload.company || payload.companyName,
|
|
tags: Array.isArray(payload.tags) ? payload.tags : [],
|
|
status: "lead",
|
|
source: `${args.provider}:mirror`,
|
|
ghlContactId: payload.id || args.entityId,
|
|
lastActivityAt:
|
|
parseGhlTimestamp(payload.dateUpdated) ??
|
|
parseGhlTimestamp(payload.lastMessageDate) ??
|
|
Date.now(),
|
|
})
|
|
|
|
await upsertExternalSyncState(ctx, {
|
|
provider: args.provider,
|
|
entityType: "contact",
|
|
entityId: args.entityId,
|
|
status: "synced",
|
|
lastAttemptAt: Date.now(),
|
|
lastSyncedAt: Date.now(),
|
|
metadata: JSON.stringify({
|
|
contactId: contact?._id,
|
|
}),
|
|
})
|
|
|
|
return contact
|
|
},
|
|
})
|
|
|
|
export const importConversation = mutation({
|
|
args: {
|
|
provider: v.string(),
|
|
entityId: v.string(),
|
|
payload: v.any(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const payload = args.payload || {}
|
|
const name = sanitizeContactNameParts({
|
|
firstName: payload.firstName,
|
|
lastName: payload.lastName,
|
|
fullName: payload.contactName || payload.fullName || payload.name,
|
|
})
|
|
const contact = await upsertContactRecord(ctx, {
|
|
firstName: name.firstName,
|
|
lastName: name.lastName,
|
|
fullName: payload.contactName || payload.fullName || payload.name,
|
|
email: payload.email,
|
|
phone: payload.phone || payload.contactPhone,
|
|
source: `${args.provider}:mirror`,
|
|
ghlContactId: payload.contactId,
|
|
status: "lead",
|
|
lastActivityAt:
|
|
parseGhlTimestamp(payload.lastMessageDate) ??
|
|
parseGhlTimestamp(payload.dateUpdated) ??
|
|
Date.now(),
|
|
})
|
|
|
|
const conversation = await upsertConversationRecord(ctx, {
|
|
contactId: contact?._id,
|
|
title:
|
|
payload.fullName ||
|
|
payload.title ||
|
|
payload.contactName ||
|
|
payload.phone ||
|
|
payload.email,
|
|
channel: deriveGhlConversationChannel(payload),
|
|
source: `${args.provider}:mirror`,
|
|
status: deriveConversationStatus(payload),
|
|
direction: normalizeConversationDirection(
|
|
payload.lastMessageDirection || payload.direction
|
|
),
|
|
startedAt:
|
|
parseGhlTimestamp(payload.dateAdded) ??
|
|
parseGhlTimestamp(payload.lastMessageDate) ??
|
|
Date.now(),
|
|
endedAt: parseGhlTimestamp(payload.dateEnded),
|
|
lastMessageAt:
|
|
parseGhlTimestamp(payload.lastMessageDate) ??
|
|
parseGhlTimestamp(payload.lastMessageAt) ??
|
|
parseGhlTimestamp(payload.dateUpdated),
|
|
lastMessagePreview: payload.lastMessageBody || payload.snippet,
|
|
unreadCount:
|
|
typeof payload.unreadCount === "number" ? payload.unreadCount : undefined,
|
|
summaryText: payload.summary,
|
|
ghlConversationId:
|
|
payload.conversationId || args.entityId || payload.id,
|
|
})
|
|
|
|
await ensureConversationParticipant(ctx, {
|
|
conversationId: conversation._id,
|
|
contactId: contact?._id,
|
|
role: "contact",
|
|
displayName:
|
|
[contact?.firstName, contact?.lastName].filter(Boolean).join(" ") ||
|
|
payload.contactName,
|
|
phone: contact?.phone,
|
|
email: contact?.email,
|
|
externalContactId: payload.contactId,
|
|
})
|
|
|
|
await upsertExternalSyncState(ctx, {
|
|
provider: args.provider,
|
|
entityType: "conversation",
|
|
entityId: args.entityId,
|
|
status: "synced",
|
|
lastAttemptAt: Date.now(),
|
|
lastSyncedAt: Date.now(),
|
|
metadata: JSON.stringify({
|
|
contactId: contact?._id,
|
|
conversationId: conversation?._id,
|
|
}),
|
|
})
|
|
|
|
return conversation
|
|
},
|
|
})
|
|
|
|
export const importMessage = mutation({
|
|
args: {
|
|
provider: v.string(),
|
|
entityId: v.string(),
|
|
payload: v.any(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const payload = args.payload || {}
|
|
const name = sanitizeContactNameParts({
|
|
firstName: payload.firstName,
|
|
lastName: payload.lastName,
|
|
fullName: payload.contactName || payload.fullName || payload.name,
|
|
})
|
|
const contact = await upsertContactRecord(ctx, {
|
|
firstName: name.firstName,
|
|
lastName: name.lastName,
|
|
fullName: payload.contactName || payload.fullName || payload.name,
|
|
email: payload.email,
|
|
phone: payload.phone,
|
|
source: `${args.provider}:mirror`,
|
|
ghlContactId: payload.contactId,
|
|
status: "lead",
|
|
})
|
|
|
|
const conversation = await upsertConversationRecord(ctx, {
|
|
contactId: contact?._id,
|
|
title:
|
|
payload.contactName || payload.fullName || payload.name || payload.phone,
|
|
channel: deriveGhlConversationChannel(payload),
|
|
source: `${args.provider}:mirror`,
|
|
status: normalizeConversationStatus(payload.conversationStatus),
|
|
direction: normalizeConversationDirection(payload.direction),
|
|
startedAt:
|
|
parseGhlTimestamp(payload.dateAdded) ??
|
|
parseGhlTimestamp(payload.lastMessageDate) ??
|
|
Date.now(),
|
|
lastMessageAt:
|
|
parseGhlTimestamp(payload.dateAdded) ??
|
|
parseGhlTimestamp(payload.lastMessageDate) ??
|
|
Date.now(),
|
|
lastMessagePreview: payload.body || payload.message,
|
|
ghlConversationId: payload.conversationId,
|
|
})
|
|
|
|
await ensureConversationParticipant(ctx, {
|
|
conversationId: conversation._id,
|
|
contactId: contact?._id,
|
|
role: "contact",
|
|
displayName: payload.contactName,
|
|
phone: contact?.phone,
|
|
email: contact?.email,
|
|
externalContactId: payload.contactId,
|
|
})
|
|
|
|
const message = await upsertMessageRecord(ctx, {
|
|
conversationId: conversation._id,
|
|
contactId: contact?._id,
|
|
direction:
|
|
payload.direction === "inbound"
|
|
? "inbound"
|
|
: payload.direction === "outbound"
|
|
? "outbound"
|
|
: "system",
|
|
channel: deriveGhlConversationChannel(payload),
|
|
source: `${args.provider}:mirror`,
|
|
messageType: payload.messageType || payload.type,
|
|
body: payload.body || payload.message || payload.transcript || "",
|
|
status: payload.status,
|
|
sentAt:
|
|
parseGhlTimestamp(payload.dateAdded) ??
|
|
parseGhlTimestamp(payload.lastMessageDate) ??
|
|
Date.now(),
|
|
ghlMessageId: payload.id || args.entityId,
|
|
metadata: JSON.stringify(payload),
|
|
})
|
|
|
|
await upsertExternalSyncState(ctx, {
|
|
provider: args.provider,
|
|
entityType: "message",
|
|
entityId: args.entityId,
|
|
status: "synced",
|
|
lastAttemptAt: Date.now(),
|
|
lastSyncedAt: Date.now(),
|
|
metadata: JSON.stringify({
|
|
conversationId: conversation?._id,
|
|
messageId: message?._id,
|
|
}),
|
|
})
|
|
|
|
return message
|
|
},
|
|
})
|
|
|
|
export const importRecording = mutation({
|
|
args: {
|
|
provider: v.string(),
|
|
entityId: v.string(),
|
|
payload: v.any(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const payload = args.payload || {}
|
|
const conversation = await upsertConversationRecord(ctx, {
|
|
channel: "call",
|
|
source: `${args.provider}:mirror`,
|
|
status: "closed",
|
|
direction: normalizeConversationDirection(payload.direction),
|
|
startedAt:
|
|
parseGhlTimestamp(payload.createdAt) ?? Date.now(),
|
|
lastMessageAt:
|
|
parseGhlTimestamp(payload.createdAt) ?? Date.now(),
|
|
lastMessagePreview: payload.summary || payload.transcript,
|
|
ghlConversationId: payload.conversationId,
|
|
livekitRoomName: payload.livekitRoomName,
|
|
})
|
|
|
|
const artifact = await upsertCallArtifactRecord(ctx, {
|
|
conversationId: conversation._id,
|
|
source: `${args.provider}:mirror`,
|
|
recordingId: payload.recordingId || payload.id || args.entityId,
|
|
recordingUrl: payload.recordingUrl,
|
|
recordingStatus: normalizeRecordingStatus(payload.recordingStatus),
|
|
transcriptionText: payload.transcript,
|
|
durationMs:
|
|
typeof payload.durationMs === "number"
|
|
? payload.durationMs
|
|
: typeof payload.duration === "number"
|
|
? payload.duration * 1000
|
|
: undefined,
|
|
startedAt:
|
|
parseGhlTimestamp(payload.createdAt),
|
|
endedAt: parseGhlTimestamp(payload.endedAt),
|
|
ghlMessageId: payload.messageId,
|
|
livekitRoomName: payload.livekitRoomName,
|
|
metadata: JSON.stringify(payload),
|
|
})
|
|
|
|
await upsertExternalSyncState(ctx, {
|
|
provider: args.provider,
|
|
entityType: "recording",
|
|
entityId: args.entityId,
|
|
status: "synced",
|
|
lastAttemptAt: Date.now(),
|
|
lastSyncedAt: Date.now(),
|
|
metadata: JSON.stringify({
|
|
conversationId: conversation?._id,
|
|
callArtifactId: artifact?._id,
|
|
}),
|
|
})
|
|
|
|
return artifact
|
|
},
|
|
})
|
|
|
|
export const updateSyncCheckpoint = mutation({
|
|
args: {
|
|
provider: v.string(),
|
|
entityType: v.string(),
|
|
entityId: v.string(),
|
|
cursor: v.optional(v.string()),
|
|
checksum: v.optional(v.string()),
|
|
status: v.optional(
|
|
v.union(
|
|
v.literal("running"),
|
|
v.literal("pending"),
|
|
v.literal("synced"),
|
|
v.literal("failed"),
|
|
v.literal("missing_config"),
|
|
v.literal("reconciled"),
|
|
v.literal("mismatch")
|
|
)
|
|
),
|
|
error: v.optional(v.string()),
|
|
metadata: v.optional(v.string()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
return await upsertExternalSyncState(ctx, {
|
|
...args,
|
|
lastAttemptAt: Date.now(),
|
|
lastSyncedAt: args.status === "synced" ? Date.now() : undefined,
|
|
})
|
|
},
|
|
})
|
|
|
|
export const reconcileExternalState = mutation({
|
|
args: {
|
|
provider: v.string(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const states = await ctx.db
|
|
.query("externalSyncState")
|
|
.withIndex("by_provider_entityType", (q) => q.eq("provider", args.provider))
|
|
.collect()
|
|
|
|
const mismatches = []
|
|
for (const state of states) {
|
|
const missing =
|
|
state.entityType === "contact"
|
|
? !(await ctx.db
|
|
.query("contacts")
|
|
.withIndex("by_ghlContactId", (q) => q.eq("ghlContactId", state.entityId))
|
|
.unique())
|
|
: state.entityType === "conversation"
|
|
? !(await ctx.db
|
|
.query("conversations")
|
|
.withIndex("by_ghlConversationId", (q) =>
|
|
q.eq("ghlConversationId", state.entityId)
|
|
)
|
|
.unique())
|
|
: state.entityType === "message"
|
|
? !(await ctx.db
|
|
.query("messages")
|
|
.withIndex("by_ghlMessageId", (q) => q.eq("ghlMessageId", state.entityId))
|
|
.unique())
|
|
: state.entityType === "recording"
|
|
? !(await ctx.db
|
|
.query("callArtifacts")
|
|
.withIndex("by_recordingId", (q) => q.eq("recordingId", state.entityId))
|
|
.unique())
|
|
: false
|
|
|
|
if (missing) {
|
|
await ctx.db.patch(state._id, {
|
|
status: "mismatch",
|
|
error: "Referenced mirrored record is missing locally.",
|
|
updatedAt: Date.now(),
|
|
})
|
|
mismatches.push(state.entityId)
|
|
} else {
|
|
await ctx.db.patch(state._id, {
|
|
status: "reconciled",
|
|
error: undefined,
|
|
lastSyncedAt: state.lastSyncedAt ?? Date.now(),
|
|
updatedAt: Date.now(),
|
|
})
|
|
}
|
|
}
|
|
|
|
return {
|
|
provider: args.provider,
|
|
checked: states.length,
|
|
mismatches,
|
|
}
|
|
},
|
|
})
|
|
|
|
export const listConversationHistoryHydrationCandidates = query({
|
|
args: {
|
|
limit: v.optional(v.number()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const limit = Math.min(100, Math.max(1, args.limit ?? 25))
|
|
const conversations = await ctx.db.query("conversations").collect()
|
|
return conversations
|
|
.filter((conversation) => conversation.ghlConversationId)
|
|
.sort(
|
|
(a, b) =>
|
|
(b.lastMessageAt || b.updatedAt || 0) - (a.lastMessageAt || a.updatedAt || 0)
|
|
)
|
|
.map((conversation) => ({
|
|
id: conversation._id,
|
|
ghlConversationId: conversation.ghlConversationId,
|
|
channel: conversation.channel,
|
|
lastMessageAt: conversation.lastMessageAt || conversation.updatedAt || 0,
|
|
}))
|
|
.slice(0, limit)
|
|
},
|
|
})
|
|
|
|
export const runGhlMirror = action({
|
|
args: {
|
|
reason: v.optional(v.string()),
|
|
forceFullBackfill: v.optional(v.boolean()),
|
|
maxPagesPerRun: v.optional(v.number()),
|
|
contactsLimit: v.optional(v.number()),
|
|
messagesLimit: v.optional(v.number()),
|
|
recordingsPageSize: v.optional(v.number()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const config = readGhlMirrorConfig()
|
|
const now = Date.now()
|
|
const maxPagesPerRun = Math.min(250, Math.max(1, args.maxPagesPerRun || 25))
|
|
const contactsLimit = Math.min(100, Math.max(1, args.contactsLimit || 100))
|
|
const messagesLimit = Math.min(100, Math.max(1, args.messagesLimit || 100))
|
|
const recordingsPageSize = Math.min(
|
|
50,
|
|
Math.max(1, args.recordingsPageSize || 50)
|
|
)
|
|
|
|
if (!config) {
|
|
for (const stage of GHL_SYNC_STAGES) {
|
|
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType: stage,
|
|
entityId: stage,
|
|
status: "missing_config",
|
|
error: "GHL credentials are not configured.",
|
|
metadata: JSON.stringify({
|
|
reason: args.reason || "cron",
|
|
checkedAt: now,
|
|
}),
|
|
})
|
|
}
|
|
|
|
return {
|
|
ok: false,
|
|
status: "missing_config",
|
|
message: "GHL credentials are not configured.",
|
|
}
|
|
}
|
|
|
|
const summary = {
|
|
ok: true,
|
|
status: "synced",
|
|
reason: args.reason || "cron",
|
|
contacts: 0,
|
|
conversations: 0,
|
|
messages: 0,
|
|
recordings: 0,
|
|
hydrated: 0,
|
|
mismatches: [] as string[],
|
|
}
|
|
const hydrationTargets = new Map<string, string>()
|
|
|
|
const updateRunning = async (entityType: string, metadata?: Record<string, any>) => {
|
|
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType,
|
|
entityId: entityType,
|
|
status: "running",
|
|
error: undefined,
|
|
metadata: JSON.stringify({
|
|
...(metadata || {}),
|
|
reason: args.reason || "cron",
|
|
startedAt: now,
|
|
}),
|
|
})
|
|
}
|
|
|
|
const failStage = async (entityType: string, error: unknown, metadata?: Record<string, any>) => {
|
|
const message =
|
|
error instanceof Error ? error.message : `Failed to sync ${entityType}`
|
|
summary.ok = false
|
|
summary.status = "failed"
|
|
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType,
|
|
entityId: entityType,
|
|
status: "failed",
|
|
error: message,
|
|
metadata: JSON.stringify({
|
|
...(metadata || {}),
|
|
reason: args.reason || "cron",
|
|
failedAt: Date.now(),
|
|
}),
|
|
})
|
|
}
|
|
|
|
try {
|
|
await updateRunning("contacts")
|
|
const contactsState = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
|
|
let contactsCursor =
|
|
!args.forceFullBackfill &&
|
|
contactsState.stages.contacts.metadata?.nextCursor
|
|
? String(contactsState.stages.contacts.metadata.nextCursor)
|
|
: undefined
|
|
let contactsPages = 0
|
|
|
|
while (contactsPages < maxPagesPerRun) {
|
|
const fetched = await fetchGhlContactsPage(config, {
|
|
limit: contactsLimit,
|
|
cursor: contactsCursor,
|
|
})
|
|
|
|
if (!fetched.items.length) {
|
|
break
|
|
}
|
|
|
|
for (const item of fetched.items) {
|
|
await ctx.runMutation(api.crm.importContact, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityId: String(item.id || ""),
|
|
payload: item,
|
|
})
|
|
summary.contacts += 1
|
|
}
|
|
|
|
contactsPages += 1
|
|
contactsCursor = fetched.nextCursor
|
|
if (!fetched.nextCursor) {
|
|
break
|
|
}
|
|
}
|
|
|
|
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType: "contacts",
|
|
entityId: "contacts",
|
|
cursor: contactsCursor,
|
|
status: "synced",
|
|
error: "",
|
|
metadata: JSON.stringify({
|
|
imported: summary.contacts,
|
|
pages: contactsPages,
|
|
nextCursor: contactsCursor,
|
|
completedAt: Date.now(),
|
|
reason: args.reason || "cron",
|
|
}),
|
|
})
|
|
} catch (error) {
|
|
await failStage("contacts", error)
|
|
return summary
|
|
}
|
|
|
|
try {
|
|
await updateRunning("conversations")
|
|
const fetched = await fetchGhlConversationsPage(config, {
|
|
limit: 100,
|
|
})
|
|
|
|
for (const item of fetched.items) {
|
|
const entityId = String(item.id || "")
|
|
if (!entityId) {
|
|
continue
|
|
}
|
|
|
|
await ctx.runMutation(api.crm.importConversation, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityId,
|
|
payload: item,
|
|
})
|
|
hydrationTargets.set(entityId, item.lastMessageType || item.type || "")
|
|
summary.conversations += 1
|
|
}
|
|
|
|
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType: "conversations",
|
|
entityId: "conversations",
|
|
cursor: undefined,
|
|
status: "synced",
|
|
error: "",
|
|
metadata: JSON.stringify({
|
|
imported: summary.conversations,
|
|
total: fetched.total,
|
|
source: "search",
|
|
completedAt: Date.now(),
|
|
reason: args.reason || "cron",
|
|
}),
|
|
})
|
|
} catch (error) {
|
|
await failStage("conversations", error)
|
|
return summary
|
|
}
|
|
|
|
try {
|
|
await updateRunning("messages")
|
|
const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
|
|
const messageCursors = {
|
|
SMS:
|
|
!args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.SMS
|
|
? String(previous.stages.messages.metadata.cursors.SMS)
|
|
: undefined,
|
|
Call:
|
|
!args.forceFullBackfill && previous.stages.messages.metadata?.cursors?.Call
|
|
? String(previous.stages.messages.metadata.cursors.Call)
|
|
: undefined,
|
|
}
|
|
|
|
for (const channel of ["SMS", "Call"] as const) {
|
|
let pages = 0
|
|
while (pages < maxPagesPerRun) {
|
|
const fetched = await fetchGhlMessagesPage(config, {
|
|
limit: messagesLimit,
|
|
cursor: messageCursors[channel],
|
|
channel,
|
|
})
|
|
|
|
if (!fetched.items.length) {
|
|
break
|
|
}
|
|
|
|
for (const item of fetched.items) {
|
|
await ctx.runMutation(api.crm.importMessage, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityId: String(item.id || ""),
|
|
payload: item,
|
|
})
|
|
if (item.conversationId) {
|
|
hydrationTargets.set(String(item.conversationId), item.channel || "")
|
|
}
|
|
summary.messages += 1
|
|
}
|
|
|
|
pages += 1
|
|
messageCursors[channel] = fetched.nextCursor
|
|
if (!fetched.nextCursor) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType: "messages",
|
|
entityId: "messages",
|
|
cursor: messageCursors.Call || messageCursors.SMS,
|
|
status: "synced",
|
|
error: "",
|
|
metadata: JSON.stringify({
|
|
imported: summary.messages,
|
|
cursors: messageCursors,
|
|
completedAt: Date.now(),
|
|
reason: args.reason || "cron",
|
|
}),
|
|
})
|
|
} catch (error) {
|
|
await failStage("messages", error)
|
|
return summary
|
|
}
|
|
|
|
try {
|
|
const fallbackCandidates = await ctx.runQuery(
|
|
api.crm.listConversationHistoryHydrationCandidates,
|
|
{ limit: 25 }
|
|
)
|
|
for (const candidate of fallbackCandidates) {
|
|
if (candidate.ghlConversationId) {
|
|
hydrationTargets.set(
|
|
String(candidate.ghlConversationId),
|
|
candidate.channel || ""
|
|
)
|
|
}
|
|
}
|
|
|
|
for (const [ghlConversationId, channelHint] of Array.from(
|
|
hydrationTargets.entries()
|
|
).slice(0, 25)) {
|
|
const fetched = await fetchGhlConversationMessages(config, {
|
|
conversationId: ghlConversationId,
|
|
})
|
|
const items = extractGhlMessages(fetched).filter(Boolean)
|
|
if (!items.length) {
|
|
continue
|
|
}
|
|
|
|
for (const item of items) {
|
|
await ctx.runMutation(api.crm.importMessage, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityId: String(item.id || item.messageId || ""),
|
|
payload: {
|
|
...item,
|
|
conversationId: item.conversationId || ghlConversationId,
|
|
channel: item.channel || channelHint,
|
|
},
|
|
})
|
|
summary.hydrated += 1
|
|
}
|
|
}
|
|
} catch (error) {
|
|
await failStage("messages", error, {
|
|
hydrated: summary.hydrated,
|
|
})
|
|
return summary
|
|
}
|
|
|
|
try {
|
|
await updateRunning("recordings")
|
|
const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
|
|
let nextPage =
|
|
!args.forceFullBackfill && previous.stages.recordings.metadata?.nextPage
|
|
? Number(previous.stages.recordings.metadata.nextPage)
|
|
: 1
|
|
let pages = 0
|
|
|
|
while (pages < maxPagesPerRun) {
|
|
const fetched = await fetchGhlCallLogsPage(config, {
|
|
page: nextPage,
|
|
pageSize: recordingsPageSize,
|
|
})
|
|
|
|
if (!fetched.items.length) {
|
|
break
|
|
}
|
|
|
|
for (const item of fetched.items) {
|
|
await ctx.runMutation(api.crm.importRecording, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityId: String(item.id || item.messageId || ""),
|
|
payload: {
|
|
...item,
|
|
recordingId: item.messageId || item.id,
|
|
transcript: item.transcript,
|
|
recordingUrl: item.recordingUrl,
|
|
recordingStatus: item.transcript ? "completed" : "pending",
|
|
},
|
|
})
|
|
summary.recordings += 1
|
|
}
|
|
|
|
pages += 1
|
|
const exhausted = fetched.page * fetched.pageSize >= fetched.total
|
|
nextPage = exhausted ? 1 : fetched.page + 1
|
|
if (exhausted) {
|
|
break
|
|
}
|
|
}
|
|
|
|
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType: "recordings",
|
|
entityId: "recordings",
|
|
cursor: String(nextPage),
|
|
status: "synced",
|
|
error: "",
|
|
metadata: JSON.stringify({
|
|
imported: summary.recordings,
|
|
nextPage,
|
|
completedAt: Date.now(),
|
|
reason: args.reason || "cron",
|
|
}),
|
|
})
|
|
} catch (error) {
|
|
await failStage("recordings", error)
|
|
return summary
|
|
}
|
|
|
|
try {
|
|
await updateRunning("reconcile")
|
|
const reconcile = await ctx.runMutation(api.crm.reconcileExternalState, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
})
|
|
summary.mismatches = reconcile.mismatches || []
|
|
|
|
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityType: "reconcile",
|
|
entityId: "reconcile",
|
|
status: reconcile.mismatches?.length ? "mismatch" : "reconciled",
|
|
error: reconcile.mismatches?.length
|
|
? "Some mirrored records are missing locally."
|
|
: "",
|
|
metadata: JSON.stringify({
|
|
checked: reconcile.checked,
|
|
mismatches: reconcile.mismatches || [],
|
|
completedAt: Date.now(),
|
|
reason: args.reason || "cron",
|
|
}),
|
|
})
|
|
} catch (error) {
|
|
await failStage("reconcile", error)
|
|
return summary
|
|
}
|
|
|
|
return summary
|
|
},
|
|
})
|
|
|
|
export const repairMirroredContacts = action({
|
|
args: {
|
|
reason: v.optional(v.string()),
|
|
maxPages: v.optional(v.number()),
|
|
limit: v.optional(v.number()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const config = readGhlMirrorConfig()
|
|
if (!config) {
|
|
return {
|
|
ok: false,
|
|
repaired: 0,
|
|
message: "GHL credentials are not configured.",
|
|
}
|
|
}
|
|
|
|
const maxPages = Math.min(250, Math.max(1, args.maxPages || 50))
|
|
const limit = Math.min(100, Math.max(1, args.limit || 100))
|
|
let cursor: string | undefined
|
|
let pages = 0
|
|
let repaired = 0
|
|
|
|
while (pages < maxPages) {
|
|
const fetched = await fetchGhlContactsPage(config, {
|
|
limit,
|
|
cursor,
|
|
})
|
|
|
|
if (!fetched.items.length) {
|
|
break
|
|
}
|
|
|
|
for (const item of fetched.items) {
|
|
await ctx.runMutation(api.crm.importContact, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityId: String(item.id || ""),
|
|
payload: item,
|
|
})
|
|
repaired += 1
|
|
}
|
|
|
|
pages += 1
|
|
cursor = fetched.nextCursor
|
|
if (!cursor) {
|
|
break
|
|
}
|
|
}
|
|
|
|
return {
|
|
ok: true,
|
|
repaired,
|
|
pages,
|
|
cursor: cursor || null,
|
|
reason: args.reason || "manual-repair",
|
|
}
|
|
},
|
|
})
|
|
|
|
export const hydrateConversationHistory = action({
|
|
args: {
|
|
conversationId: v.string(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const config = readGhlMirrorConfig()
|
|
if (!config) {
|
|
return {
|
|
ok: false,
|
|
imported: 0,
|
|
message: "GHL credentials are not configured.",
|
|
}
|
|
}
|
|
|
|
const detail = await ctx.runQuery(api.crm.getAdminConversationDetail, {
|
|
conversationId: args.conversationId,
|
|
})
|
|
|
|
if (!detail?.conversation?.ghlConversationId) {
|
|
return {
|
|
ok: false,
|
|
imported: 0,
|
|
message: "This conversation is not linked to GHL.",
|
|
}
|
|
}
|
|
|
|
try {
|
|
const fetched = await fetchGhlConversationMessages(config, {
|
|
conversationId: detail.conversation.ghlConversationId,
|
|
})
|
|
const items = extractGhlMessages(fetched).filter(Boolean)
|
|
|
|
let imported = 0
|
|
for (const item of items) {
|
|
await ctx.runMutation(api.crm.importMessage, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityId: String(item.id || item.messageId || ""),
|
|
payload: {
|
|
...item,
|
|
conversationId:
|
|
item.conversationId || detail.conversation.ghlConversationId,
|
|
channel: item.channel || detail.conversation.channel,
|
|
},
|
|
})
|
|
imported += 1
|
|
}
|
|
|
|
return {
|
|
ok: true,
|
|
imported,
|
|
ghlConversationId: detail.conversation.ghlConversationId,
|
|
}
|
|
} catch (error) {
|
|
return {
|
|
ok: false,
|
|
imported: 0,
|
|
ghlConversationId: detail.conversation.ghlConversationId,
|
|
message: error instanceof Error ? error.message : "Failed to hydrate history.",
|
|
}
|
|
}
|
|
},
|
|
})
|
|
|
|
export const sendAdminConversationMessage = action({
|
|
args: {
|
|
conversationId: v.string(),
|
|
body: v.string(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const messageBody = String(args.body || "").trim()
|
|
if (!messageBody) {
|
|
throw new Error("Message body is required.")
|
|
}
|
|
|
|
const config = readGhlMirrorConfig()
|
|
if (!config) {
|
|
throw new Error("GHL credentials are not configured.")
|
|
}
|
|
|
|
const detail = await ctx.runQuery(api.crm.getAdminConversationDetail, {
|
|
conversationId: args.conversationId,
|
|
})
|
|
|
|
if (!detail) {
|
|
throw new Error("Conversation not found.")
|
|
}
|
|
|
|
const response = await sendGhlConversationMessage(config, {
|
|
conversationId: detail.conversation.ghlConversationId || undefined,
|
|
contactId: detail.contact?.ghlContactId || undefined,
|
|
message: messageBody,
|
|
type: "SMS",
|
|
})
|
|
|
|
const responseMessage =
|
|
response?.message ||
|
|
response?.data?.message ||
|
|
response?.messages?.[0] ||
|
|
response?.data?.messages?.[0] ||
|
|
null
|
|
|
|
if (responseMessage) {
|
|
await ctx.runMutation(api.crm.importMessage, {
|
|
provider: GHL_SYNC_PROVIDER,
|
|
entityId: String(
|
|
responseMessage.id || responseMessage.messageId || Date.now()
|
|
),
|
|
payload: {
|
|
...responseMessage,
|
|
conversationId:
|
|
responseMessage.conversationId || detail.conversation.ghlConversationId,
|
|
contactId: responseMessage.contactId || detail.contact?.ghlContactId,
|
|
channel: responseMessage.channel || detail.conversation.channel || "SMS",
|
|
direction: responseMessage.direction || "outbound",
|
|
body: responseMessage.body || responseMessage.message || messageBody,
|
|
status: responseMessage.status || "sent",
|
|
},
|
|
})
|
|
} else {
|
|
await ctx.runMutation(api.crm.upsertMessage, {
|
|
conversationId: args.conversationId as any,
|
|
contactId: detail.contact?.id as any,
|
|
direction: "outbound",
|
|
channel:
|
|
detail.conversation.channel === "sms" ? "sms" : "unknown",
|
|
source: "ghl:send",
|
|
body: messageBody,
|
|
status: "sent",
|
|
sentAt: Date.now(),
|
|
metadata: JSON.stringify(response || {}),
|
|
})
|
|
}
|
|
|
|
return {
|
|
ok: true,
|
|
response,
|
|
}
|
|
},
|
|
})
|
|
|
|
export const listAdminContacts = query({
|
|
args: {
|
|
search: v.optional(v.string()),
|
|
page: v.optional(v.number()),
|
|
limit: v.optional(v.number()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const page = Math.max(1, args.page ?? 1)
|
|
const limit = Math.min(100, Math.max(1, args.limit ?? 25))
|
|
const search = String(args.search || "").trim().toLowerCase()
|
|
|
|
const contacts = await ctx.db.query("contacts").collect()
|
|
const filtered = contacts.filter((contact) =>
|
|
matchesSearch(
|
|
[
|
|
`${contact.firstName} ${contact.lastName}`,
|
|
contact.email,
|
|
contact.phone,
|
|
contact.company,
|
|
...(contact.tags || []),
|
|
],
|
|
search
|
|
)
|
|
)
|
|
|
|
filtered.sort(
|
|
(a, b) =>
|
|
(b.lastActivityAt || b.updatedAt || 0) - (a.lastActivityAt || a.updatedAt || 0)
|
|
)
|
|
|
|
const paged = filtered.slice((page - 1) * limit, page * limit)
|
|
const items = await Promise.all(
|
|
paged.map(async (contact) => {
|
|
const display = buildContactDisplay(contact)
|
|
const conversations = await ctx.db
|
|
.query("conversations")
|
|
.withIndex("by_contactId", (q) => q.eq("contactId", contact._id))
|
|
.collect()
|
|
const leads = await ctx.db
|
|
.query("leadSubmissions")
|
|
.collect()
|
|
const contactLeads = leads.filter((lead) => lead.contactId === contact._id)
|
|
|
|
return {
|
|
id: contact._id,
|
|
firstName: contact.firstName,
|
|
lastName: contact.lastName,
|
|
displayName: display.displayName,
|
|
secondaryLine: display.secondaryLine,
|
|
email: contact.email,
|
|
phone: contact.phone,
|
|
company: contact.company,
|
|
tags: contact.tags || [],
|
|
status: contact.status || "lead",
|
|
source: contact.source,
|
|
ghlContactId: contact.ghlContactId,
|
|
lastActivityAt: contact.lastActivityAt,
|
|
conversationCount: conversations.length,
|
|
leadCount: contactLeads.length,
|
|
updatedAt: contact.updatedAt,
|
|
}
|
|
})
|
|
)
|
|
|
|
return {
|
|
items,
|
|
sync: await buildAdminSyncOverview(ctx),
|
|
pagination: {
|
|
page,
|
|
limit,
|
|
total: filtered.length,
|
|
totalPages: Math.max(1, Math.ceil(filtered.length / limit)),
|
|
},
|
|
}
|
|
},
|
|
})
|
|
|
|
export const getContactTimeline = query({
|
|
args: {
|
|
contactId: v.id("contacts"),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
return await buildContactTimeline(ctx, args.contactId)
|
|
},
|
|
})
|
|
|
|
export const getAdminContactDetail = query({
|
|
args: {
|
|
contactId: v.string(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const contact = await ctx.db.get(args.contactId as any)
|
|
if (!contact) {
|
|
return null
|
|
}
|
|
|
|
const conversations = await ctx.db
|
|
.query("conversations")
|
|
.withIndex("by_contactId", (q) => q.eq("contactId", contact._id))
|
|
.collect()
|
|
conversations.sort(
|
|
(a, b) => (b.lastMessageAt || b.updatedAt) - (a.lastMessageAt || a.updatedAt)
|
|
)
|
|
|
|
const timeline = await buildContactTimeline(ctx, contact._id)
|
|
|
|
return {
|
|
contact: {
|
|
id: contact._id,
|
|
firstName: contact.firstName,
|
|
lastName: contact.lastName,
|
|
displayName: buildContactDisplay(contact).displayName,
|
|
secondaryLine: buildContactDisplay(contact).secondaryLine,
|
|
email: contact.email,
|
|
phone: contact.phone,
|
|
company: contact.company,
|
|
tags: contact.tags || [],
|
|
status: contact.status || "lead",
|
|
source: contact.source,
|
|
notes: contact.notes,
|
|
ghlContactId: contact.ghlContactId,
|
|
lastActivityAt: contact.lastActivityAt,
|
|
updatedAt: contact.updatedAt,
|
|
},
|
|
conversations: conversations.map((conversation) => ({
|
|
id: conversation._id,
|
|
channel: conversation.channel,
|
|
status: conversation.status || "open",
|
|
title: buildConversationDisplayTitle(conversation, contact),
|
|
lastMessageAt: conversation.lastMessageAt,
|
|
lastMessagePreview: conversation.lastMessagePreview,
|
|
recordingReady: Boolean(conversation.livekitRoomName || conversation.voiceSessionId),
|
|
})),
|
|
timeline,
|
|
sync: await buildAdminSyncOverview(ctx),
|
|
}
|
|
},
|
|
})
|
|
|
|
export const listAdminConversations = query({
|
|
args: {
|
|
search: v.optional(v.string()),
|
|
channel: v.optional(
|
|
v.union(
|
|
v.literal("call"),
|
|
v.literal("sms"),
|
|
v.literal("chat"),
|
|
v.literal("unknown")
|
|
)
|
|
),
|
|
status: v.optional(
|
|
v.union(v.literal("open"), v.literal("closed"), v.literal("archived"))
|
|
),
|
|
page: v.optional(v.number()),
|
|
limit: v.optional(v.number()),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const page = Math.max(1, args.page ?? 1)
|
|
const limit = Math.min(100, Math.max(1, args.limit ?? 25))
|
|
const search = String(args.search || "").trim().toLowerCase()
|
|
|
|
const conversations = await ctx.db.query("conversations").collect()
|
|
const filtered = []
|
|
const staleShellCutoff = Date.now() - 30 * 24 * 60 * 60 * 1000
|
|
|
|
for (const conversation of conversations) {
|
|
if (args.channel && conversation.channel !== args.channel) {
|
|
continue
|
|
}
|
|
if (args.status && (conversation.status || "open") !== args.status) {
|
|
continue
|
|
}
|
|
|
|
const contact = conversation.contactId
|
|
? await ctx.db.get(conversation.contactId)
|
|
: null
|
|
if (
|
|
!matchesSearch(
|
|
[
|
|
conversation.title,
|
|
conversation.lastMessagePreview,
|
|
conversation.ghlConversationId,
|
|
contact
|
|
? `${contact.firstName} ${contact.lastName}`
|
|
: undefined,
|
|
contact?.email,
|
|
contact?.phone,
|
|
],
|
|
search
|
|
)
|
|
) {
|
|
continue
|
|
}
|
|
if (
|
|
conversation.source === "ghl:mirror" &&
|
|
conversation.channel === "call" &&
|
|
!String(conversation.lastMessagePreview || "").trim() &&
|
|
(conversation.lastMessageAt ||
|
|
conversation.startedAt ||
|
|
conversation.updatedAt ||
|
|
0) < staleShellCutoff
|
|
) {
|
|
continue
|
|
}
|
|
|
|
filtered.push({ conversation, contact })
|
|
}
|
|
|
|
filtered.sort(
|
|
(a, b) =>
|
|
(b.conversation.lastMessageAt ||
|
|
b.conversation.startedAt ||
|
|
b.conversation.updatedAt) -
|
|
(a.conversation.lastMessageAt ||
|
|
a.conversation.startedAt ||
|
|
a.conversation.updatedAt)
|
|
)
|
|
|
|
const paged = filtered.slice((page - 1) * limit, page * limit)
|
|
const items = await Promise.all(
|
|
paged.map(async ({ conversation, contact }) => {
|
|
const display = buildContactDisplay(contact || undefined)
|
|
const recordings = await ctx.db
|
|
.query("callArtifacts")
|
|
.withIndex("by_conversationId", (q) =>
|
|
q.eq("conversationId", conversation._id)
|
|
)
|
|
.collect()
|
|
const messages = await ctx.db
|
|
.query("messages")
|
|
.withIndex("by_conversationId", (q) =>
|
|
q.eq("conversationId", conversation._id)
|
|
)
|
|
.collect()
|
|
|
|
return {
|
|
id: conversation._id,
|
|
title: buildConversationDisplayTitle(conversation, contact),
|
|
channel: conversation.channel,
|
|
status: conversation.status || "open",
|
|
direction: conversation.direction || "mixed",
|
|
source: conversation.source,
|
|
startedAt: conversation.startedAt,
|
|
lastMessageAt: conversation.lastMessageAt,
|
|
lastMessagePreview: conversation.lastMessagePreview,
|
|
displayName: display.displayName,
|
|
secondaryLine: display.secondaryLine,
|
|
contact: contact
|
|
? {
|
|
id: contact._id,
|
|
name: display.displayName,
|
|
email: contact.email,
|
|
phone: contact.phone,
|
|
secondaryLine: display.secondaryLine,
|
|
}
|
|
: null,
|
|
messageCount: messages.length,
|
|
recordingCount: recordings.length,
|
|
}
|
|
})
|
|
)
|
|
|
|
return {
|
|
items,
|
|
sync: await buildAdminSyncOverview(ctx),
|
|
pagination: {
|
|
page,
|
|
limit,
|
|
total: filtered.length,
|
|
totalPages: Math.max(1, Math.ceil(filtered.length / limit)),
|
|
},
|
|
}
|
|
},
|
|
})
|
|
|
|
export const getAdminConversationDetail = query({
|
|
args: {
|
|
conversationId: v.string(),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const conversation = await ctx.db.get(args.conversationId as any)
|
|
if (!conversation) {
|
|
return null
|
|
}
|
|
|
|
const contact = conversation.contactId
|
|
? await ctx.db.get(conversation.contactId)
|
|
: null
|
|
const participants = await ctx.db
|
|
.query("conversationParticipants")
|
|
.withIndex("by_conversationId", (q) =>
|
|
q.eq("conversationId", conversation._id)
|
|
)
|
|
.collect()
|
|
const messages = await ctx.db
|
|
.query("messages")
|
|
.withIndex("by_conversationId", (q) =>
|
|
q.eq("conversationId", conversation._id)
|
|
)
|
|
.collect()
|
|
messages.sort((a, b) => a.sentAt - b.sentAt)
|
|
const recordings = await ctx.db
|
|
.query("callArtifacts")
|
|
.withIndex("by_conversationId", (q) =>
|
|
q.eq("conversationId", conversation._id)
|
|
)
|
|
.collect()
|
|
const leads = (await ctx.db.query("leadSubmissions").collect()).filter(
|
|
(lead) =>
|
|
lead.conversationId === conversation._id ||
|
|
(contact && lead.contactId === contact._id)
|
|
)
|
|
|
|
return {
|
|
conversation: {
|
|
id: conversation._id,
|
|
title: buildConversationDisplayTitle(conversation, contact),
|
|
channel: conversation.channel,
|
|
status: conversation.status || "open",
|
|
direction: conversation.direction || "mixed",
|
|
source: conversation.source,
|
|
startedAt: conversation.startedAt,
|
|
endedAt: conversation.endedAt,
|
|
lastMessageAt: conversation.lastMessageAt,
|
|
lastMessagePreview: conversation.lastMessagePreview,
|
|
summaryText: conversation.summaryText,
|
|
ghlConversationId: conversation.ghlConversationId,
|
|
livekitRoomName: conversation.livekitRoomName,
|
|
},
|
|
contact: contact
|
|
? {
|
|
id: contact._id,
|
|
name: buildContactDisplay(contact).displayName,
|
|
email: contact.email,
|
|
phone: contact.phone,
|
|
company: contact.company,
|
|
ghlContactId: contact.ghlContactId,
|
|
secondaryLine: buildContactDisplay(contact).secondaryLine,
|
|
}
|
|
: null,
|
|
participants: participants.map((participant) => ({
|
|
id: participant._id,
|
|
role: participant.role || "unknown",
|
|
displayName: participant.displayName,
|
|
email: participant.email,
|
|
phone: participant.phone,
|
|
})),
|
|
messages: messages.map((message) => ({
|
|
id: message._id,
|
|
direction: message.direction || "system",
|
|
channel: message.channel,
|
|
source: message.source,
|
|
body: message.body,
|
|
status: message.status,
|
|
sentAt: message.sentAt,
|
|
})),
|
|
recordings: recordings.map((recording) => ({
|
|
id: recording._id,
|
|
recordingId: recording.recordingId,
|
|
recordingUrl: recording.recordingUrl,
|
|
recordingStatus: recording.recordingStatus,
|
|
transcriptionText: recording.transcriptionText,
|
|
durationMs: recording.durationMs,
|
|
startedAt: recording.startedAt,
|
|
endedAt: recording.endedAt,
|
|
})),
|
|
leads: leads.map((lead) => ({
|
|
id: lead._id,
|
|
type: lead.type,
|
|
status: lead.status,
|
|
message: lead.message,
|
|
intent: lead.intent,
|
|
createdAt: lead.createdAt,
|
|
})),
|
|
sync: await buildAdminSyncOverview(ctx),
|
|
}
|
|
},
|
|
})
|
|
|
|
export const getAdminSyncOverview = query({
|
|
args: {},
|
|
handler: async (ctx) => {
|
|
return await buildAdminSyncOverview(ctx)
|
|
},
|
|
})
|