/**
 * LiveKit voice-agent worker.
 *
 * Connects to a room, waits for a visitor, runs a realtime xAI voice session
 * with lead-capture tools, and persists the session record, transcript turns,
 * and (when a recording service is configured) the room recording.
 */
import { fileURLToPath } from "node:url"

import { config as loadEnv } from "dotenv"
import {
  AutoSubscribe,
  cli,
  defineAgent,
  llm,
  type JobContext,
  voice,
  WorkerOptions,
} from "@livekit/agents"
import * as xai from "@livekit/agents-plugin-xai"
import { z } from "zod"

import { SMS_CONSENT_VERSION } from "../lib/sms-compliance"
import {
  addVoiceTranscriptTurn,
  completeVoiceSession,
  createVoiceSessionRecord,
  getVoicePersistenceServices,
  startVoiceRecording,
  stopVoiceRecording,
  updateVoiceRecording,
} from "../lib/voice-assistant/persistence"
import {
  buildHandoffMessage,
  VOICE_ASSISTANT_SYSTEM_PROMPT,
} from "../lib/voice-assistant/prompt"
import { getVoiceAssistantServerEnv } from "../lib/voice-assistant/server"
import {
  VOICE_ASSISTANT_AGENT_NAME,
  VOICE_ASSISTANT_SOURCE,
  XAI_REALTIME_VOICE,
  type AssistantContactLeadPayload,
  type AssistantMachineRequestPayload,
  type VoiceAssistantVisitorMetadata,
} from "../lib/voice-assistant/types"

// Load the shared env file first, then let the agent-specific file override it.
loadEnv({ path: ".env.local" })
loadEnv({ path: ".env.voice-agent.local", override: true })

const env = getVoiceAssistantServerEnv()
const persistenceServices = getVoicePersistenceServices({
  livekitUrl: env.livekitUrl,
  livekitApiKey: env.livekitApiKey,
  livekitApiSecret: env.livekitApiSecret,
})

/** Per-session data attached to the agent session and read back by the tools. */
type WorkerUserData = {
  siteUrl: string
  pathname: string
  pageUrl: string
  startedAt: string
}

/** JSON body shape this worker reads from the site API responses. */
type ApiSuccess = {
  success: boolean
  message?: string
  error?: string
}

/** Mutable persistence bookkeeping shared by the handlers of one session. */
type VoiceSessionPersistenceState = {
  recordingId: string | null
  recordingStatus: "pending" | "starting" | "recording" | "completed" | "failed"
  recordingUrl: string | null
  sessionId: string | null
}

/** Suffix appended by `truncateText` when it shortens a value. */
const TRUNCATION_SUFFIX = "..."

/** Words and phrases accepted as an explicit go-ahead from the visitor. */
const EXPLICIT_CONFIRMATION_PATTERN =
  /\b(yes|yeah|yep|correct|confirm|confirmed|go ahead|submit|sounds good|please do)\b/i

/**
 * Reduces an unknown thrown value to a plain, loggable object.
 *
 * @returns `{ name, message }` for `Error` instances, otherwise
 *   `{ message }` holding the stringified value.
 */
function serializeError(error: unknown) {
  if (error instanceof Error) {
    return {
      name: error.name,
      message: error.message,
    }
  }

  return {
    message: String(error),
  }
}

/** Collapses every run of whitespace to a single space and trims both ends. */
function normalizeWhitespace(value: string): string {
  return value.replace(/\s+/g, " ").trim()
}

/**
 * Normalizes whitespace and shortens the result for log output.
 *
 * The returned string, suffix included, is at most `maxLength` characters
 * long for any `maxLength` of at least the suffix length.
 *
 * @param value Text to shorten.
 * @param maxLength Maximum length of the returned string.
 */
