fix: improve ghl conversation sync mapping

This commit is contained in:
DMleadgen 2026-04-16 14:23:25 -06:00
parent 013a908d92
commit c5e40c5caf
Signed by: matt
GPG key ID: C2720CF8CD701894
3 changed files with 223 additions and 118 deletions

View file

@ -15,6 +15,7 @@ import {
} from "./crmModel"
import {
fetchGhlCallLogsPage,
fetchGhlConversationsPage,
fetchGhlConversationMessages,
fetchGhlContactsPage,
fetchGhlMessagesPage,
@ -142,6 +143,12 @@ function extractGhlMessages(payload: any) {
if (Array.isArray(payload?.items)) {
return payload.items
}
if (Array.isArray(payload?.messages?.messages)) {
return payload.messages.messages
}
if (Array.isArray(payload?.data?.messages?.messages)) {
return payload.data.messages.messages
}
return Array.isArray(payload?.messages)
? payload.messages
: Array.isArray(payload?.data?.messages)
@ -151,6 +158,78 @@ function extractGhlMessages(payload: any) {
: []
}
function parseGhlTimestamp(value: unknown) {
if (typeof value === "number" && Number.isFinite(value)) {
return value
}
if (typeof value === "string" && value.trim()) {
const numeric = Number(value)
if (Number.isFinite(numeric) && numeric > 0) {
return numeric
}
const parsed = new Date(value).getTime()
if (Number.isFinite(parsed) && parsed > 0) {
return parsed
}
}
return undefined
}
function normalizeGhlChannel(value?: unknown) {
const normalized = String(value || "")
.trim()
.toLowerCase()
if (
normalized.includes("sms") ||
normalized.includes("whatsapp") ||
normalized.includes("message")
) {
return "sms"
}
if (
normalized.includes("call") ||
normalized.includes("phone") ||
normalized.includes("voicemail")
) {
return "call"
}
if (normalized.includes("chat")) {
return "chat"
}
return "unknown"
}
function deriveGhlConversationChannel(payload: any) {
return (
(payload.channel ? normalizeGhlChannel(payload.channel) : undefined) ||
(payload.messageType ? normalizeGhlChannel(payload.messageType) : undefined) ||
(payload.lastMessageType
? normalizeGhlChannel(payload.lastMessageType)
: undefined) ||
(payload.type ? normalizeGhlChannel(payload.type) : undefined) ||
"unknown"
)
}
function deriveConversationStatus(payload: any) {
if (payload.status) {
return normalizeConversationStatus(payload.status)
}
if (typeof payload.inbox === "boolean") {
return payload.inbox ? "open" : "closed"
}
return "open"
}
function matchesSearch(values: Array<string | undefined>, search: string) {
if (!search) {
return true
@ -546,9 +625,9 @@ export const importContact = mutation({
source: `${args.provider}:mirror`,
ghlContactId: payload.id || args.entityId,
lastActivityAt:
typeof payload.dateUpdated === "string"
? new Date(payload.dateUpdated).getTime()
: Date.now(),
parseGhlTimestamp(payload.dateUpdated) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
})
await upsertExternalSyncState(ctx, {
@ -590,34 +669,40 @@ export const importConversation = mutation({
ghlContactId: payload.contactId,
status: "lead",
lastActivityAt:
typeof payload.dateUpdated === "string"
? new Date(payload.dateUpdated).getTime()
: Date.now(),
parseGhlTimestamp(payload.lastMessageDate) ??
parseGhlTimestamp(payload.dateUpdated) ??
Date.now(),
})
const conversation = await upsertConversationRecord(ctx, {
contactId: contact?._id,
title: payload.fullName || payload.title || payload.contactName,
channel:
payload.channel === "SMS" || payload.type === "sms" ? "sms" : "call",
title:
payload.fullName ||
payload.title ||
payload.contactName ||
payload.phone ||
payload.email,
channel: deriveGhlConversationChannel(payload),
source: `${args.provider}:mirror`,
status: normalizeConversationStatus(payload.status),
direction: normalizeConversationDirection(payload.direction),
status: deriveConversationStatus(payload),
direction: normalizeConversationDirection(
payload.lastMessageDirection || payload.direction
),
startedAt:
typeof payload.dateAdded === "string"
? new Date(payload.dateAdded).getTime()
: Date.now(),
endedAt:
typeof payload.dateEnded === "string"
? new Date(payload.dateEnded).getTime()
: undefined,
parseGhlTimestamp(payload.dateAdded) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
endedAt: parseGhlTimestamp(payload.dateEnded),
lastMessageAt:
typeof payload.lastMessageAt === "string"
? new Date(payload.lastMessageAt).getTime()
: undefined,
parseGhlTimestamp(payload.lastMessageDate) ??
parseGhlTimestamp(payload.lastMessageAt) ??
parseGhlTimestamp(payload.dateUpdated),
lastMessagePreview: payload.lastMessageBody || payload.snippet,
unreadCount:
typeof payload.unreadCount === "number" ? payload.unreadCount : undefined,
summaryText: payload.summary,
ghlConversationId: payload.id || args.entityId,
ghlConversationId:
payload.conversationId || args.entityId || payload.id,
})
await ensureConversationParticipant(ctx, {
@ -675,24 +760,20 @@ export const importMessage = mutation({
const conversation = await upsertConversationRecord(ctx, {
contactId: contact?._id,
title: payload.contactName,
channel:
payload.channel === "SMS" || payload.messageType === "SMS"
? "sms"
: payload.channel === "Call"
? "call"
: "unknown",
title:
payload.contactName || payload.fullName || payload.name || payload.phone,
channel: deriveGhlConversationChannel(payload),
source: `${args.provider}:mirror`,
status: normalizeConversationStatus(payload.conversationStatus),
direction: normalizeConversationDirection(payload.direction),
startedAt:
typeof payload.dateAdded === "string"
? new Date(payload.dateAdded).getTime()
: Date.now(),
parseGhlTimestamp(payload.dateAdded) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
lastMessageAt:
typeof payload.dateAdded === "string"
? new Date(payload.dateAdded).getTime()
: Date.now(),
parseGhlTimestamp(payload.dateAdded) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
lastMessagePreview: payload.body || payload.message,
ghlConversationId: payload.conversationId,
})
@ -716,20 +797,15 @@ export const importMessage = mutation({
: payload.direction === "outbound"
? "outbound"
: "system",
channel:
payload.channel === "SMS" || payload.messageType === "SMS"
? "sms"
: payload.channel === "Call"
? "call"
: "unknown",
channel: deriveGhlConversationChannel(payload),
source: `${args.provider}:mirror`,
messageType: payload.messageType || payload.type,
body: payload.body || payload.message || payload.transcript || "",
status: payload.status,
sentAt:
typeof payload.dateAdded === "string"
? new Date(payload.dateAdded).getTime()
: Date.now(),
parseGhlTimestamp(payload.dateAdded) ??
parseGhlTimestamp(payload.lastMessageDate) ??
Date.now(),
ghlMessageId: payload.id || args.entityId,
metadata: JSON.stringify(payload),
})
@ -765,13 +841,9 @@ export const importRecording = mutation({
status: "closed",
direction: normalizeConversationDirection(payload.direction),
startedAt:
typeof payload.createdAt === "string"
? new Date(payload.createdAt).getTime()
: Date.now(),
parseGhlTimestamp(payload.createdAt) ?? Date.now(),
lastMessageAt:
typeof payload.createdAt === "string"
? new Date(payload.createdAt).getTime()
: Date.now(),
parseGhlTimestamp(payload.createdAt) ?? Date.now(),
lastMessagePreview: payload.summary || payload.transcript,
ghlConversationId: payload.conversationId,
livekitRoomName: payload.livekitRoomName,
@ -791,13 +863,8 @@ export const importRecording = mutation({
? payload.duration * 1000
: undefined,
startedAt:
typeof payload.createdAt === "string"
? new Date(payload.createdAt).getTime()
: undefined,
endedAt:
typeof payload.endedAt === "string"
? new Date(payload.endedAt).getTime()
: undefined,
parseGhlTimestamp(payload.createdAt),
endedAt: parseGhlTimestamp(payload.endedAt),
ghlMessageId: payload.messageId,
livekitRoomName: payload.livekitRoomName,
metadata: JSON.stringify(payload),
@ -1082,70 +1149,36 @@ export const runGhlMirror = action({
try {
await updateRunning("conversations")
const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
const conversationCursors = {
SMS:
!args.forceFullBackfill &&
previous.stages.conversations.metadata?.cursors?.SMS
? String(previous.stages.conversations.metadata.cursors.SMS)
: undefined,
Call:
!args.forceFullBackfill &&
previous.stages.conversations.metadata?.cursors?.Call
? String(previous.stages.conversations.metadata.cursors.Call)
: undefined,
}
const fetched = await fetchGhlConversationsPage(config, {
limit: 100,
})
for (const channel of ["SMS", "Call"] as const) {
let pages = 0
while (pages < maxPagesPerRun) {
const fetched = await fetchGhlMessagesPage(config, {
limit: messagesLimit,
cursor: conversationCursors[channel],
channel,
})
if (!fetched.items.length) {
break
}
const grouped = new Map<string, any>()
for (const item of fetched.items) {
const conversationId = String(item.conversationId || item.id || "")
if (!conversationId || grouped.has(conversationId)) {
continue
}
grouped.set(conversationId, item)
}
for (const [entityId, item] of grouped.entries()) {
await ctx.runMutation(api.crm.importConversation, {
provider: GHL_SYNC_PROVIDER,
entityId,
payload: item,
})
hydrationTargets.set(entityId, item.channel || "")
summary.conversations += 1
}
pages += 1
conversationCursors[channel] = fetched.nextCursor
if (!fetched.nextCursor) {
break
}
for (const item of fetched.items) {
const entityId = String(item.id || "")
if (!entityId) {
continue
}
await ctx.runMutation(api.crm.importConversation, {
provider: GHL_SYNC_PROVIDER,
entityId,
payload: item,
})
hydrationTargets.set(entityId, item.lastMessageType || item.type || "")
summary.conversations += 1
}
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
provider: GHL_SYNC_PROVIDER,
entityType: "conversations",
entityId: "conversations",
cursor: conversationCursors.Call || conversationCursors.SMS,
cursor: undefined,
status: "synced",
error: "",
metadata: JSON.stringify({
imported: summary.conversations,
cursors: conversationCursors,
total: fetched.total,
source: "search",
completedAt: Date.now(),
reason: args.reason || "cron",
}),
@ -1719,6 +1752,7 @@ export const listAdminConversations = query({
const conversations = await ctx.db.query("conversations").collect()
const filtered = []
const staleShellCutoff = Date.now() - 30 * 24 * 60 * 60 * 1000
for (const conversation of conversations) {
if (args.channel && conversation.channel !== args.channel) {
@ -1748,14 +1782,29 @@ export const listAdminConversations = query({
) {
continue
}
if (
conversation.source === "ghl:mirror" &&
conversation.channel === "call" &&
!String(conversation.lastMessagePreview || "").trim() &&
(conversation.lastMessageAt ||
conversation.startedAt ||
conversation.updatedAt ||
0) < staleShellCutoff
) {
continue
}
filtered.push({ conversation, contact })
}
filtered.sort(
(a, b) =>
(b.conversation.lastMessageAt || b.conversation.updatedAt) -
(a.conversation.lastMessageAt || a.conversation.updatedAt)
(b.conversation.lastMessageAt ||
b.conversation.startedAt ||
b.conversation.updatedAt) -
(a.conversation.lastMessageAt ||
a.conversation.startedAt ||
a.conversation.updatedAt)
)
const paged = filtered.slice((page - 1) * limit, page * limit)

View file

@ -225,6 +225,35 @@ export async function upsertConversationRecord(ctx, input) {
.unique()
}
if (!existing && input.contactId) {
const candidates = await ctx.db
.query("conversations")
.withIndex("by_contactId", (q) => q.eq("contactId", input.contactId))
.collect()
const targetMoment =
input.lastMessageAt ?? input.startedAt ?? input.updatedAt ?? now
existing =
candidates
.filter((candidate) => {
if (input.channel && candidate.channel !== input.channel) {
return false
}
const candidateMoment =
candidate.lastMessageAt ??
candidate.startedAt ??
candidate.updatedAt ??
0
return Math.abs(candidateMoment - targetMoment) <= 5 * 60 * 1000
})
.sort((a, b) => {
const aMoment = a.lastMessageAt ?? a.startedAt ?? a.updatedAt ?? 0
const bMoment = b.lastMessageAt ?? b.startedAt ?? b.updatedAt ?? 0
return Math.abs(aMoment - targetMoment) - Math.abs(bMoment - targetMoment)
})[0] || null
}
const patch = {
contactId: input.contactId ?? existing?.contactId,
title: input.title || existing?.title,

View file

@ -134,6 +134,29 @@ export async function fetchGhlMessagesPage(
}
}
export async function fetchGhlConversationsPage(
config: GhlMirrorConfig,
args?: {
limit?: number
}
) {
const url = new URL(`${config.baseUrl}/conversations/search`)
url.searchParams.set("locationId", config.locationId)
url.searchParams.set("limit", String(Math.min(100, Math.max(1, args?.limit || 100))))
const payload = await fetchGhlMirrorJson(config, url.pathname + url.search)
return {
items: Array.isArray(payload?.conversations) ? payload.conversations : [],
total:
typeof payload?.total === "number"
? payload.total
: Array.isArray(payload?.conversations)
? payload.conversations.length
: 0,
}
}
export async function fetchGhlConversationMessages(
config: GhlMirrorConfig,
args: {
@ -146,13 +169,17 @@ export async function fetchGhlConversationMessages(
)
return {
items: Array.isArray(payload?.messages)
? payload.messages
: Array.isArray(payload?.data?.messages)
? payload.data.messages
: Array.isArray(payload)
? payload
: [],
items: Array.isArray(payload?.messages?.messages)
? payload.messages.messages
: Array.isArray(payload?.messages)
? payload.messages
: Array.isArray(payload?.data?.messages?.messages)
? payload.data.messages.messages
: Array.isArray(payload?.data?.messages)
? payload.data.messages
: Array.isArray(payload)
? payload
: [],
}
}