diff --git a/app/admin/conversations/page.tsx b/app/admin/conversations/page.tsx
index 1bfafa54..ba2a6f28 100644
--- a/app/admin/conversations/page.tsx
+++ b/app/admin/conversations/page.tsx
@@ -1,5 +1,5 @@
import Link from "next/link"
-import { fetchQuery } from "convex/nextjs"
+import { fetchAction, fetchQuery } from "convex/nextjs"
import { MessageSquare, Phone, Search } from "lucide-react"
import { api } from "@/convex/_generated/api"
import { Badge } from "@/components/ui/badge"
@@ -20,6 +20,7 @@ type PageProps = {
channel?: "call" | "sms" | "chat" | "unknown"
status?: "open" | "closed" | "archived"
conversationId?: string
+ error?: string
page?: string
}>
}
@@ -147,15 +148,31 @@ export default async function AdminConversationsPage({
})
: null
- const timeline = detail
+ const hydratedDetail =
+ detail &&
+ detail.messages.length === 0 &&
+ detail.conversation.ghlConversationId
+ ? await fetchAction(api.crm.hydrateConversationHistory, {
+ conversationId: detail.conversation.id,
+ }).then(async (result) => {
+ if (result?.imported) {
+ return await fetchQuery(api.crm.getAdminConversationDetail, {
+ conversationId: detail.conversation.id,
+ })
+ }
+ return detail
+ })
+ : detail
+
+ const timeline = hydratedDetail
? [
- ...detail.messages.map((message: any) => ({
+ ...hydratedDetail.messages.map((message: any) => ({
id: `message-${message.id}`,
type: "message" as const,
timestamp: message.sentAt || 0,
message,
})),
- ...detail.recordings.map((recording: any) => ({
+ ...hydratedDetail.recordings.map((recording: any) => ({
id: `recording-${recording.id}`,
type: "recording" as const,
timestamp: recording.startedAt || recording.endedAt || 0,
@@ -310,51 +327,71 @@ export default async function AdminConversationsPage({
- {detail ? (
+ {hydratedDetail ? (
- {detail.contact?.name ||
- detail.conversation.title ||
+ {hydratedDetail.contact?.name ||
+ hydratedDetail.conversation.title ||
"Conversation"}
- {detail.contact?.secondaryLine ||
- detail.contact?.email ||
- detail.contact?.phone ? (
+ {hydratedDetail.contact?.secondaryLine ||
+ hydratedDetail.contact?.email ||
+ hydratedDetail.contact?.phone ? (
- {detail.contact?.secondaryLine ||
- detail.contact?.phone ||
- detail.contact?.email}
+ {hydratedDetail.contact?.secondaryLine ||
+ hydratedDetail.contact?.phone ||
+ hydratedDetail.contact?.email}
) : null}
- {detail.conversation.channel}
+ {hydratedDetail.conversation.channel}
- {detail.conversation.status}
+ {hydratedDetail.conversation.status}
{timeline.filter((item) => item.type === "message").length}{" "}
messages
- {detail.recordings.length ? (
+ {hydratedDetail.recordings.length ? (
- {detail.recordings.length} recording
- {detail.recordings.length === 1 ? "" : "s"}
+ {hydratedDetail.recordings.length} recording
+ {hydratedDetail.recordings.length === 1 ? "" : "s"}
) : null}
Last activity:{" "}
- {formatTimestamp(detail.conversation.lastMessageAt)}
+ {formatTimestamp(hydratedDetail.conversation.lastMessageAt)}
+
+
+ {params.error === "send" ? (
+
+ Rocky could not send that message through GHL.
+
+ ) : null}
+ {params.error === "sync" ? (
+
+ Rocky could not refresh that conversation from GHL.
+
+ ) : null}
+
@@ -362,7 +399,8 @@ export default async function AdminConversationsPage({
{timeline.length === 0 ? (
No messages or recordings have been mirrored into this
- conversation yet.
+ conversation yet. Use refresh history to pull the latest
+ thread from GHL.
) : (
timeline.map((item: any) => {
@@ -435,6 +473,26 @@ export default async function AdminConversationsPage({
)}
+
) : (
diff --git a/app/api/admin/conversations/[id]/messages/route.ts b/app/api/admin/conversations/[id]/messages/route.ts
new file mode 100644
index 00000000..b05e0f19
--- /dev/null
+++ b/app/api/admin/conversations/[id]/messages/route.ts
@@ -0,0 +1,49 @@
+import { NextResponse } from "next/server"
+import { fetchAction } from "convex/nextjs"
+import { api } from "@/convex/_generated/api"
+import { requireAdminSession } from "@/lib/server/admin-auth"
+
+type RouteContext = {
+ params: Promise<{
+ id: string
+ }>
+}
+
+export async function POST(request: Request, { params }: RouteContext) {
+ const adminUser = await requireAdminSession(request)
+ if (!adminUser) {
+ return NextResponse.redirect(new URL("/sign-in", request.url))
+ }
+
+ const { id } = await params
+ const formData = await request.formData()
+ const body = String(formData.get("body") || "").trim()
+
+ if (!body) {
+ return NextResponse.redirect(
+ new URL(`/admin/conversations?conversationId=${encodeURIComponent(id)}`, request.url)
+ )
+ }
+
+ try {
+ await fetchAction(api.crm.sendAdminConversationMessage, {
+ conversationId: id,
+ body,
+ })
+
+ return NextResponse.redirect(
+ new URL(
+ `/admin/conversations?conversationId=${encodeURIComponent(id)}`,
+ request.url
+ )
+ )
+ } catch (error) {
+ console.error("Failed to send admin conversation message:", error)
+ return NextResponse.redirect(
+ new URL(
+ `/admin/conversations?conversationId=${encodeURIComponent(id)}&error=send`,
+ request.url
+ )
+ )
+ }
+}
diff --git a/app/api/admin/conversations/[id]/sync/route.ts b/app/api/admin/conversations/[id]/sync/route.ts
new file mode 100644
index 00000000..bf0b3dd0
--- /dev/null
+++ b/app/api/admin/conversations/[id]/sync/route.ts
@@ -0,0 +1,40 @@
+import { NextResponse } from "next/server"
+import { fetchAction } from "convex/nextjs"
+import { api } from "@/convex/_generated/api"
+import { requireAdminSession } from "@/lib/server/admin-auth"
+
+type RouteContext = {
+ params: Promise<{
+ id: string
+ }>
+}
+
+export async function POST(request: Request, { params }: RouteContext) {
+ const adminUser = await requireAdminSession(request)
+ if (!adminUser) {
+ return NextResponse.redirect(new URL("/sign-in", request.url))
+ }
+
+ const { id } = await params
+
+ try {
+ await fetchAction(api.crm.hydrateConversationHistory, {
+ conversationId: id,
+ })
+
+ return NextResponse.redirect(
+ new URL(
+ `/admin/conversations?conversationId=${encodeURIComponent(id)}`,
+ request.url
+ )
+ )
+ } catch (error) {
+ console.error("Failed to refresh conversation history:", error)
+ return NextResponse.redirect(
+ new URL(
+ `/admin/conversations?conversationId=${encodeURIComponent(id)}&error=sync`,
+ request.url
+ )
+ )
+ }
+}
diff --git a/convex/crm.ts b/convex/crm.ts
index 7591213b..63f48e2c 100644
--- a/convex/crm.ts
+++ b/convex/crm.ts
@@ -15,9 +15,11 @@ import {
} from "./crmModel"
import {
fetchGhlCallLogsPage,
+ fetchGhlConversationMessages,
fetchGhlContactsPage,
fetchGhlMessagesPage,
readGhlMirrorConfig,
+ sendGhlConversationMessage,
} from "./ghlMirror"
const GHL_SYNC_PROVIDER = "ghl"
@@ -136,6 +138,19 @@ async function buildAdminSyncOverview(ctx) {
}
}
+function extractGhlMessages(payload: any) {
+ if (Array.isArray(payload?.items)) {
+ return payload.items
+ }
+ return Array.isArray(payload?.messages)
+ ? payload.messages
+ : Array.isArray(payload?.data?.messages)
+ ? payload.data.messages
+ : Array.isArray(payload)
+ ? payload
+ : []
+}
+
function matchesSearch(values: Array, search: string) {
if (!search) {
return true
@@ -897,6 +912,29 @@ export const reconcileExternalState = mutation({
},
})
+export const listConversationHistoryHydrationCandidates = query({
+ args: {
+ limit: v.optional(v.number()),
+ },
+ handler: async (ctx, args) => {
+ const limit = Math.min(100, Math.max(1, args.limit ?? 25))
+ const conversations = await ctx.db.query("conversations").collect()
+ return conversations
+ .filter((conversation) => conversation.ghlConversationId)
+ .sort(
+ (a, b) =>
+ (b.lastMessageAt || b.updatedAt || 0) - (a.lastMessageAt || a.updatedAt || 0)
+ )
+ .map((conversation) => ({
+ id: conversation._id,
+ ghlConversationId: conversation.ghlConversationId,
+ channel: conversation.channel,
+ lastMessageAt: conversation.lastMessageAt || conversation.updatedAt || 0,
+ }))
+ .slice(0, limit)
+ },
+})
+
export const runGhlMirror = action({
args: {
reason: v.optional(v.string()),
@@ -947,8 +985,10 @@ export const runGhlMirror = action({
conversations: 0,
messages: 0,
recordings: 0,
+ hydrated: 0,
mismatches: [] as string[],
}
+ const hydrationTargets = new Map()
const updateRunning = async (entityType: string, metadata?: Record) => {
await ctx.runMutation(api.crm.updateSyncCheckpoint, {
@@ -1084,6 +1124,7 @@ export const runGhlMirror = action({
entityId,
payload: item,
})
+ hydrationTargets.set(entityId, item.channel || "")
summary.conversations += 1
}
@@ -1147,6 +1188,9 @@ export const runGhlMirror = action({
entityId: String(item.id || ""),
payload: item,
})
+ if (item.conversationId) {
+ hydrationTargets.set(String(item.conversationId), item.channel || "")
+ }
summary.messages += 1
}
@@ -1177,6 +1221,51 @@ export const runGhlMirror = action({
return summary
}
+ try {
+ const fallbackCandidates = await ctx.runQuery(
+ api.crm.listConversationHistoryHydrationCandidates,
+ { limit: 25 }
+ )
+ for (const candidate of fallbackCandidates) {
+ if (candidate.ghlConversationId) {
+ hydrationTargets.set(
+ String(candidate.ghlConversationId),
+ candidate.channel || ""
+ )
+ }
+ }
+
+ for (const [ghlConversationId, channelHint] of Array.from(
+ hydrationTargets.entries()
+ ).slice(0, 25)) {
+ const fetched = await fetchGhlConversationMessages(config, {
+ conversationId: ghlConversationId,
+ })
+ const items = extractGhlMessages(fetched).filter(Boolean)
+ if (!items.length) {
+ continue
+ }
+
+ for (const item of items) {
+ await ctx.runMutation(api.crm.importMessage, {
+ provider: GHL_SYNC_PROVIDER,
+ entityId: String(item.id || item.messageId || ""),
+ payload: {
+ ...item,
+ conversationId: item.conversationId || ghlConversationId,
+ channel: item.channel || channelHint,
+ },
+ })
+ summary.hydrated += 1
+ }
+ }
+ } catch (error) {
+ await failStage("messages", error, {
+ hydrated: summary.hydrated,
+ })
+ return summary
+ }
+
try {
await updateRunning("recordings")
const previous = await ctx.runQuery(api.crm.getAdminSyncOverview, {})
@@ -1327,6 +1416,146 @@ export const repairMirroredContacts = action({
},
})
+export const hydrateConversationHistory = action({
+ args: {
+ conversationId: v.string(),
+ },
+ handler: async (ctx, args) => {
+ const config = readGhlMirrorConfig()
+ if (!config) {
+ return {
+ ok: false,
+ imported: 0,
+ message: "GHL credentials are not configured.",
+ }
+ }
+
+ const detail = await ctx.runQuery(api.crm.getAdminConversationDetail, {
+ conversationId: args.conversationId,
+ })
+
+ if (!detail?.conversation?.ghlConversationId) {
+ return {
+ ok: false,
+ imported: 0,
+ message: "This conversation is not linked to GHL.",
+ }
+ }
+
+ try {
+ const fetched = await fetchGhlConversationMessages(config, {
+ conversationId: detail.conversation.ghlConversationId,
+ })
+ const items = extractGhlMessages(fetched).filter(Boolean)
+
+ let imported = 0
+ for (const item of items) {
+ await ctx.runMutation(api.crm.importMessage, {
+ provider: GHL_SYNC_PROVIDER,
+ entityId: String(item.id || item.messageId || ""),
+ payload: {
+ ...item,
+ conversationId:
+ item.conversationId || detail.conversation.ghlConversationId,
+ channel: item.channel || detail.conversation.channel,
+ },
+ })
+ imported += 1
+ }
+
+ return {
+ ok: true,
+ imported,
+ ghlConversationId: detail.conversation.ghlConversationId,
+ }
+ } catch (error) {
+ return {
+ ok: false,
+ imported: 0,
+ ghlConversationId: detail.conversation.ghlConversationId,
+ message: error instanceof Error ? error.message : "Failed to hydrate history.",
+ }
+ }
+ },
+})
+
+export const sendAdminConversationMessage = action({
+ args: {
+ conversationId: v.string(),
+ body: v.string(),
+ },
+ handler: async (ctx, args) => {
+ const messageBody = String(args.body || "").trim()
+ if (!messageBody) {
+ throw new Error("Message body is required.")
+ }
+
+ const config = readGhlMirrorConfig()
+ if (!config) {
+ throw new Error("GHL credentials are not configured.")
+ }
+
+ const detail = await ctx.runQuery(api.crm.getAdminConversationDetail, {
+ conversationId: args.conversationId,
+ })
+
+ if (!detail) {
+ throw new Error("Conversation not found.")
+ }
+
+ const response = await sendGhlConversationMessage(config, {
+ conversationId: detail.conversation.ghlConversationId || undefined,
+ contactId: detail.contact?.ghlContactId || undefined,
+ message: messageBody,
+ type: "SMS",
+ })
+
+ const responseMessage =
+ response?.message ||
+ response?.data?.message ||
+ response?.messages?.[0] ||
+ response?.data?.messages?.[0] ||
+ null
+
+ if (responseMessage) {
+ await ctx.runMutation(api.crm.importMessage, {
+ provider: GHL_SYNC_PROVIDER,
+ entityId: String(
+ responseMessage.id || responseMessage.messageId || Date.now()
+ ),
+ payload: {
+ ...responseMessage,
+ conversationId:
+ responseMessage.conversationId || detail.conversation.ghlConversationId,
+ contactId: responseMessage.contactId || detail.contact?.ghlContactId,
+ channel: responseMessage.channel || detail.conversation.channel || "SMS",
+ direction: responseMessage.direction || "outbound",
+ body: responseMessage.body || responseMessage.message || messageBody,
+ status: responseMessage.status || "sent",
+ },
+ })
+ } else {
+ await ctx.runMutation(api.crm.upsertMessage, {
+ conversationId: args.conversationId as any,
+ contactId: detail.contact?.id as any,
+ direction: "outbound",
+ channel:
+ detail.conversation.channel === "sms" ? "sms" : "unknown",
+ source: "ghl:send",
+ body: messageBody,
+ status: "sent",
+ sentAt: Date.now(),
+ metadata: JSON.stringify(response || {}),
+ })
+ }
+
+ return {
+ ok: true,
+ response,
+ }
+ },
+})
+
export const listAdminContacts = query({
args: {
search: v.optional(v.string()),
@@ -1647,6 +1876,7 @@ export const getAdminConversationDetail = query({
email: contact.email,
phone: contact.phone,
company: contact.company,
+ ghlContactId: contact.ghlContactId,
secondaryLine: buildContactDisplay(contact).secondaryLine,
}
: null,
diff --git a/convex/ghlMirror.ts b/convex/ghlMirror.ts
index 4f0b1c60..fead3822 100644
--- a/convex/ghlMirror.ts
+++ b/convex/ghlMirror.ts
@@ -134,6 +134,48 @@ export async function fetchGhlMessagesPage(
}
}
+export async function fetchGhlConversationMessages(
+ config: GhlMirrorConfig,
+ args: {
+ conversationId: string
+ }
+) {
+ const payload = await fetchGhlMirrorJson(
+ config,
+ `/conversations/${encodeURIComponent(args.conversationId)}/messages`
+ )
+
+ return {
+ items: Array.isArray(payload?.messages)
+ ? payload.messages
+ : Array.isArray(payload?.data?.messages)
+ ? payload.data.messages
+ : Array.isArray(payload)
+ ? payload
+ : [],
+ }
+}
+
+export async function sendGhlConversationMessage(
+ config: GhlMirrorConfig,
+ args: {
+ conversationId?: string
+ contactId?: string
+ message: string
+ type?: string
+ }
+) {
+ return await fetchGhlMirrorJson(config, "/conversations/messages", {
+ method: "POST",
+ body: JSON.stringify({
+ type: args.type || "SMS",
+ message: args.message,
+ conversationId: args.conversationId,
+ contactId: args.contactId,
+ }),
+ })
+}
+
export async function fetchGhlCallLogsPage(
config: GhlMirrorConfig,
args?: {
diff --git a/lib/server/admin-auth.ts b/lib/server/admin-auth.ts
index 56cf4edd..58de5ba3 100644
--- a/lib/server/admin-auth.ts
+++ b/lib/server/admin-auth.ts
@@ -53,6 +53,17 @@ function hashAdminSessionToken(token: string) {
return createHash("sha256").update(token).digest("hex")
}
+function readCookieFromHeader(cookieHeader: string, name: string) {
+ const cookies = cookieHeader.split(";")
+ for (const entry of cookies) {
+ const [cookieName, ...rest] = entry.trim().split("=")
+ if (cookieName === name) {
+ return rest.join("=")
+ }
+ }
+ return ""
+}
+
export function isAdminCredentialLoginConfigured() {
return Boolean(
isAdminUiEnabled() &&
@@ -125,6 +136,18 @@ export async function validateAdminSession(rawToken?: string | null) {
}
}
+export async function requireAdminSession(request: Request) {
+ const rawToken = readCookieFromHeader(
+ request.headers.get("cookie") || "",
+ ADMIN_SESSION_COOKIE
+ )
+ const session = await validateAdminSession(rawToken || null)
+ if (!session?.user) {
+ return null
+ }
+ return session.user
+}
+
export async function getAdminUserFromCookies() {
if (!isAdminUiEnabled()) {
return null