function truncateText(value: string, maxLength = 160) {
  const normalized = normalizeWhitespace(value)

  if (normalized.length <= maxLength) {
    return normalized
  }

  // Reserve room for the suffix so the result never exceeds maxLength.
  const keep = Math.max(0, maxLength - TRUNCATION_SUFFIX.length)
  return `${normalized.slice(0, keep)}${TRUNCATION_SUFFIX}`
}

/** Writes an info-level log entry tagged with the `[voice-agent]` prefix. */
function logWorker(event: string, payload: Record<string, unknown> = {}) {
  console.info("[voice-agent]", event, payload)
}

/** Visitor metadata used when the participant supplies none or it is unusable. */
function defaultVisitorMetadata(): VoiceAssistantVisitorMetadata {
  return {
    source: VOICE_ASSISTANT_SOURCE,
    pathname: "/",
    pageUrl: env.siteUrl,
    startedAt: new Date().toISOString(),
  }
}

/** Returns `value` when it is a non-empty string, otherwise `undefined`. */
function nonEmptyString(value: unknown): string | undefined {
  return typeof value === "string" && value ? value : undefined
}

/**
 * Parses the participant metadata JSON into visitor metadata.
 *
 * Missing, malformed, or non-object metadata yields the defaults; individual
 * fields that are absent, empty, or not strings fall back one by one.
 * `source` is always set to `VOICE_ASSISTANT_SOURCE`.
 */
function parseVisitorMetadata(
  metadata: string | undefined
): VoiceAssistantVisitorMetadata {
  if (!metadata) {
    return defaultVisitorMetadata()
  }

  let parsed: unknown
  try {
    parsed = JSON.parse(metadata)
  } catch {
    return defaultVisitorMetadata()
  }

  // JSON such as `null`, `42`, or `"text"` carries no usable fields.
  if (!parsed || typeof parsed !== "object") {
    return defaultVisitorMetadata()
  }

  const candidate = parsed as Partial<VoiceAssistantVisitorMetadata>
  const fallback = defaultVisitorMetadata()

  return {
    source: VOICE_ASSISTANT_SOURCE,
    pathname: nonEmptyString(candidate.pathname) ?? fallback.pathname,
    pageUrl: nonEmptyString(candidate.pageUrl) ?? fallback.pageUrl,
    startedAt: nonEmptyString(candidate.startedAt) ?? fallback.startedAt,
  }
}

/** Returns true when the message contains one of the accepted confirmations. */
function hasExplicitConfirmation(confirmationMessage: string) {
  return EXPLICIT_CONFIRMATION_PATTERN.test(confirmationMessage)
}

/**
 * Guards a submission tool against running without a clear confirmation.
 *
 * @throws Error when the message contains no accepted confirmation phrase.
 */
function assertExplicitConfirmation(confirmationMessage: string) {
  if (!hasExplicitConfirmation(confirmationMessage)) {
    throw new Error("The visitor has not clearly confirmed the submission yet.")
  }
}

/**
 * POSTs a JSON payload to a site API route resolved against `env.siteUrl`.
 *
 * @param pathname Route path, e.g. `/api/contact`.
 * @param payload Value serialized as the request body.
 * @returns The parsed response body.
 * @throws Error when the HTTP status is not OK or the body does not report
 *   `success`; the message prefers the body's `error`, then `message`.
 */
async function postJson<TPayload>(
  pathname: string,
  payload: TPayload
): Promise<ApiSuccess> {
  logWorker("api_request_start", { pathname })

  const response = await fetch(new URL(pathname, env.siteUrl), {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify(payload),
  })

  // A body that is not JSON is treated as an empty object, i.e. a failure.
  const data = (await response.json().catch(() => ({}))) as ApiSuccess

  if (!response.ok || !data.success) {
    throw new Error(
      data.error || data.message || `Request to ${pathname} failed.`
    )
  }

  logWorker("api_request_success", { pathname })
  return data
}

