feat: improve ghl conversation sync and inbox actions

This commit is contained in:
DMleadgen 2026-04-16 14:05:12 -06:00
parent e294117e6e
commit 013a908d92
Signed by: matt
GPG key ID: C2720CF8CD701894
6 changed files with 462 additions and 20 deletions

View file

@ -1,5 +1,5 @@
import Link from "next/link" import Link from "next/link"
import { fetchQuery } from "convex/nextjs" import { fetchAction, fetchQuery } from "convex/nextjs"
import { MessageSquare, Phone, Search } from "lucide-react" import { MessageSquare, Phone, Search } from "lucide-react"
import { api } from "@/convex/_generated/api" import { api } from "@/convex/_generated/api"
import { Badge } from "@/components/ui/badge" import { Badge } from "@/components/ui/badge"
@ -20,6 +20,7 @@ type PageProps = {
channel?: "call" | "sms" | "chat" | "unknown" channel?: "call" | "sms" | "chat" | "unknown"
status?: "open" | "closed" | "archived" status?: "open" | "closed" | "archived"
conversationId?: string conversationId?: string
error?: string
page?: string page?: string
}> }>
} }
@ -147,15 +148,31 @@ export default async function AdminConversationsPage({
}) })
: null : null
const timeline = detail const hydratedDetail =
detail &&
detail.messages.length === 0 &&
detail.conversation.ghlConversationId
? await fetchAction(api.crm.hydrateConversationHistory, {
conversationId: detail.conversation.id,
}).then(async (result) => {
if (result?.imported) {
return await fetchQuery(api.crm.getAdminConversationDetail, {
conversationId: detail.conversation.id,
})
}
return detail
})
: detail
const timeline = hydratedDetail
? [ ? [
...detail.messages.map((message: any) => ({ ...hydratedDetail.messages.map((message: any) => ({
id: `message-${message.id}`, id: `message-${message.id}`,
type: "message" as const, type: "message" as const,
timestamp: message.sentAt || 0, timestamp: message.sentAt || 0,
message, message,
})), })),
...detail.recordings.map((recording: any) => ({ ...hydratedDetail.recordings.map((recording: any) => ({
id: `recording-${recording.id}`, id: `recording-${recording.id}`,
type: "recording" as const, type: "recording" as const,
timestamp: recording.startedAt || recording.endedAt || 0, timestamp: recording.startedAt || recording.endedAt || 0,
@ -310,51 +327,71 @@ export default async function AdminConversationsPage({
</div> </div>
<div className="bg-[#faf8f3]"> <div className="bg-[#faf8f3]">
{detail ? ( {hydratedDetail ? (
<div className="flex h-full flex-col"> <div className="flex h-full flex-col">
<div className="border-b bg-white px-6 py-5"> <div className="border-b bg-white px-6 py-5">
<div className="flex flex-col gap-4 md:flex-row md:items-start md:justify-between"> <div className="flex flex-col gap-4 md:flex-row md:items-start md:justify-between">
<div className="space-y-2"> <div className="space-y-2">
<div> <div>
<h2 className="text-2xl font-semibold"> <h2 className="text-2xl font-semibold">
{detail.contact?.name || {hydratedDetail.contact?.name ||
detail.conversation.title || hydratedDetail.conversation.title ||
"Conversation"} "Conversation"}
</h2> </h2>
{detail.contact?.secondaryLine || {hydratedDetail.contact?.secondaryLine ||
detail.contact?.email || hydratedDetail.contact?.email ||
detail.contact?.phone ? ( hydratedDetail.contact?.phone ? (
<p className="text-sm text-muted-foreground"> <p className="text-sm text-muted-foreground">
{detail.contact?.secondaryLine || {hydratedDetail.contact?.secondaryLine ||
detail.contact?.phone || hydratedDetail.contact?.phone ||
detail.contact?.email} hydratedDetail.contact?.email}
</p> </p>
) : null} ) : null}
</div> </div>
<div className="flex flex-wrap gap-2"> <div className="flex flex-wrap gap-2">
<Badge variant="outline"> <Badge variant="outline">
{detail.conversation.channel} {hydratedDetail.conversation.channel}
</Badge> </Badge>
<Badge variant="secondary"> <Badge variant="secondary">
{detail.conversation.status} {hydratedDetail.conversation.status}
</Badge> </Badge>
<Badge variant="outline"> <Badge variant="outline">
{timeline.filter((item) => item.type === "message").length}{" "} {timeline.filter((item) => item.type === "message").length}{" "}
messages messages
</Badge> </Badge>
{detail.recordings.length ? ( {hydratedDetail.recordings.length ? (
<Badge variant="outline"> <Badge variant="outline">
{detail.recordings.length} recording {hydratedDetail.recordings.length} recording
{detail.recordings.length === 1 ? "" : "s"} {hydratedDetail.recordings.length === 1 ? "" : "s"}
</Badge> </Badge>
) : null} ) : null}
</div> </div>
</div> </div>
<div className="text-sm text-muted-foreground"> <div className="text-sm text-muted-foreground">
Last activity:{" "} Last activity:{" "}
{formatTimestamp(detail.conversation.lastMessageAt)} {formatTimestamp(hydratedDetail.conversation.lastMessageAt)}
</div> </div>
</div> </div>
<div className="mt-4 flex flex-wrap items-center gap-3">
<form
action={`/api/admin/conversations/${hydratedDetail.conversation.id}/sync`}
method="post"
>
<Button type="submit" variant="outline" size="sm">
Refresh history
</Button>
</form>
{params.error === "send" ? (
<p className="text-sm text-destructive">
Rocky could not send that message through GHL.
</p>
) : null}
{params.error === "sync" ? (
<p className="text-sm text-destructive">
Rocky could not refresh that conversation from GHL.
</p>
) : null}
</div>
</div> </div>
<ScrollArea className="h-[520px] px-4 py-5 lg:h-[640px] lg:px-6"> <ScrollArea className="h-[520px] px-4 py-5 lg:h-[640px] lg:px-6">
@ -362,7 +399,8 @@ export default async function AdminConversationsPage({
{timeline.length === 0 ? ( {timeline.length === 0 ? (
<div className="rounded-2xl border border-dashed bg-white/70 px-6 py-10 text-center text-sm text-muted-foreground"> <div className="rounded-2xl border border-dashed bg-white/70 px-6 py-10 text-center text-sm text-muted-foreground">
No messages or recordings have been mirrored into this No messages or recordings have been mirrored into this
conversation yet. conversation yet. Use refresh history to pull the latest
thread from GHL.
</div> </div>
) : ( ) : (
timeline.map((item: any) => { timeline.map((item: any) => {
@ -435,6 +473,26 @@ export default async function AdminConversationsPage({
)} )}
</div> </div>
</ScrollArea> </ScrollArea>
<div className="border-t bg-white px-4 py-4 lg:px-6">
<form
action={`/api/admin/conversations/${hydratedDetail.conversation.id}/messages`}
method="post"
className="space-y-3"
>
<textarea
name="body"
rows={4}
placeholder="Reply to this conversation"
className="border-input placeholder:text-muted-foreground focus-visible:border-ring focus-visible:ring-ring/50 min-h-24 w-full rounded-2xl border bg-background px-4 py-3 text-sm shadow-sm outline-none focus-visible:ring-[3px]"
/>
<div className="flex items-center justify-between gap-3">
<p className="text-xs text-muted-foreground">
Sends through GHL and mirrors the reply back into Rocky.
</p>
<Button type="submit">Send message</Button>
</div>
</form>
</div>
</div> </div>
) : ( ) : (
<div className="flex h-full min-h-[520px] items-center justify-center px-6 py-16"> <div className="flex h-full min-h-[520px] items-center justify-center px-6 py-16">

View file

@ -0,0 +1,49 @@
import { NextResponse } from "next/server"
import { fetchAction } from "convex/nextjs"
import { api } from "@/convex/_generated/api"
import { requireAdminSession } from "@/lib/server/admin-auth"
type RouteContext = {
params: Promise<{
id: string
}>
}
export async function POST(request: Request, { params }: RouteContext) {
const adminUser = await requireAdminSession(request)
if (!adminUser) {
return NextResponse.redirect(new URL("/sign-in", request.url))
}
const { id } = await params
const formData = await request.formData()
const body = String(formData.get("body") || "").trim()
if (!body) {
return NextResponse.redirect(
new URL(`/admin/conversations?conversationId=${encodeURIComponent(id)}`, request.url)
)
}
try {
await fetchAction(api.crm.sendAdminConversationMessage, {
conversationId: id,
body,
})
return NextResponse.redirect(
new URL(
`/admin/conversations?conversationId=${encodeURIComponent(id)}`,
request.url
)
)
} catch (error) {
console.error("Failed to send admin conversation message:", error)
return NextResponse.redirect(
new URL(
`/admin/conversations?conversationId=${encodeURIComponent(id)}&error=send`,
request.url
)
)
}
}

View file

@ -0,0 +1,40 @@
import { NextResponse } from "next/server"
import { fetchAction } from "convex/nextjs"
import { api } from "@/convex/_generated/api"
import { requireAdminSession } from "@/lib/server/admin-auth"
type RouteContext = {
params: Promise<{
id: string
}>
}
export async function POST(request: Request, { params }: RouteContext) {
const adminUser = await requireAdminSession(request)
if (!adminUser) {
return NextResponse.redirect(new URL("/sign-in", request.url))
}
const { id } = await params
try {
await fetchAction(api.crm.hydrateConversationHistory, {
conversationId: id,
})
return NextResponse.redirect(
new URL(
`/admin/conversations?conversationId=${encodeURIComponent(id)}`,
request.url
)
)
} catch (error) {
console.error("Failed to refresh conversation history:", error)
return NextResponse.redirect(
new URL(
`/admin/conversations?conversationId=${encodeURIComponent(id)}&error=sync`,
request.url
)
)
}
}

View file

@ -15,9 +15,11 @@ import {
} from "./crmModel" } from "./crmModel"
import { import {
fetchGhlCallLogsPage, fetchGhlCallLogsPage,
fetchGhlConversationMessages,
fetchGhlContactsPage, fetchGhlContactsPage,
fetchGhlMessagesPage, fetchGhlMessagesPage,
readGhlMirrorConfig, readGhlMirrorConfig,
sendGhlConversationMessage,
} from "./ghlMirror" } from "./ghlMirror"
const GHL_SYNC_PROVIDER = "ghl" const GHL_SYNC_PROVIDER = "ghl"
@ -136,6 +138,19 @@ async function buildAdminSyncOverview(ctx) {
} }
} }
function extractGhlMessages(payload: any) {
if (Array.isArray(payload?.items)) {
return payload.items
}
return Array.isArray(payload?.messages)
? payload.messages
: Array.isArray(payload?.data?.messages)
? payload.data.messages
: Array.isArray(payload)
? payload
: []
}
function matchesSearch(values: Array<string | undefined>, search: string) { function matchesSearch(values: Array<string | undefined>, search: string) {
if (!search) { if (!search) {
return true return true
@ -897,6 +912,29 @@ export const reconcileExternalState = mutation({
}, },
}) })
export const listConversationHistoryHydrationCandidates = query({
args: {
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
const limit = Math.min(100, Math.max(1, args.limit ?? 25))
const conversations = await ctx.db.query("conversations").collect()
return conversations
.filter((conversation) => conversation.ghlConversationId)
.sort(
(a, b) =>
(b.lastMessageAt || b.updatedAt || 0) - (a.lastMessageAt || a.updatedAt || 0)
)
.map((conversation) => ({
id: conversation._id,
ghlConversationId: conversation.ghlConversationId,
channel: conversation.channel,
lastMessageAt: conversation.lastMessageAt || conversation.updatedAt || 0,
}))
.slice(0, limit)
},
})
export const runGhlMirror = action({ export const runGhlMirror = action({
args: { args: {
reason: v.optional(v.string()), reason: v.optional(v.string()),
@ -947,8 +985,10 @@ export const runGhlMirror = action({
conversations: 0, conversations: 0,
messages: 0, messages: 0,
recordings: 0, recordings: 0,
hydrated: 0,
mismatches: [] as string[], mismatches: [] as string[],
} }
const hydrationTargets = new Map<string, string>()
const updateRunning = async (entityType: string, metadata?: Record<string, any>) => { const updateRunning = async (entityType: string, metadata?: Record<string, any>) => {
await ctx.runMutation(api.crm.updateSyncCheckpoint, { await ctx.runMutation(api.crm.updateSyncCheckpoint, {
@ -1084,6 +1124,7 @@ export const runGhlMirror = action({
entityId, entityId,
payload: item, payload: item,
}) })
hydrationTargets.set(entityId, item.channel || "")
summary.conversations += 1 summary.conversations += 1
} }
@ -1147,6 +1188,9 @@ export const runGhlMirror = action({
entityId: String(item.id || ""), entityId: String(item.id || ""),
payload: item, payload: item,
}) })
if (item.conversationId) {
hydrationTargets.set(String(item.conversationId), item.channel || "")
}
summary.messages += 1 summary.messages += 1
} }
@ -1177,6 +1221,51 @@ export const runGhlMirror = action({
return summary return summary
} }
try {
const fallbackCandidates = await ctx.runQuery(
api.crm.listConversationHistoryHydrationCandidates,
{ limit: 25 }
)
for (const candidate of fallbackCandidates) {
if (candidate.ghlConversationId) {
hydrationTargets.set(
String(candidate.ghlConversationId),
candidate.channel || ""
)
}
}
for (const [ghlConversationId, channelHint] of Array.from(
hydrationTargets.entries()
).slice(0, 25)) {
const fetched = await fetchGhlConversationMessages(config, {
conversationId: ghlConversationId,
})
const items = extractGhlMessages(fetched).filter(Boolean)
if (!items.length) {
continue
}
for (const item of items) {
await ctx.runMutation(api.crm.importMessage, {
provider: GHL_SYNC_PROVIDER,
entityId: String(item.id || item.messageId || ""),
payload: {
...item,
conversationId: item.conversationId || ghlConversationId,
channel: item.channel || channelHint,
},
})
summary.hydrated += 1
}
}
} catch (error) {
await failStage("messages", error, {
hydrated: summary.hydrated,
})
return summary
}
try { try {
await updateRunning("recordings") await updateRunning("recordings")
const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {}) const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
@ -1327,6 +1416,146 @@ export const repairMirroredContacts = action({
}, },
}) })
export const hydrateConversationHistory = action({
args: {
conversationId: v.string(),
},
handler: async (ctx, args) => {
const config = readGhlMirrorConfig()
if (!config) {
return {
ok: false,
imported: 0,
message: "GHL credentials are not configured.",
}
}
const detail = await ctx.runQuery(api.crm.getAdminConversationDetail, {
conversationId: args.conversationId,
})
if (!detail?.conversation?.ghlConversationId) {
return {
ok: false,
imported: 0,
message: "This conversation is not linked to GHL.",
}
}
try {
const fetched = await fetchGhlConversationMessages(config, {
conversationId: detail.conversation.ghlConversationId,
})
const items = extractGhlMessages(fetched).filter(Boolean)
let imported = 0
for (const item of items) {
await ctx.runMutation(api.crm.importMessage, {
provider: GHL_SYNC_PROVIDER,
entityId: String(item.id || item.messageId || ""),
payload: {
...item,
conversationId:
item.conversationId || detail.conversation.ghlConversationId,
channel: item.channel || detail.conversation.channel,
},
})
imported += 1
}
return {
ok: true,
imported,
ghlConversationId: detail.conversation.ghlConversationId,
}
} catch (error) {
return {
ok: false,
imported: 0,
ghlConversationId: detail.conversation.ghlConversationId,
message: error instanceof Error ? error.message : "Failed to hydrate history.",
}
}
},
})
export const sendAdminConversationMessage = action({
args: {
conversationId: v.string(),
body: v.string(),
},
handler: async (ctx, args) => {
const messageBody = String(args.body || "").trim()
if (!messageBody) {
throw new Error("Message body is required.")
}
const config = readGhlMirrorConfig()
if (!config) {
throw new Error("GHL credentials are not configured.")
}
const detail = await ctx.runQuery(api.crm.getAdminConversationDetail, {
conversationId: args.conversationId,
})
if (!detail) {
throw new Error("Conversation not found.")
}
const response = await sendGhlConversationMessage(config, {
conversationId: detail.conversation.ghlConversationId || undefined,
contactId: detail.contact?.ghlContactId || undefined,
message: messageBody,
type: "SMS",
})
const responseMessage =
response?.message ||
response?.data?.message ||
response?.messages?.[0] ||
response?.data?.messages?.[0] ||
null
if (responseMessage) {
await ctx.runMutation(api.crm.importMessage, {
provider: GHL_SYNC_PROVIDER,
entityId: String(
responseMessage.id || responseMessage.messageId || Date.now()
),
payload: {
...responseMessage,
conversationId:
responseMessage.conversationId || detail.conversation.ghlConversationId,
contactId: responseMessage.contactId || detail.contact?.ghlContactId,
channel: responseMessage.channel || detail.conversation.channel || "SMS",
direction: responseMessage.direction || "outbound",
body: responseMessage.body || responseMessage.message || messageBody,
status: responseMessage.status || "sent",
},
})
} else {
await ctx.runMutation(api.crm.upsertMessage, {
conversationId: args.conversationId as any,
contactId: detail.contact?.id as any,
direction: "outbound",
channel:
detail.conversation.channel === "sms" ? "sms" : "unknown",
source: "ghl:send",
body: messageBody,
status: "sent",
sentAt: Date.now(),
metadata: JSON.stringify(response || {}),
})
}
return {
ok: true,
response,
}
},
})
export const listAdminContacts = query({ export const listAdminContacts = query({
args: { args: {
search: v.optional(v.string()), search: v.optional(v.string()),
@ -1647,6 +1876,7 @@ export const getAdminConversationDetail = query({
email: contact.email, email: contact.email,
phone: contact.phone, phone: contact.phone,
company: contact.company, company: contact.company,
ghlContactId: contact.ghlContactId,
secondaryLine: buildContactDisplay(contact).secondaryLine, secondaryLine: buildContactDisplay(contact).secondaryLine,
} }
: null, : null,

View file

@ -134,6 +134,48 @@ export async function fetchGhlMessagesPage(
} }
} }
export async function fetchGhlConversationMessages(
config: GhlMirrorConfig,
args: {
conversationId: string
}
) {
const payload = await fetchGhlMirrorJson(
config,
`/conversations/${encodeURIComponent(args.conversationId)}/messages`
)
return {
items: Array.isArray(payload?.messages)
? payload.messages
: Array.isArray(payload?.data?.messages)
? payload.data.messages
: Array.isArray(payload)
? payload
: [],
}
}
export async function sendGhlConversationMessage(
config: GhlMirrorConfig,
args: {
conversationId?: string
contactId?: string
message: string
type?: string
}
) {
return await fetchGhlMirrorJson(config, "/conversations/messages", {
method: "POST",
body: JSON.stringify({
type: args.type || "SMS",
message: args.message,
conversationId: args.conversationId,
contactId: args.contactId,
}),
})
}
export async function fetchGhlCallLogsPage( export async function fetchGhlCallLogsPage(
config: GhlMirrorConfig, config: GhlMirrorConfig,
args?: { args?: {

View file

@ -53,6 +53,17 @@ function hashAdminSessionToken(token: string) {
return createHash("sha256").update(token).digest("hex") return createHash("sha256").update(token).digest("hex")
} }
function readCookieFromHeader(cookieHeader: string, name: string) {
const cookies = cookieHeader.split(";")
for (const entry of cookies) {
const [cookieName, ...rest] = entry.trim().split("=")
if (cookieName === name) {
return rest.join("=")
}
}
return ""
}
export function isAdminCredentialLoginConfigured() { export function isAdminCredentialLoginConfigured() {
return Boolean( return Boolean(
isAdminUiEnabled() && isAdminUiEnabled() &&
@ -125,6 +136,18 @@ export async function validateAdminSession(rawToken?: string | null) {
} }
} }
export async function requireAdminSession(request: Request) {
const rawToken = readCookieFromHeader(
request.headers.get("cookie") || "",
ADMIN_SESSION_COOKIE
)
const session = await validateAdminSession(rawToken || null)
if (!session?.user) {
return null
}
return session.user
}
export async function getAdminUserFromCookies() { export async function getAdminUserFromCookies() {
if (!isAdminUiEnabled()) { if (!isAdminUiEnabled()) {
return null return null