Rocky_Mountain_Vending/convex/voiceSessions.ts

625 lines
18 KiB
TypeScript

// @ts-nocheck
import { mutation, query } from "./_generated/server"
import { v } from "convex/values"
import {
ensureConversationParticipant,
upsertCallArtifactRecord,
upsertContactRecord,
upsertConversationRecord,
upsertMessageRecord,
} from "./crmModel"
async function syncPhoneConversation(ctx, session, overrides = {}) {
const contact = await upsertContactRecord(ctx, {
firstName: "Phone",
lastName: "Caller",
phone: session.participantIdentity,
livekitIdentity: session.participantIdentity,
source: "phone-agent",
status: "lead",
lastActivityAt:
overrides.lastActivityAt ?? session.updatedAt ?? session.startedAt ?? Date.now(),
})
const conversation = await upsertConversationRecord(ctx, {
contactId: contact?._id,
title: `Phone call ${session.roomName}`,
channel: "call",
source: "phone-agent",
status:
session.callStatus === "completed"
? "closed"
: session.callStatus === "failed"
? "archived"
: "open",
direction: "inbound",
startedAt: session.startedAt,
endedAt: session.endedAt,
lastMessageAt: overrides.lastActivityAt ?? session.updatedAt ?? session.startedAt,
lastMessagePreview:
overrides.lastMessagePreview ?? session.summaryText ?? session.handoffReason,
summaryText: session.summaryText,
livekitRoomName: session.roomName,
voiceSessionId: session._id,
})
await ensureConversationParticipant(ctx, {
conversationId: conversation._id,
contactId: contact?._id,
role: "contact",
displayName: contact ? `${contact.firstName} ${contact.lastName}`.trim() : "Phone caller",
phone: contact?.phone || session.participantIdentity,
email: contact?.email,
})
await ctx.db.patch(session._id, {
contactId: contact?._id,
conversationId: conversation?._id,
updatedAt: Date.now(),
})
return {
contact,
conversation,
}
}
export const getByRoom = query({
args: {
roomName: v.string(),
},
handler: async (ctx, args) => {
return await ctx.db
.query("voiceSessions")
.withIndex("by_roomName", (q) => q.eq("roomName", args.roomName))
.unique()
},
})
export const getSessionWithTurnsBySessionId = query({
args: {
sessionId: v.string(),
},
handler: async (ctx, args) => {
const session = await ctx.db.get(args.sessionId as any)
if (!session) {
return null
}
const turns = await ctx.db
.query("voiceTranscriptTurns")
.withIndex("by_sessionId", (q) => q.eq("sessionId", session._id))
.collect()
turns.sort((a, b) => a.createdAt - b.createdAt)
return { session, turns }
},
})
export const getSessionWithTurnsByRoom = query({
args: {
roomName: v.string(),
},
handler: async (ctx, args) => {
const session = await ctx.db
.query("voiceSessions")
.withIndex("by_roomName", (q) => q.eq("roomName", args.roomName))
.unique()
if (!session) {
return null
}
const turns = await ctx.db
.query("voiceTranscriptTurns")
.withIndex("by_sessionId", (q) => q.eq("sessionId", session._id))
.collect()
turns.sort((a, b) => a.createdAt - b.createdAt)
return { session, turns }
},
})
export const createSession = mutation({
args: {
roomName: v.string(),
participantIdentity: v.string(),
siteUrl: v.optional(v.string()),
pathname: v.optional(v.string()),
pageUrl: v.optional(v.string()),
source: v.optional(v.string()),
metadata: v.optional(v.string()),
startedAt: v.optional(v.number()),
recordingDisclosureAt: v.optional(v.number()),
callStatus: v.optional(
v.union(v.literal("started"), v.literal("completed"), v.literal("failed"))
),
recordingStatus: v.optional(
v.union(
v.literal("pending"),
v.literal("starting"),
v.literal("recording"),
v.literal("completed"),
v.literal("failed")
)
),
},
handler: async (ctx, args) => {
const now = args.startedAt ?? Date.now()
const id = await ctx.db.insert("voiceSessions", {
...args,
startedAt: now,
callStatus: args.callStatus,
transcriptTurnCount: 0,
leadOutcome: "none",
handoffRequested: false,
notificationStatus: "pending",
createdAt: now,
updatedAt: now,
})
const session = await ctx.db.get(id)
if (session) {
await syncPhoneConversation(ctx, session)
}
return id
},
})
export const upsertPhoneCallSession = mutation({
args: {
roomName: v.string(),
participantIdentity: v.string(),
siteUrl: v.optional(v.string()),
pathname: v.optional(v.string()),
pageUrl: v.optional(v.string()),
source: v.optional(v.string()),
metadata: v.optional(v.string()),
startedAt: v.optional(v.number()),
recordingDisclosureAt: v.optional(v.number()),
recordingStatus: v.optional(
v.union(
v.literal("pending"),
v.literal("starting"),
v.literal("recording"),
v.literal("completed"),
v.literal("failed")
)
),
},
handler: async (ctx, args) => {
const now = args.startedAt ?? Date.now()
const existing = await ctx.db
.query("voiceSessions")
.withIndex("by_roomName", (q) => q.eq("roomName", args.roomName))
.unique()
if (existing) {
await ctx.db.patch(existing._id, {
participantIdentity: args.participantIdentity,
siteUrl: args.siteUrl,
pathname: args.pathname,
pageUrl: args.pageUrl,
source: args.source,
metadata: args.metadata,
startedAt: existing.startedAt || now,
recordingDisclosureAt:
args.recordingDisclosureAt ?? existing.recordingDisclosureAt,
recordingStatus: args.recordingStatus ?? existing.recordingStatus,
callStatus: existing.callStatus || "started",
notificationStatus: existing.notificationStatus || "pending",
updatedAt: Date.now(),
})
const updated = await ctx.db.get(existing._id)
if (updated) {
await syncPhoneConversation(ctx, updated)
}
return updated
}
const id = await ctx.db.insert("voiceSessions", {
roomName: args.roomName,
participantIdentity: args.participantIdentity,
siteUrl: args.siteUrl,
pathname: args.pathname,
pageUrl: args.pageUrl,
source: args.source,
metadata: args.metadata,
startedAt: now,
recordingDisclosureAt: args.recordingDisclosureAt,
recordingStatus: args.recordingStatus,
callStatus: "started",
transcriptTurnCount: 0,
leadOutcome: "none",
handoffRequested: false,
notificationStatus: "pending",
createdAt: now,
updatedAt: now,
})
const session = await ctx.db.get(id)
if (session) {
await syncPhoneConversation(ctx, session)
}
return session
},
})
export const addTranscriptTurn = mutation({
args: {
sessionId: v.id("voiceSessions"),
roomName: v.string(),
participantIdentity: v.string(),
role: v.union(
v.literal("user"),
v.literal("assistant"),
v.literal("system")
),
text: v.string(),
kind: v.optional(v.string()),
isFinal: v.optional(v.boolean()),
language: v.optional(v.string()),
source: v.optional(v.string()),
createdAt: v.optional(v.number()),
},
handler: async (ctx, args) => {
const createdAt = args.createdAt ?? Date.now()
const turnId = await ctx.db.insert("voiceTranscriptTurns", {
...args,
text: args.text.trim(),
createdAt,
})
const session = await ctx.db.get(args.sessionId)
if (session) {
await ctx.db.patch(args.sessionId, {
transcriptTurnCount: (session.transcriptTurnCount ?? 0) + 1,
agentAnsweredAt:
args.role === "assistant" && !session.agentAnsweredAt
? createdAt
: session.agentAnsweredAt,
updatedAt: Date.now(),
})
const { contact, conversation } = await syncPhoneConversation(ctx, {
...session,
updatedAt: createdAt,
}, {
lastActivityAt: createdAt,
lastMessagePreview: args.text,
})
await upsertMessageRecord(ctx, {
conversationId: conversation._id,
contactId: args.role === "user" ? contact?._id : undefined,
direction:
args.role === "user"
? "inbound"
: args.role === "assistant"
? "outbound"
: "system",
channel: "call",
source: args.source || "phone-agent",
messageType: args.kind || "transcript",
body: args.text,
sentAt: createdAt,
voiceTranscriptTurnId: turnId,
voiceSessionId: args.sessionId,
livekitRoomName: args.roomName,
})
}
return turnId
},
})
export const linkPhoneCallLead = mutation({
args: {
sessionId: v.id("voiceSessions"),
linkedLeadId: v.optional(v.string()),
leadOutcome: v.optional(
v.union(
v.literal("none"),
v.literal("contact"),
v.literal("requestMachine")
)
),
handoffRequested: v.optional(v.boolean()),
handoffReason: v.optional(v.string()),
},
handler: async (ctx, args) => {
await ctx.db.patch(args.sessionId, {
linkedLeadId: args.linkedLeadId,
leadOutcome: args.leadOutcome,
handoffRequested: args.handoffRequested,
handoffReason: args.handoffReason,
updatedAt: Date.now(),
})
const session = await ctx.db.get(args.sessionId)
if (session) {
const { conversation } = await syncPhoneConversation(ctx, session)
if (args.linkedLeadId || args.leadOutcome || args.handoffReason) {
await ctx.db.patch(conversation._id, {
summaryText:
session.summaryText ||
args.handoffReason ||
conversation.summaryText,
updatedAt: Date.now(),
})
}
}
return session
},
})
export const updateRecording = mutation({
args: {
sessionId: v.id("voiceSessions"),
recordingStatus: v.optional(
v.union(
v.literal("pending"),
v.literal("starting"),
v.literal("recording"),
v.literal("completed"),
v.literal("failed")
)
),
recordingId: v.optional(v.string()),
recordingUrl: v.optional(v.string()),
recordingError: v.optional(v.string()),
},
handler: async (ctx, args) => {
await ctx.db.patch(args.sessionId, {
recordingStatus: args.recordingStatus,
recordingId: args.recordingId,
recordingUrl: args.recordingUrl,
recordingError: args.recordingError,
updatedAt: Date.now(),
})
const session = await ctx.db.get(args.sessionId)
if (session) {
const { contact, conversation } = await syncPhoneConversation(ctx, session)
await upsertCallArtifactRecord(ctx, {
conversationId: conversation._id,
contactId: contact?._id,
source: "phone-agent",
recordingId: args.recordingId,
recordingUrl: args.recordingUrl,
recordingStatus: args.recordingStatus,
voiceSessionId: session._id,
livekitRoomName: session.roomName,
})
}
return session
},
})
export const completeSession = mutation({
args: {
sessionId: v.id("voiceSessions"),
endedAt: v.optional(v.number()),
callStatus: v.optional(
v.union(v.literal("started"), v.literal("completed"), v.literal("failed"))
),
recordingStatus: v.optional(
v.union(
v.literal("pending"),
v.literal("starting"),
v.literal("recording"),
v.literal("completed"),
v.literal("failed")
)
),
recordingId: v.optional(v.string()),
recordingUrl: v.optional(v.string()),
recordingError: v.optional(v.string()),
summaryText: v.optional(v.string()),
notificationStatus: v.optional(
v.union(
v.literal("pending"),
v.literal("sent"),
v.literal("failed"),
v.literal("disabled")
)
),
notificationSentAt: v.optional(v.number()),
notificationError: v.optional(v.string()),
},
handler: async (ctx, args) => {
const endedAt = args.endedAt ?? Date.now()
await ctx.db.patch(args.sessionId, {
endedAt,
callStatus: args.callStatus,
recordingStatus: args.recordingStatus,
recordingId: args.recordingId,
recordingUrl: args.recordingUrl,
recordingError: args.recordingError,
summaryText: args.summaryText,
notificationStatus: args.notificationStatus,
notificationSentAt: args.notificationSentAt,
notificationError: args.notificationError,
updatedAt: endedAt,
})
const session = await ctx.db.get(args.sessionId)
if (session) {
const { contact, conversation } = await syncPhoneConversation(ctx, session, {
lastActivityAt: endedAt,
lastMessagePreview: args.summaryText || session.summaryText,
})
await upsertCallArtifactRecord(ctx, {
conversationId: conversation._id,
contactId: contact?._id,
source: "phone-agent",
recordingId: args.recordingId,
recordingUrl: args.recordingUrl,
recordingStatus: args.recordingStatus,
transcriptionText: args.summaryText,
durationMs:
typeof session.startedAt === "number"
? Math.max(0, endedAt - session.startedAt)
: undefined,
startedAt: session.startedAt,
endedAt,
voiceSessionId: session._id,
livekitRoomName: session.roomName,
})
}
return session
},
})
function normalizePhoneCallForAdmin(session: any) {
const durationMs =
typeof session.endedAt === "number" && typeof session.startedAt === "number"
? Math.max(0, session.endedAt - session.startedAt)
: null
return {
id: session._id,
roomName: session.roomName,
participantIdentity: session.participantIdentity,
pathname: session.pathname,
pageUrl: session.pageUrl,
source: session.source,
startedAt: session.startedAt,
endedAt: session.endedAt,
durationMs,
callStatus: session.callStatus || "started",
transcriptTurnCount: session.transcriptTurnCount ?? 0,
answered: Boolean(session.agentAnsweredAt),
agentAnsweredAt: session.agentAnsweredAt,
linkedLeadId: session.linkedLeadId,
leadOutcome: session.leadOutcome || "none",
handoffRequested: Boolean(session.handoffRequested),
handoffReason: session.handoffReason,
summaryText: session.summaryText,
notificationStatus: session.notificationStatus || "pending",
notificationSentAt: session.notificationSentAt,
notificationError: session.notificationError,
recordingStatus: session.recordingStatus,
recordingUrl: session.recordingUrl,
recordingError: session.recordingError,
}
}
export const listAdminPhoneCalls = query({
args: {
search: v.optional(v.string()),
status: v.optional(
v.union(v.literal("started"), v.literal("completed"), v.literal("failed"))
),
page: v.optional(v.number()),
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
const page = Math.max(1, args.page ?? 1)
const limit = Math.min(100, Math.max(1, args.limit ?? 25))
const search = String(args.search || "")
.trim()
.toLowerCase()
const sessions = await ctx.db
.query("voiceSessions")
.withIndex("by_source_startedAt", (q) => q.eq("source", "phone-agent"))
.collect()
const filtered = sessions.filter((session) => {
if (args.status && (session.callStatus || "started") !== args.status) {
return false
}
if (!search) {
return true
}
const haystack = [
session.roomName,
session.participantIdentity,
session.pathname,
session.linkedLeadId,
session.summaryText,
session.handoffReason,
]
.map((value) => String(value || "").toLowerCase())
.join("\n")
return haystack.includes(search)
})
filtered.sort((a, b) => (b.startedAt || 0) - (a.startedAt || 0))
const total = filtered.length
const items = filtered
.slice((page - 1) * limit, page * limit)
.map(normalizePhoneCallForAdmin)
return {
items,
pagination: {
page,
limit,
total,
totalPages: Math.max(1, Math.ceil(total / limit)),
},
}
},
})
export const getAdminPhoneCallDetail = query({
args: {
callId: v.string(),
},
handler: async (ctx, args) => {
let session = await ctx.db.get(args.callId as any)
if (!session) {
session = await ctx.db
.query("voiceSessions")
.withIndex("by_roomName", (q) => q.eq("roomName", args.callId))
.unique()
}
if (!session || session.source !== "phone-agent") {
return null
}
const turns = await ctx.db
.query("voiceTranscriptTurns")
.withIndex("by_sessionId", (q) => q.eq("sessionId", session._id))
.collect()
turns.sort((a, b) => a.createdAt - b.createdAt)
const linkedLead = session.linkedLeadId
? await ctx.db.get(session.linkedLeadId as any)
: null
return {
call: normalizePhoneCallForAdmin(session),
linkedLead: linkedLead
? {
id: linkedLead._id,
type: linkedLead.type,
status: linkedLead.status,
firstName: linkedLead.firstName,
lastName: linkedLead.lastName,
email: linkedLead.email,
phone: linkedLead.phone,
company: linkedLead.company,
intent: linkedLead.intent,
message: linkedLead.message,
createdAt: linkedLead.createdAt,
}
: null,
turns: turns.map((turn) => ({
id: turn._id,
role: turn.role,
text: turn.text,
source: turn.source,
kind: turn.kind,
createdAt: turn.createdAt,
})),
}
},
})