/** Adds source, page, timestamp, and URL attribution to a contact lead. */
function buildContactPayload(
  args: AssistantContactLeadPayload,
  userData: WorkerUserData
) {
  return {
    ...args,
    source: VOICE_ASSISTANT_SOURCE,
    page: userData.pathname,
    timestamp: new Date().toISOString(),
    url: userData.pageUrl,
  }
}

/**
 * Adds attribution to a machine request and tidies `machineType`: the
 * comma-separated list is trimmed per entry and empty entries are dropped.
 */
function buildMachineRequestPayload(
  args: AssistantMachineRequestPayload,
  userData: WorkerUserData
) {
  return {
    ...args,
    machineType: args.machineType
      .split(",")
      .map((value) => value.trim())
      .filter(Boolean)
      .join(","),
    source: VOICE_ASSISTANT_SOURCE,
    page: userData.pathname,
    timestamp: new Date().toISOString(),
    url: userData.pageUrl,
  }
}

/** Builds the SMS consent fields attached to every submitted lead. */
function buildConsentMetadata(userData: WorkerUserData) {
  return {
    consentVersion: SMS_CONSENT_VERSION,
    consentCapturedAt: new Date().toISOString(),
    consentSourcePage: userData.pathname,
  }
}

/**
 * Extracts whitespace-normalized text from a conversation item.
 *
 * Accepts a plain string or an object; on objects it checks `text`, then
 * `transcript`, then `content` (an array is processed recursively and joined
 * with spaces, a string is used directly). Anything else yields `""`.
 */
function extractConversationItemText(item: unknown): string {
  if (typeof item === "string") {
    return normalizeWhitespace(item)
  }

  if (!item || typeof item !== "object") {
    return ""
  }

  if ("text" in item && typeof item.text === "string") {
    return normalizeWhitespace(item.text)
  }

  if ("transcript" in item && typeof item.transcript === "string") {
    return normalizeWhitespace(item.transcript)
  }

  if ("content" in item) {
    const { content } = item as { content?: unknown }

    if (Array.isArray(content)) {
      return normalizeWhitespace(
        content
          .map((part) => extractConversationItemText(part))
          .filter(Boolean)
          .join(" ")
      )
    }

    if (typeof content === "string") {
      return normalizeWhitespace(content)
    }
  }

  return ""
}

/**
 * Builds the tools exposed to the realtime model.
 *
 * Both submission tools require `confirmed: true` and
 * `serviceTextConsent: true` in their schema and additionally re-check the
 * visitor's confirmation wording before posting to the site API.
 */
