diff --git a/convex/crm.ts b/convex/crm.ts index 63f48e2c..5be70e8e 100644 --- a/convex/crm.ts +++ b/convex/crm.ts @@ -15,6 +15,7 @@ import { } from "./crmModel" import { fetchGhlCallLogsPage, + fetchGhlConversationsPage, fetchGhlConversationMessages, fetchGhlContactsPage, fetchGhlMessagesPage, @@ -142,6 +143,12 @@ function extractGhlMessages(payload: any) { if (Array.isArray(payload?.items)) { return payload.items } + if (Array.isArray(payload?.messages?.messages)) { + return payload.messages.messages + } + if (Array.isArray(payload?.data?.messages?.messages)) { + return payload.data.messages.messages + } return Array.isArray(payload?.messages) ? payload.messages : Array.isArray(payload?.data?.messages) @@ -151,6 +158,78 @@ function extractGhlMessages(payload: any) { : [] } +function parseGhlTimestamp(value: unknown) { + if (typeof value === "number" && Number.isFinite(value)) { + return value + } + + if (typeof value === "string" && value.trim()) { + const numeric = Number(value) + if (Number.isFinite(numeric) && numeric > 0) { + return numeric + } + + const parsed = new Date(value).getTime() + if (Number.isFinite(parsed) && parsed > 0) { + return parsed + } + } + + return undefined +} + +function normalizeGhlChannel(value?: unknown) { + const normalized = String(value || "") + .trim() + .toLowerCase() + + if ( + normalized.includes("sms") || + normalized.includes("whatsapp") || + normalized.includes("message") + ) { + return "sms" + } + + if ( + normalized.includes("call") || + normalized.includes("phone") || + normalized.includes("voicemail") + ) { + return "call" + } + + if (normalized.includes("chat")) { + return "chat" + } + + return "unknown" +} + +function deriveGhlConversationChannel(payload: any) { + return ( + (payload.channel ? normalizeGhlChannel(payload.channel) : undefined) || + (payload.messageType ? normalizeGhlChannel(payload.messageType) : undefined) || + (payload.lastMessageType + ? normalizeGhlChannel(payload.lastMessageType) + : undefined) || + (payload.type ? normalizeGhlChannel(payload.type) : undefined) || + "unknown" + ) +} + +function deriveConversationStatus(payload: any) { + if (payload.status) { + return normalizeConversationStatus(payload.status) + } + + if (typeof payload.inbox === "boolean") { + return payload.inbox ? "open" : "closed" + } + + return "open" +} + function matchesSearch(values: Array, search: string) { if (!search) { return true @@ -546,9 +625,9 @@ export const importContact = mutation({ source: `${args.provider}:mirror`, ghlContactId: payload.id || args.entityId, lastActivityAt: - typeof payload.dateUpdated === "string" - ? new Date(payload.dateUpdated).getTime() - : Date.now(), + parseGhlTimestamp(payload.dateUpdated) ?? + parseGhlTimestamp(payload.lastMessageDate) ?? + Date.now(), }) await upsertExternalSyncState(ctx, { @@ -590,34 +669,40 @@ export const importConversation = mutation({ ghlContactId: payload.contactId, status: "lead", lastActivityAt: - typeof payload.dateUpdated === "string" - ? new Date(payload.dateUpdated).getTime() - : Date.now(), + parseGhlTimestamp(payload.lastMessageDate) ?? + parseGhlTimestamp(payload.dateUpdated) ?? + Date.now(), }) const conversation = await upsertConversationRecord(ctx, { contactId: contact?._id, - title: payload.fullName || payload.title || payload.contactName, - channel: - payload.channel === "SMS" || payload.type === "sms" ? "sms" : "call", + title: + payload.fullName || + payload.title || + payload.contactName || + payload.phone || + payload.email, + channel: deriveGhlConversationChannel(payload), source: `${args.provider}:mirror`, - status: normalizeConversationStatus(payload.status), - direction: normalizeConversationDirection(payload.direction), + status: deriveConversationStatus(payload), + direction: normalizeConversationDirection( + payload.lastMessageDirection || payload.direction + ), startedAt: - typeof payload.dateAdded === "string" - ? new Date(payload.dateAdded).getTime() - : Date.now(), - endedAt: - typeof payload.dateEnded === "string" - ? new Date(payload.dateEnded).getTime() - : undefined, + parseGhlTimestamp(payload.dateAdded) ?? + parseGhlTimestamp(payload.lastMessageDate) ?? + Date.now(), + endedAt: parseGhlTimestamp(payload.dateEnded), lastMessageAt: - typeof payload.lastMessageAt === "string" - ? new Date(payload.lastMessageAt).getTime() - : undefined, + parseGhlTimestamp(payload.lastMessageDate) ?? + parseGhlTimestamp(payload.lastMessageAt) ?? + parseGhlTimestamp(payload.dateUpdated), lastMessagePreview: payload.lastMessageBody || payload.snippet, + unreadCount: + typeof payload.unreadCount === "number" ? payload.unreadCount : undefined, summaryText: payload.summary, - ghlConversationId: payload.id || args.entityId, + ghlConversationId: + payload.conversationId || args.entityId || payload.id, }) await ensureConversationParticipant(ctx, { @@ -675,24 +760,20 @@ export const importMessage = mutation({ const conversation = await upsertConversationRecord(ctx, { contactId: contact?._id, - title: payload.contactName, - channel: - payload.channel === "SMS" || payload.messageType === "SMS" - ? "sms" - : payload.channel === "Call" - ? "call" - : "unknown", + title: + payload.contactName || payload.fullName || payload.name || payload.phone, + channel: deriveGhlConversationChannel(payload), source: `${args.provider}:mirror`, status: normalizeConversationStatus(payload.conversationStatus), direction: normalizeConversationDirection(payload.direction), startedAt: - typeof payload.dateAdded === "string" - ? new Date(payload.dateAdded).getTime() - : Date.now(), + parseGhlTimestamp(payload.dateAdded) ?? + parseGhlTimestamp(payload.lastMessageDate) ?? + Date.now(), lastMessageAt: - typeof payload.dateAdded === "string" - ? new Date(payload.dateAdded).getTime() - : Date.now(), + parseGhlTimestamp(payload.dateAdded) ?? + parseGhlTimestamp(payload.lastMessageDate) ?? + Date.now(), lastMessagePreview: payload.body || payload.message, ghlConversationId: payload.conversationId, }) @@ -716,20 +797,15 @@ export const importMessage = mutation({ : payload.direction === "outbound" ? "outbound" : "system", - channel: - payload.channel === "SMS" || payload.messageType === "SMS" - ? "sms" - : payload.channel === "Call" - ? "call" - : "unknown", + channel: deriveGhlConversationChannel(payload), source: `${args.provider}:mirror`, messageType: payload.messageType || payload.type, body: payload.body || payload.message || payload.transcript || "", status: payload.status, sentAt: - typeof payload.dateAdded === "string" - ? new Date(payload.dateAdded).getTime() - : Date.now(), + parseGhlTimestamp(payload.dateAdded) ?? + parseGhlTimestamp(payload.lastMessageDate) ?? + Date.now(), ghlMessageId: payload.id || args.entityId, metadata: JSON.stringify(payload), }) @@ -765,13 +841,9 @@ export const importRecording = mutation({ status: "closed", direction: normalizeConversationDirection(payload.direction), startedAt: - typeof payload.createdAt === "string" - ? new Date(payload.createdAt).getTime() - : Date.now(), + parseGhlTimestamp(payload.createdAt) ?? Date.now(), lastMessageAt: - typeof payload.createdAt === "string" - ? new Date(payload.createdAt).getTime() - : Date.now(), + parseGhlTimestamp(payload.createdAt) ?? Date.now(), lastMessagePreview: payload.summary || payload.transcript, ghlConversationId: payload.conversationId, livekitRoomName: payload.livekitRoomName, @@ -791,13 +863,8 @@ export const importRecording = mutation({ ? payload.duration * 1000 : undefined, startedAt: - typeof payload.createdAt === "string" - ? new Date(payload.createdAt).getTime() - : undefined, - endedAt: - typeof payload.endedAt === "string" - ? new Date(payload.endedAt).getTime() - : undefined, + parseGhlTimestamp(payload.createdAt), + endedAt: parseGhlTimestamp(payload.endedAt), ghlMessageId: payload.messageId, livekitRoomName: payload.livekitRoomName, metadata: JSON.stringify(payload), @@ -1082,70 +1149,36 @@ export const runGhlMirror = action({ try { await updateRunning("conversations") - const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {}) - const conversationCursors = { - SMS: - !args.forceFullBackfill && - previous.stages.conversations.metadata?.cursors?.SMS - ? String(previous.stages.conversations.metadata.cursors.SMS) - : undefined, - Call: - !args.forceFullBackfill && - previous.stages.conversations.metadata?.cursors?.Call - ? String(previous.stages.conversations.metadata.cursors.Call) - : undefined, - } + const fetched = await fetchGhlConversationsPage(config, { + limit: 100, + }) - for (const channel of ["SMS", "Call"] as const) { - let pages = 0 - while (pages < maxPagesPerRun) { - const fetched = await fetchGhlMessagesPage(config, { - limit: messagesLimit, - cursor: conversationCursors[channel], - channel, - }) - - if (!fetched.items.length) { - break - } - - const grouped = new Map() - for (const item of fetched.items) { - const conversationId = String(item.conversationId || item.id || "") - if (!conversationId || grouped.has(conversationId)) { - continue - } - grouped.set(conversationId, item) - } - - for (const [entityId, item] of grouped.entries()) { - await ctx.runMutation(api.crm.importConversation, { - provider: GHL_SYNC_PROVIDER, - entityId, - payload: item, - }) - hydrationTargets.set(entityId, item.channel || "") - summary.conversations += 1 - } - - pages += 1 - conversationCursors[channel] = fetched.nextCursor - if (!fetched.nextCursor) { - break - } + for (const item of fetched.items) { + const entityId = String(item.id || "") + if (!entityId) { + continue } + + await ctx.runMutation(api.crm.importConversation, { + provider: GHL_SYNC_PROVIDER, + entityId, + payload: item, + }) + hydrationTargets.set(entityId, item.lastMessageType || item.type || "") + summary.conversations += 1 } await ctx.runMutation(api.crm.updateSyncCheckpoint, { provider: GHL_SYNC_PROVIDER, entityType: "conversations", entityId: "conversations", - cursor: conversationCursors.Call || conversationCursors.SMS, + cursor: undefined, status: "synced", error: "", metadata: JSON.stringify({ imported: summary.conversations, - cursors: conversationCursors, + total: fetched.total, + source: "search", completedAt: Date.now(), reason: args.reason || "cron", }), @@ -1719,6 +1752,7 @@ export const listAdminConversations = query({ const conversations = await ctx.db.query("conversations").collect() const filtered = [] + const staleShellCutoff = Date.now() - 30 * 24 * 60 * 60 * 1000 for (const conversation of conversations) { if (args.channel && conversation.channel !== args.channel) { @@ -1748,14 +1782,29 @@ export const listAdminConversations = query({ ) { continue } + if ( + conversation.source === "ghl:mirror" && + conversation.channel === "call" && + !String(conversation.lastMessagePreview || "").trim() && + (conversation.lastMessageAt || + conversation.startedAt || + conversation.updatedAt || + 0) < staleShellCutoff + ) { + continue + } filtered.push({ conversation, contact }) } filtered.sort( (a, b) => - (b.conversation.lastMessageAt || b.conversation.updatedAt) - - (a.conversation.lastMessageAt || a.conversation.updatedAt) + (b.conversation.lastMessageAt || + b.conversation.startedAt || + b.conversation.updatedAt) - + (a.conversation.lastMessageAt || + a.conversation.startedAt || + a.conversation.updatedAt) ) const paged = filtered.slice((page - 1) * limit, page * limit) diff --git a/convex/crmModel.ts b/convex/crmModel.ts index 7192745a..0c71b56d 100644 --- a/convex/crmModel.ts +++ b/convex/crmModel.ts @@ -225,6 +225,35 @@ export async function upsertConversationRecord(ctx, input) { .unique() } + if (!existing && input.contactId) { + const candidates = await ctx.db + .query("conversations") + .withIndex("by_contactId", (q) => q.eq("contactId", input.contactId)) + .collect() + + const targetMoment = + input.lastMessageAt ?? input.startedAt ?? input.updatedAt ?? now + + existing = + candidates + .filter((candidate) => { + if (input.channel && candidate.channel !== input.channel) { + return false + } + const candidateMoment = + candidate.lastMessageAt ?? + candidate.startedAt ?? + candidate.updatedAt ?? + 0 + return Math.abs(candidateMoment - targetMoment) <= 5 * 60 * 1000 + }) + .sort((a, b) => { + const aMoment = a.lastMessageAt ?? a.startedAt ?? a.updatedAt ?? 0 + const bMoment = b.lastMessageAt ?? b.startedAt ?? b.updatedAt ?? 0 + return Math.abs(aMoment - targetMoment) - Math.abs(bMoment - targetMoment) + })[0] || null + } + const patch = { contactId: input.contactId ?? existing?.contactId, title: input.title || existing?.title, diff --git a/convex/ghlMirror.ts b/convex/ghlMirror.ts index fead3822..4a7f8601 100644 --- a/convex/ghlMirror.ts +++ b/convex/ghlMirror.ts @@ -134,6 +134,29 @@ export async function fetchGhlMessagesPage( } } +export async function fetchGhlConversationsPage( + config: GhlMirrorConfig, + args?: { + limit?: number + } +) { + const url = new URL(`${config.baseUrl}/conversations/search`) + url.searchParams.set("locationId", config.locationId) + url.searchParams.set("limit", String(Math.min(100, Math.max(1, args?.limit || 100)))) + + const payload = await fetchGhlMirrorJson(config, url.pathname + url.search) + + return { + items: Array.isArray(payload?.conversations) ? payload.conversations : [], + total: + typeof payload?.total === "number" + ? payload.total + : Array.isArray(payload?.conversations) + ? payload.conversations.length + : 0, + } +} + export async function fetchGhlConversationMessages( config: GhlMirrorConfig, args: { @@ -146,13 +169,17 @@ export async function fetchGhlConversationMessages( ) return { - items: Array.isArray(payload?.messages) - ? payload.messages - : Array.isArray(payload?.data?.messages) - ? payload.data.messages - : Array.isArray(payload) - ? payload - : [], + items: Array.isArray(payload?.messages?.messages) + ? payload.messages.messages + : Array.isArray(payload?.messages) + ? payload.messages + : Array.isArray(payload?.data?.messages?.messages) + ? payload.data.messages.messages + : Array.isArray(payload?.data?.messages) + ? payload.data.messages + : Array.isArray(payload) + ? payload + : [], } }