// @ts-nocheck import { mutation, query } from "./_generated/server" import { v } from "convex/values" import { ensureConversationParticipant, normalizeEmail, normalizePhone, upsertCallArtifactRecord, upsertContactRecord, upsertConversationRecord, upsertExternalSyncState, upsertMessageRecord, } from "./crmModel" function matchesSearch(values: Array, search: string) { if (!search) { return true } const haystack = values .map((value) => String(value || "").toLowerCase()) .join("\n") return haystack.includes(search) } async function buildContactTimeline(ctx, contactId) { const conversations = await ctx.db .query("conversations") .withIndex("by_contactId", (q) => q.eq("contactId", contactId)) .collect() const messages = await ctx.db .query("messages") .withIndex("by_contactId", (q) => q.eq("contactId", contactId)) .collect() const callArtifacts = await ctx.db .query("callArtifacts") .withIndex("by_contactId", (q) => q.eq("contactId", contactId)) .collect() const leads = (await ctx.db.query("leadSubmissions").collect()).filter( (lead) => lead.contactId === contactId ) const timeline = [ ...conversations.map((item) => ({ id: item._id, type: "conversation", timestamp: item.lastMessageAt || item.startedAt || item.updatedAt, title: item.title || item.channel, body: item.lastMessagePreview || item.summaryText || "", status: item.status, })), ...messages.map((item) => ({ id: item._id, type: "message", timestamp: item.sentAt, title: `${item.channel.toUpperCase()} ${item.direction || "system"}`, body: item.body, status: item.status, })), ...callArtifacts.map((item) => ({ id: item._id, type: "recording", timestamp: item.endedAt || item.startedAt || item.updatedAt, title: item.recordingStatus || "recording", body: item.recordingUrl || item.transcriptionText || "", status: item.recordingStatus, })), ...leads.map((item) => ({ id: item._id, type: "lead", timestamp: item.createdAt, title: item.type, body: item.message || item.intent || "", status: item.status, })), ] timeline.sort((a, b) => b.timestamp - a.timestamp) return timeline } export const upsertContact = mutation({ args: { firstName: v.string(), lastName: v.string(), email: v.optional(v.string()), phone: v.optional(v.string()), company: v.optional(v.string()), tags: v.optional(v.array(v.string())), status: v.optional( v.union( v.literal("active"), v.literal("lead"), v.literal("customer"), v.literal("inactive") ) ), source: v.optional(v.string()), notes: v.optional(v.string()), ghlContactId: v.optional(v.string()), livekitIdentity: v.optional(v.string()), lastActivityAt: v.optional(v.number()), }, handler: async (ctx, args) => { return await upsertContactRecord(ctx, args) }, }) export const upsertConversation = mutation({ args: { contactId: v.optional(v.id("contacts")), title: v.optional(v.string()), channel: v.union( v.literal("call"), v.literal("sms"), v.literal("chat"), v.literal("unknown") ), source: v.optional(v.string()), status: v.optional( v.union(v.literal("open"), v.literal("closed"), v.literal("archived")) ), direction: v.optional( v.union(v.literal("inbound"), v.literal("outbound"), v.literal("mixed")) ), startedAt: v.optional(v.number()), endedAt: v.optional(v.number()), lastMessageAt: v.optional(v.number()), lastMessagePreview: v.optional(v.string()), unreadCount: v.optional(v.number()), summaryText: v.optional(v.string()), ghlConversationId: v.optional(v.string()), livekitRoomName: v.optional(v.string()), voiceSessionId: v.optional(v.id("voiceSessions")), }, handler: async (ctx, args) => { return await upsertConversationRecord(ctx, args) }, }) export const upsertMessage = mutation({ args: { conversationId: v.id("conversations"), contactId: v.optional(v.id("contacts")), direction: v.optional( v.union(v.literal("inbound"), v.literal("outbound"), v.literal("system")) ), channel: v.union( v.literal("call"), v.literal("sms"), v.literal("chat"), v.literal("unknown") ), source: v.optional(v.string()), messageType: v.optional(v.string()), body: v.string(), status: v.optional(v.string()), sentAt: v.optional(v.number()), ghlMessageId: v.optional(v.string()), voiceTranscriptTurnId: v.optional(v.id("voiceTranscriptTurns")), voiceSessionId: v.optional(v.id("voiceSessions")), livekitRoomName: v.optional(v.string()), metadata: v.optional(v.string()), }, handler: async (ctx, args) => { return await upsertMessageRecord(ctx, args) }, }) export const upsertCallArtifact = mutation({ args: { conversationId: v.id("conversations"), contactId: v.optional(v.id("contacts")), source: v.optional(v.string()), recordingId: v.optional(v.string()), recordingUrl: v.optional(v.string()), recordingStatus: v.optional( v.union( v.literal("pending"), v.literal("starting"), v.literal("recording"), v.literal("completed"), v.literal("failed") ) ), transcriptionText: v.optional(v.string()), durationMs: v.optional(v.number()), startedAt: v.optional(v.number()), endedAt: v.optional(v.number()), ghlMessageId: v.optional(v.string()), voiceSessionId: v.optional(v.id("voiceSessions")), livekitRoomName: v.optional(v.string()), metadata: v.optional(v.string()), }, handler: async (ctx, args) => { return await upsertCallArtifactRecord(ctx, args) }, }) export const updateContact = mutation({ args: { contactId: v.id("contacts"), firstName: v.optional(v.string()), lastName: v.optional(v.string()), email: v.optional(v.string()), phone: v.optional(v.string()), company: v.optional(v.string()), status: v.optional( v.union( v.literal("active"), v.literal("lead"), v.literal("customer"), v.literal("inactive") ) ), tags: v.optional(v.array(v.string())), notes: v.optional(v.string()), }, handler: async (ctx, args) => { const existing = await ctx.db.get(args.contactId) if (!existing) { throw new Error("Contact not found") } await ctx.db.patch(args.contactId, { firstName: args.firstName ?? existing.firstName, lastName: args.lastName ?? existing.lastName, email: args.email ?? existing.email, normalizedEmail: normalizeEmail(args.email ?? existing.email), phone: args.phone ?? existing.phone, normalizedPhone: normalizePhone(args.phone ?? existing.phone), company: args.company ?? existing.company, status: args.status ?? existing.status, tags: args.tags ?? existing.tags, notes: args.notes ?? existing.notes, updatedAt: Date.now(), }) return await ctx.db.get(args.contactId) }, }) export const linkLeadSubmission = mutation({ args: { leadId: v.id("leadSubmissions"), contactId: v.optional(v.id("contacts")), conversationId: v.optional(v.id("conversations")), }, handler: async (ctx, args) => { await ctx.db.patch(args.leadId, { contactId: args.contactId, conversationId: args.conversationId, updatedAt: Date.now(), }) return await ctx.db.get(args.leadId) }, }) export const importContact = mutation({ args: { provider: v.string(), entityId: v.string(), payload: v.any(), }, handler: async (ctx, args) => { const payload = args.payload || {} const contact = await upsertContactRecord(ctx, { firstName: payload.firstName || payload.first_name || payload.name || "Unknown", lastName: payload.lastName || payload.last_name || "Contact", email: payload.email, phone: payload.phone, company: payload.company || payload.companyName, tags: Array.isArray(payload.tags) ? payload.tags : [], status: "lead", source: `${args.provider}:mirror`, ghlContactId: payload.id || args.entityId, lastActivityAt: typeof payload.dateUpdated === "string" ? new Date(payload.dateUpdated).getTime() : Date.now(), }) await upsertExternalSyncState(ctx, { provider: args.provider, entityType: "contact", entityId: args.entityId, status: "synced", lastAttemptAt: Date.now(), lastSyncedAt: Date.now(), metadata: JSON.stringify({ contactId: contact?._id, }), }) return contact }, }) export const importConversation = mutation({ args: { provider: v.string(), entityId: v.string(), payload: v.any(), }, handler: async (ctx, args) => { const payload = args.payload || {} const contact = await upsertContactRecord(ctx, { firstName: payload.firstName || payload.contactName || "Unknown", lastName: payload.lastName || "Contact", email: payload.email, phone: payload.phone || payload.contactPhone, source: `${args.provider}:mirror`, ghlContactId: payload.contactId, status: "lead", lastActivityAt: typeof payload.dateUpdated === "string" ? new Date(payload.dateUpdated).getTime() : Date.now(), }) const conversation = await upsertConversationRecord(ctx, { contactId: contact?._id, title: payload.fullName || payload.title || payload.contactName, channel: payload.channel === "SMS" || payload.type === "sms" ? "sms" : "call", source: `${args.provider}:mirror`, status: payload.status || "open", direction: payload.direction || "mixed", startedAt: typeof payload.dateAdded === "string" ? new Date(payload.dateAdded).getTime() : Date.now(), endedAt: typeof payload.dateEnded === "string" ? new Date(payload.dateEnded).getTime() : undefined, lastMessageAt: typeof payload.lastMessageAt === "string" ? new Date(payload.lastMessageAt).getTime() : undefined, lastMessagePreview: payload.lastMessageBody || payload.snippet, summaryText: payload.summary, ghlConversationId: payload.id || args.entityId, }) await ensureConversationParticipant(ctx, { conversationId: conversation._id, contactId: contact?._id, role: "contact", displayName: [contact?.firstName, contact?.lastName].filter(Boolean).join(" ") || payload.contactName, phone: contact?.phone, email: contact?.email, externalContactId: payload.contactId, }) await upsertExternalSyncState(ctx, { provider: args.provider, entityType: "conversation", entityId: args.entityId, status: "synced", lastAttemptAt: Date.now(), lastSyncedAt: Date.now(), metadata: JSON.stringify({ contactId: contact?._id, conversationId: conversation?._id, }), }) return conversation }, }) export const importMessage = mutation({ args: { provider: v.string(), entityId: v.string(), payload: v.any(), }, handler: async (ctx, args) => { const payload = args.payload || {} const contact = await upsertContactRecord(ctx, { firstName: payload.firstName || payload.contactName || "Unknown", lastName: payload.lastName || "Contact", email: payload.email, phone: payload.phone, source: `${args.provider}:mirror`, ghlContactId: payload.contactId, status: "lead", }) const conversation = await upsertConversationRecord(ctx, { contactId: contact?._id, title: payload.contactName, channel: payload.channel === "SMS" || payload.messageType === "SMS" ? "sms" : payload.channel === "Call" ? "call" : "unknown", source: `${args.provider}:mirror`, status: "open", direction: payload.direction || "mixed", startedAt: typeof payload.dateAdded === "string" ? new Date(payload.dateAdded).getTime() : Date.now(), lastMessageAt: typeof payload.dateAdded === "string" ? new Date(payload.dateAdded).getTime() : Date.now(), lastMessagePreview: payload.body || payload.message, ghlConversationId: payload.conversationId, }) await ensureConversationParticipant(ctx, { conversationId: conversation._id, contactId: contact?._id, role: "contact", displayName: payload.contactName, phone: contact?.phone, email: contact?.email, externalContactId: payload.contactId, }) const message = await upsertMessageRecord(ctx, { conversationId: conversation._id, contactId: contact?._id, direction: payload.direction === "inbound" ? "inbound" : payload.direction === "outbound" ? "outbound" : "system", channel: payload.channel === "SMS" || payload.messageType === "SMS" ? "sms" : payload.channel === "Call" ? "call" : "unknown", source: `${args.provider}:mirror`, messageType: payload.messageType || payload.type, body: payload.body || payload.message || payload.transcript || "", status: payload.status, sentAt: typeof payload.dateAdded === "string" ? new Date(payload.dateAdded).getTime() : Date.now(), ghlMessageId: payload.id || args.entityId, metadata: JSON.stringify(payload), }) await upsertExternalSyncState(ctx, { provider: args.provider, entityType: "message", entityId: args.entityId, status: "synced", lastAttemptAt: Date.now(), lastSyncedAt: Date.now(), metadata: JSON.stringify({ conversationId: conversation?._id, messageId: message?._id, }), }) return message }, }) export const importRecording = mutation({ args: { provider: v.string(), entityId: v.string(), payload: v.any(), }, handler: async (ctx, args) => { const payload = args.payload || {} const conversation = await upsertConversationRecord(ctx, { channel: "call", source: `${args.provider}:mirror`, status: "closed", direction: payload.direction || "mixed", startedAt: typeof payload.createdAt === "string" ? new Date(payload.createdAt).getTime() : Date.now(), lastMessageAt: typeof payload.createdAt === "string" ? new Date(payload.createdAt).getTime() : Date.now(), lastMessagePreview: payload.summary || payload.transcript, ghlConversationId: payload.conversationId, livekitRoomName: payload.livekitRoomName, }) const artifact = await upsertCallArtifactRecord(ctx, { conversationId: conversation._id, source: `${args.provider}:mirror`, recordingId: payload.recordingId || payload.id || args.entityId, recordingUrl: payload.recordingUrl, recordingStatus: payload.recordingStatus || "completed", transcriptionText: payload.transcript, durationMs: typeof payload.durationMs === "number" ? payload.durationMs : typeof payload.duration === "number" ? payload.duration * 1000 : undefined, startedAt: typeof payload.createdAt === "string" ? new Date(payload.createdAt).getTime() : undefined, endedAt: typeof payload.endedAt === "string" ? new Date(payload.endedAt).getTime() : undefined, ghlMessageId: payload.messageId, livekitRoomName: payload.livekitRoomName, metadata: JSON.stringify(payload), }) await upsertExternalSyncState(ctx, { provider: args.provider, entityType: "recording", entityId: args.entityId, status: "synced", lastAttemptAt: Date.now(), lastSyncedAt: Date.now(), metadata: JSON.stringify({ conversationId: conversation?._id, callArtifactId: artifact?._id, }), }) return artifact }, }) export const updateSyncCheckpoint = mutation({ args: { provider: v.string(), entityType: v.string(), entityId: v.string(), cursor: v.optional(v.string()), checksum: v.optional(v.string()), status: v.optional( v.union( v.literal("pending"), v.literal("synced"), v.literal("failed"), v.literal("reconciled"), v.literal("mismatch") ) ), error: v.optional(v.string()), metadata: v.optional(v.string()), }, handler: async (ctx, args) => { return await upsertExternalSyncState(ctx, { ...args, lastAttemptAt: Date.now(), lastSyncedAt: args.status === "synced" ? Date.now() : undefined, }) }, }) export const reconcileExternalState = mutation({ args: { provider: v.string(), }, handler: async (ctx, args) => { const states = await ctx.db .query("externalSyncState") .withIndex("by_provider_entityType", (q) => q.eq("provider", args.provider)) .collect() const mismatches = [] for (const state of states) { const missing = state.entityType === "contact" ? !(await ctx.db .query("contacts") .withIndex("by_ghlContactId", (q) => q.eq("ghlContactId", state.entityId)) .unique()) : state.entityType === "conversation" ? !(await ctx.db .query("conversations") .withIndex("by_ghlConversationId", (q) => q.eq("ghlConversationId", state.entityId) ) .unique()) : state.entityType === "message" ? !(await ctx.db .query("messages") .withIndex("by_ghlMessageId", (q) => q.eq("ghlMessageId", state.entityId)) .unique()) : state.entityType === "recording" ? !(await ctx.db .query("callArtifacts") .withIndex("by_recordingId", (q) => q.eq("recordingId", state.entityId)) .unique()) : false if (missing) { await ctx.db.patch(state._id, { status: "mismatch", error: "Referenced mirrored record is missing locally.", updatedAt: Date.now(), }) mismatches.push(state.entityId) } else { await ctx.db.patch(state._id, { status: "reconciled", error: undefined, lastSyncedAt: state.lastSyncedAt ?? Date.now(), updatedAt: Date.now(), }) } } return { provider: args.provider, checked: states.length, mismatches, } }, }) export const listAdminContacts = query({ args: { search: v.optional(v.string()), page: v.optional(v.number()), limit: v.optional(v.number()), }, handler: async (ctx, args) => { const page = Math.max(1, args.page ?? 1) const limit = Math.min(100, Math.max(1, args.limit ?? 25)) const search = String(args.search || "").trim().toLowerCase() const contacts = await ctx.db.query("contacts").collect() const filtered = contacts.filter((contact) => matchesSearch( [ `${contact.firstName} ${contact.lastName}`, contact.email, contact.phone, contact.company, ...(contact.tags || []), ], search ) ) filtered.sort( (a, b) => (b.lastActivityAt || b.updatedAt || 0) - (a.lastActivityAt || a.updatedAt || 0) ) const paged = filtered.slice((page - 1) * limit, page * limit) const items = await Promise.all( paged.map(async (contact) => { const conversations = await ctx.db .query("conversations") .withIndex("by_contactId", (q) => q.eq("contactId", contact._id)) .collect() const leads = await ctx.db .query("leadSubmissions") .collect() const contactLeads = leads.filter((lead) => lead.contactId === contact._id) return { id: contact._id, firstName: contact.firstName, lastName: contact.lastName, email: contact.email, phone: contact.phone, company: contact.company, tags: contact.tags || [], status: contact.status || "lead", source: contact.source, ghlContactId: contact.ghlContactId, lastActivityAt: contact.lastActivityAt, conversationCount: conversations.length, leadCount: contactLeads.length, updatedAt: contact.updatedAt, } }) ) return { items, pagination: { page, limit, total: filtered.length, totalPages: Math.max(1, Math.ceil(filtered.length / limit)), }, } }, }) export const getContactTimeline = query({ args: { contactId: v.id("contacts"), }, handler: async (ctx, args) => { return await buildContactTimeline(ctx, args.contactId) }, }) export const getAdminContactDetail = query({ args: { contactId: v.string(), }, handler: async (ctx, args) => { const contact = await ctx.db.get(args.contactId as any) if (!contact) { return null } const conversations = await ctx.db .query("conversations") .withIndex("by_contactId", (q) => q.eq("contactId", contact._id)) .collect() conversations.sort( (a, b) => (b.lastMessageAt || b.updatedAt) - (a.lastMessageAt || a.updatedAt) ) const timeline = await buildContactTimeline(ctx, contact._id) return { contact: { id: contact._id, firstName: contact.firstName, lastName: contact.lastName, email: contact.email, phone: contact.phone, company: contact.company, tags: contact.tags || [], status: contact.status || "lead", source: contact.source, notes: contact.notes, ghlContactId: contact.ghlContactId, lastActivityAt: contact.lastActivityAt, updatedAt: contact.updatedAt, }, conversations: conversations.map((conversation) => ({ id: conversation._id, channel: conversation.channel, status: conversation.status || "open", title: conversation.title, lastMessageAt: conversation.lastMessageAt, lastMessagePreview: conversation.lastMessagePreview, recordingReady: Boolean(conversation.livekitRoomName || conversation.voiceSessionId), })), timeline, } }, }) export const listAdminConversations = query({ args: { search: v.optional(v.string()), channel: v.optional( v.union( v.literal("call"), v.literal("sms"), v.literal("chat"), v.literal("unknown") ) ), status: v.optional( v.union(v.literal("open"), v.literal("closed"), v.literal("archived")) ), page: v.optional(v.number()), limit: v.optional(v.number()), }, handler: async (ctx, args) => { const page = Math.max(1, args.page ?? 1) const limit = Math.min(100, Math.max(1, args.limit ?? 25)) const search = String(args.search || "").trim().toLowerCase() const conversations = await ctx.db.query("conversations").collect() const filtered = [] for (const conversation of conversations) { if (args.channel && conversation.channel !== args.channel) { continue } if (args.status && (conversation.status || "open") !== args.status) { continue } const contact = conversation.contactId ? await ctx.db.get(conversation.contactId) : null if ( !matchesSearch( [ conversation.title, conversation.lastMessagePreview, conversation.ghlConversationId, contact ? `${contact.firstName} ${contact.lastName}` : undefined, contact?.email, contact?.phone, ], search ) ) { continue } filtered.push({ conversation, contact }) } filtered.sort( (a, b) => (b.conversation.lastMessageAt || b.conversation.updatedAt) - (a.conversation.lastMessageAt || a.conversation.updatedAt) ) const paged = filtered.slice((page - 1) * limit, page * limit) const items = await Promise.all( paged.map(async ({ conversation, contact }) => { const recordings = await ctx.db .query("callArtifacts") .withIndex("by_conversationId", (q) => q.eq("conversationId", conversation._id) ) .collect() const messages = await ctx.db .query("messages") .withIndex("by_conversationId", (q) => q.eq("conversationId", conversation._id) ) .collect() return { id: conversation._id, title: conversation.title || (contact ? `${contact.firstName} ${contact.lastName}`.trim() : "Unnamed conversation"), channel: conversation.channel, status: conversation.status || "open", direction: conversation.direction || "mixed", source: conversation.source, startedAt: conversation.startedAt, lastMessageAt: conversation.lastMessageAt, lastMessagePreview: conversation.lastMessagePreview, contact: contact ? { id: contact._id, name: `${contact.firstName} ${contact.lastName}`.trim(), email: contact.email, phone: contact.phone, } : null, messageCount: messages.length, recordingCount: recordings.length, } }) ) return { items, pagination: { page, limit, total: filtered.length, totalPages: Math.max(1, Math.ceil(filtered.length / limit)), }, } }, }) export const getAdminConversationDetail = query({ args: { conversationId: v.string(), }, handler: async (ctx, args) => { const conversation = await ctx.db.get(args.conversationId as any) if (!conversation) { return null } const contact = conversation.contactId ? await ctx.db.get(conversation.contactId) : null const participants = await ctx.db .query("conversationParticipants") .withIndex("by_conversationId", (q) => q.eq("conversationId", conversation._id) ) .collect() const messages = await ctx.db .query("messages") .withIndex("by_conversationId", (q) => q.eq("conversationId", conversation._id) ) .collect() messages.sort((a, b) => a.sentAt - b.sentAt) const recordings = await ctx.db .query("callArtifacts") .withIndex("by_conversationId", (q) => q.eq("conversationId", conversation._id) ) .collect() const leads = (await ctx.db.query("leadSubmissions").collect()).filter( (lead) => lead.conversationId === conversation._id || (contact && lead.contactId === contact._id) ) return { conversation: { id: conversation._id, title: conversation.title, channel: conversation.channel, status: conversation.status || "open", direction: conversation.direction || "mixed", source: conversation.source, startedAt: conversation.startedAt, endedAt: conversation.endedAt, lastMessageAt: conversation.lastMessageAt, lastMessagePreview: conversation.lastMessagePreview, summaryText: conversation.summaryText, ghlConversationId: conversation.ghlConversationId, livekitRoomName: conversation.livekitRoomName, }, contact: contact ? { id: contact._id, name: `${contact.firstName} ${contact.lastName}`.trim(), email: contact.email, phone: contact.phone, company: contact.company, } : null, participants: participants.map((participant) => ({ id: participant._id, role: participant.role || "unknown", displayName: participant.displayName, email: participant.email, phone: participant.phone, })), messages: messages.map((message) => ({ id: message._id, direction: message.direction || "system", channel: message.channel, source: message.source, body: message.body, status: message.status, sentAt: message.sentAt, })), recordings: recordings.map((recording) => ({ id: recording._id, recordingId: recording.recordingId, recordingUrl: recording.recordingUrl, recordingStatus: recording.recordingStatus, transcriptionText: recording.transcriptionText, durationMs: recording.durationMs, startedAt: recording.startedAt, endedAt: recording.endedAt, })), leads: leads.map((lead) => ({ id: lead._id, type: lead.type, status: lead.status, message: lead.message, intent: lead.intent, createdAt: lead.createdAt, })), } }, })