function createTools() {
  return {
    submit_contact_lead: llm.tool({
      description:
        "Submit a general contact or callback request after the visitor clearly confirms the details.",
      parameters: z.object({
        firstName: z.string().min(1).describe("Visitor first name"),
        lastName: z.string().min(1).describe("Visitor last name"),
        email: z.string().email().describe("Visitor email address"),
        phone: z.string().min(7).describe("Visitor phone number"),
        company: z
          .string()
          .optional()
          .describe("Visitor company name if they shared one"),
        message: z
          .string()
          .min(10)
          .describe("Short summary of what the visitor needs"),
        serviceTextConsent: z
          .literal(true)
          .describe(
            "Set to true only if the visitor explicitly agrees to service-related SMS follow-up."
          ),
        marketingTextConsent: z
          .boolean()
          .optional()
          .describe(
            "Optional marketing SMS consent when the visitor clearly opts in."
          ),
        confirmed: z
          .literal(true)
          .describe(
            "Set to true only after the visitor explicitly confirms submission."
          ),
        confirmationMessage: z
          .string()
          .min(2)
          .describe(
            "The visitor's confirmation words, such as 'yes, go ahead and submit that'."
          ),
      }),
      execute: async (
        { confirmed: _confirmed, confirmationMessage, ...args },
        { ctx }
      ) => {
        // Second gate: the schema flag alone is not trusted.
        assertExplicitConfirmation(confirmationMessage)

        const userData = ctx.userData as WorkerUserData
        const payload = buildContactPayload(
          {
            ...args,
            ...buildConsentMetadata(userData),
          },
          userData
        )

        logWorker("tool_start", {
          tool: "submit_contact_lead",
          pathname: userData.pathname,
          email: args.email,
        })

        try {
          const result = await postJson("/api/contact", payload)

          logWorker("tool_success", {
            tool: "submit_contact_lead",
            pathname: userData.pathname,
            email: args.email,
          })

          return {
            success: true,
            message:
              result.message ||
              "Thanks, your contact request has been sent to Rocky Mountain Vending.",
          }
        } catch (error) {
          console.error("[voice-agent] tool_failure", {
            tool: "submit_contact_lead",
            pathname: userData.pathname,
            email: args.email,
            error: serializeError(error),
          })
          // Rethrow so the failure reaches the tool caller.
          throw error
        }
      },
    }),

    submit_machine_request: llm.tool({
      description:
        "Submit a free machine placement or machine request lead after the visitor explicitly confirms and agrees to service-related follow-up.",
      parameters: z.object({
        firstName: z.string().min(1).describe("Visitor first name"),
        lastName: z.string().min(1).describe("Visitor last name"),
        email: z.string().email().describe("Visitor email address"),
        phone: z.string().min(7).describe("Visitor phone number"),
        company: z.string().min(1).describe("Business or organization name"),
        employeeCount: z
          .string()
          .min(1)
          .describe(
            "Approximate employee count or audience size as a plain number string"
          ),
        machineType: z
          .string()
          .min(1)
          .describe(
            "Desired machine type or a comma-separated list like snack, beverage, combo"
          ),
        machineCount: z
          .string()
          .min(1)
          .describe("Desired number of machines as a plain number string"),
        message: z
          .string()
          .optional()
          .describe("Optional visitor notes or placement details"),
        serviceTextConsent: z
          .literal(true)
          .describe(
            "Set to true only if the visitor explicitly agrees to service-related SMS follow-up."
          ),
        marketingTextConsent: z
          .boolean()
          .optional()
          .describe(
            "Optional marketing SMS consent when the visitor clearly opts in."
          ),
        confirmed: z
          .literal(true)
          .describe(
            "Set to true only after the visitor explicitly confirms submission."
          ),
        confirmationMessage: z
          .string()
          .min(2)
          .describe(
            "The visitor's confirmation words, such as 'yes, please submit it'."
          ),
      }),
      execute: async (
        { confirmed: _confirmed, confirmationMessage, ...args },
        { ctx }
      ) => {
        // Second gate: the schema flag alone is not trusted.
        assertExplicitConfirmation(confirmationMessage)

        const userData = ctx.userData as WorkerUserData
        const payload = buildMachineRequestPayload(
          {
            ...args,
            ...buildConsentMetadata(userData),
          },
          userData
        )

        logWorker("tool_start", {
          tool: "submit_machine_request",
          pathname: userData.pathname,
          email: args.email,
          company: args.company,
        })

        try {
          const result = await postJson("/api/request-machine", payload)

          logWorker("tool_success", {
            tool: "submit_machine_request",
            pathname: userData.pathname,
            email: args.email,
            company: args.company,
          })

          return {
            success: true,
            message:
              result.message ||
              "Thanks, your machine request has been sent to Rocky Mountain Vending.",
          }
        } catch (error) {
          console.error("[voice-agent] tool_failure", {
            tool: "submit_machine_request",
            pathname: userData.pathname,
            email: args.email,
            company: args.company,
            error: serializeError(error),
          })
          // Rethrow so the failure reaches the tool caller.
          throw error
        }
      },
    }),

    handoff_to_human: llm.tool({
      description:
        "Use this when the visitor wants a human right away or asks for something uncertain, legal, pricing-related, or highly specific.",
      parameters: z.object({
        reason: z
          .string()
          .min(2)
          .describe("Short reason for the human handoff"),
      }),
      execute: async ({ reason }) => {
        logWorker("tool_success", {
          tool: "handoff_to_human",
          reason: truncateText(reason, 120),
        })

        return {
          success: true,
          message: `${buildHandoffMessage()} Reason: ${reason}`,
        }
      },
    }),
  }
}

/**
 * Runs one voice session for a job.
 *
 * Steps, in order: connect to the room (audio only), wait for a participant,
 * create the session record, wire logging and transcript persistence onto the
 * agent session, start the agent, then start the room recording when a
 * recording service is configured and the session record was created.
 *
 * Persistence and recording failures are logged and do not abort the session.
 */
async function startSession(ctx: JobContext) {
  const roomName = ctx.room.name || `rmv-voice-${ctx.workerId}`

  logWorker("job_connect_start", {
    room: roomName,
    workerId: ctx.workerId,
  })

  await ctx.connect(undefined, AutoSubscribe.AUDIO_ONLY)

  logWorker("job_connect_ready", {
    room: roomName,
    workerId: ctx.workerId,
  })

  const participant = await ctx.waitForParticipant()
  const visitor = parseVisitorMetadata(participant.metadata)

  const userData: WorkerUserData = {
    siteUrl: env.siteUrl,
    pathname: visitor.pathname,
    pageUrl: visitor.pageUrl,
    startedAt: visitor.startedAt,
  }

  const persistenceState: VoiceSessionPersistenceState = {
    sessionId: null,
    recordingId: null,
    // Without a recording service the recording is reported as failed.
    recordingStatus: persistenceServices.recording ? "pending" : "failed",
    recordingUrl: null,
  }

  logWorker("participant_joined", {
    identity: participant.identity,
    room: ctx.room.name,
    pathname: userData.pathname,
    pageUrl: userData.pageUrl,
  })

  // Request a subscription to every track the visitor publishes; tracks from
  // other participants are ignored.
  ctx.room.on("trackPublished", (publication, remoteParticipant) => {
    if (remoteParticipant.identity !== participant.identity) {
      return
    }

    try {
      publication.setSubscribed(true)
      logWorker("remote_track_subscription_requested", {
        participantIdentity: remoteParticipant.identity,
        source: publication.source,
        kind: publication.kind,
        trackSid: publication.sid,
      })
    } catch (error) {
      console.error("[voice-agent] remote_track_subscription_failed", {
        participantIdentity: remoteParticipant.identity,
        source: publication.source,
        kind: publication.kind,
        trackSid: publication.sid,
        error: serializeError(error),
      })
    }

    logWorker("remote_track_published", {
      participantIdentity: remoteParticipant.identity,
      source: publication.source,
      kind: publication.kind,
      trackSid: publication.sid,
    })
  })

  ctx.room.on("trackSubscribed", (_track, publication, remoteParticipant) => {
    if (remoteParticipant.identity !== participant.identity) {
      return
    }

    logWorker("remote_track_subscribed", {
      participantIdentity: remoteParticipant.identity,
      source: publication.source,
      kind: publication.kind,
      trackSid: publication.sid,
    })
  })

  // Create the session record. On failure the session still runs, but with no
  // sessionId nothing further is persisted for it.
  try {
    const sessionId = await createVoiceSessionRecord(persistenceServices, {
      roomName,
      participantIdentity: participant.identity,
      siteUrl: userData.siteUrl,
      pathname: userData.pathname,
      pageUrl: userData.pageUrl,
      source: VOICE_ASSISTANT_SOURCE,
      metadata: JSON.stringify(visitor),
      recordingDisclosureAt: Date.now(),
      recordingStatus: persistenceServices.recording ? "pending" : undefined,
      // Date.parse returns NaN for an unparseable value, which falls back to now.
      startedAt: Date.parse(userData.startedAt) || Date.now(),
    })

    persistenceState.sessionId =
      typeof sessionId === "string" ? sessionId : null

    logWorker("voice_session_persisted", {
      room: ctx.room.name,
      participantIdentity: participant.identity,
      persisted: Boolean(persistenceState.sessionId),
    })
  } catch (error) {
    console.error("[voice-agent] session_persistence_failed", {
      room: roomName,
      participantIdentity: participant.identity,
      error: serializeError(error),
    })
  }

  const session = new voice.AgentSession({
    llm: new xai.realtime.RealtimeModel({
      apiKey: env.xaiApiKey,
      model: env.realtimeModel,
      voice: XAI_REALTIME_VOICE,
    }),
    userData,
    turnHandling: {
      interruption: {
        enabled: true,
      },
    },
  })

  session.on(voice.AgentSessionEventTypes.AgentStateChanged, (event) => {
    logWorker("agent_state_changed", {
      oldState: event.oldState,
      newState: event.newState,
    })
  })

  session.on(voice.AgentSessionEventTypes.UserStateChanged, (event) => {
    logWorker("user_state_changed", {
      oldState: event.oldState,
      newState: event.newState,
    })
  })

  session.on(voice.AgentSessionEventTypes.UserInputTranscribed, (event) => {
    logWorker("user_input_transcribed", {
      isFinal: event.isFinal,
      language: event.language,
      transcript: truncateText(event.transcript, 160),
    })

    // Only final transcripts of a persisted session are stored.
    if (!event.isFinal || !persistenceState.sessionId) {
      return
    }

    // Fire-and-forget: a failed write is logged, not propagated.
    void addVoiceTranscriptTurn(persistenceServices, {
      sessionId: persistenceState.sessionId,
      roomName,
      participantIdentity: participant.identity,
      role: "user",
      text: event.transcript,
      kind: "transcription",
      isFinal: event.isFinal,
      language: event.language ?? undefined,
    }).catch((error) => {
      console.error("[voice-agent] transcript_persist_failed", {
        role: "user",
        error: serializeError(error),
      })
    })
  })

  session.on(voice.AgentSessionEventTypes.ConversationItemAdded, (event) => {
    const role = "role" in event.item ? event.item.role : "unknown"
    const text = extractConversationItemText(event.item)

    logWorker("conversation_item_added", {
      role,
      preview: truncateText(JSON.stringify(event.item), 180),
    })

    // Only assistant items are stored here; user turns are stored by the
    // UserInputTranscribed handler.
    if (role !== "assistant" || !text || !persistenceState.sessionId) {
      return
    }

    // Fire-and-forget: a failed write is logged, not propagated.
    void addVoiceTranscriptTurn(persistenceServices, {
      sessionId: persistenceState.sessionId,
      roomName,
      participantIdentity: participant.identity,
      role: "assistant",
      text,
      kind: "response",
      source: VOICE_ASSISTANT_SOURCE,
    }).catch((error) => {
      console.error("[voice-agent] transcript_persist_failed", {
        role: "assistant",
        error: serializeError(error),
      })
    })
  })

  session.on(voice.AgentSessionEventTypes.FunctionToolsExecuted, (event) => {
    logWorker("function_tools_executed", {
      toolNames: event.functionCalls.map((call) => call.name),
      outputs: event.functionCallOutputs.length,
    })
  })

  session.on(voice.AgentSessionEventTypes.SpeechCreated, (event) => {
    logWorker("speech_created", {
      source: event.source,
      userInitiated: event.userInitiated,
    })
  })

  session.on(voice.AgentSessionEventTypes.MetricsCollected, (event) => {
    logWorker("metrics_collected", {
      metricType: event.metrics.type,
    })
  })

  session.on(voice.AgentSessionEventTypes.Error, (event) => {
    console.error("[voice-agent] session_error", {
      source: event.source?.constructor?.name || "unknown",
      error: serializeError(event.error),
    })
  })

  session.on(voice.AgentSessionEventTypes.Close, (event) => {
    logWorker("session_closed", {
      reason: event.reason,
      error: event.error ? serializeError(event.error) : null,
    })

    // The handler is synchronous, so cleanup runs in a detached async task
    // whose two steps each catch and log their own failure.
    void (async () => {
      try {
        if (persistenceState.recordingId) {
          await stopVoiceRecording(
            persistenceServices,
            persistenceState.recordingId
          )
        }
      } catch (error) {
        console.error("[voice-agent] recording_stop_failed", {
          recordingId: persistenceState.recordingId,
          error: serializeError(error),
        })
      }

      if (!persistenceState.sessionId) {
        return
      }

      try {
        await completeVoiceSession(persistenceServices, {
          sessionId: persistenceState.sessionId,
          endedAt: Date.now(),
          recordingId: persistenceState.recordingId ?? undefined,
          // A recording that was started is reported as completed; otherwise
          // the last tracked status is kept.
          recordingStatus: persistenceState.recordingId
            ? "completed"
            : persistenceState.recordingStatus,
          recordingUrl: persistenceState.recordingUrl ?? undefined,
          recordingError: event.error
            ? serializeError(event.error).message
            : undefined,
        })
      } catch (error) {
        console.error("[voice-agent] session_complete_failed", {
          sessionId: persistenceState.sessionId,
          error: serializeError(error),
        })
      }
    })()
  })

  const agent = new voice.Agent({
    instructions: VOICE_ASSISTANT_SYSTEM_PROMPT,
    tools: createTools(),
  })

  logWorker("session_starting", {
    room: roomName,
    participantIdentity: participant.identity,
    pathname: userData.pathname,
    model: env.realtimeModel,
  })

  await session.start({ agent, room: ctx.room })

  logWorker("session_started", {
    room: roomName,
    participantIdentity: participant.identity,
  })

  // Recording starts only after the agent session is running, and only when
  // there is a session record to attach it to.
  if (persistenceServices.recording && persistenceState.sessionId) {
    try {
      persistenceState.recordingStatus = "starting"
      await updateVoiceRecording(persistenceServices, {
        sessionId: persistenceState.sessionId,
        recordingStatus: "starting",
      })

      const recording = await startVoiceRecording(persistenceServices, roomName)

      if (recording) {
        persistenceState.recordingId = recording.recordingId
        persistenceState.recordingStatus = recording.recordingStatus
        persistenceState.recordingUrl = recording.recordingUrl

        await updateVoiceRecording(persistenceServices, {
          sessionId: persistenceState.sessionId,
          recordingStatus: recording.recordingStatus,
          recordingId: recording.recordingId,
          recordingUrl: recording.recordingUrl,
        })

        logWorker("recording_started", {
          room: roomName,
          participantIdentity: participant.identity,
          recordingId: recording.recordingId,
          recordingUrl: recording.recordingUrl,
        })
      }
    } catch (error) {
      persistenceState.recordingStatus = "failed"

      console.error("[voice-agent] recording_start_failed", {
        room: roomName,
        participantIdentity: participant.identity,
        error: serializeError(error),
      })

      // Best effort: record the failure, and only log if that write fails too.
      await updateVoiceRecording(persistenceServices, {
        sessionId: persistenceState.sessionId,
        recordingStatus: "failed",
        recordingError: error instanceof Error ? error.message : String(error),
      }).catch((persistError) => {
        console.error("[voice-agent] recording_failure_persist_failed", {
          error: serializeError(persistError),
        })
      })
    }
  }
}

export default defineAgent({
  entry: async (ctx) => {
    await startSession(ctx)
  },
})

// Start the worker CLI only when this file is the process entry point, not
// when it is imported as the agent module.
if (process.argv[1] === fileURLToPath(import.meta.url)) {
  cli.runApp(
    new WorkerOptions({
      agent: fileURLToPath(import.meta.url),
      agentName: VOICE_ASSISTANT_AGENT_NAME,
      wsURL: env.livekitUrl,
      apiKey: env.livekitApiKey,
      apiSecret: env.livekitApiSecret,
    })
  )
}