Rocky_Mountain_Vending/livekit-agent/worker.ts

722 lines
22 KiB
TypeScript

import { fileURLToPath } from "node:url"
import { config as loadEnv } from "dotenv"
import { AutoSubscribe, cli, defineAgent, llm, type JobContext, voice, WorkerOptions } from "@livekit/agents"
import * as xai from "@livekit/agents-plugin-xai"
import { z } from "zod"
import { SMS_CONSENT_VERSION } from "../lib/sms-compliance"
import {
addVoiceTranscriptTurn,
completeVoiceSession,
createVoiceSessionRecord,
getVoicePersistenceServices,
startVoiceRecording,
stopVoiceRecording,
updateVoiceRecording,
} from "../lib/voice-assistant/persistence"
import { buildHandoffMessage, VOICE_ASSISTANT_SYSTEM_PROMPT } from "../lib/voice-assistant/prompt"
import { getVoiceAssistantServerEnv } from "../lib/voice-assistant/server"
import {
VOICE_ASSISTANT_AGENT_NAME,
VOICE_ASSISTANT_SOURCE,
XAI_REALTIME_VOICE,
type AssistantContactLeadPayload,
type AssistantMachineRequestPayload,
type VoiceAssistantVisitorMetadata,
} from "../lib/voice-assistant/types"
// Load the shared local env file first, then the voice-agent-specific file.
// `override: true` lets the second file replace variables that are already set.
loadEnv({ path: ".env.local" })
loadEnv({ path: ".env.voice-agent.local", override: true })

// Server configuration snapshot, resolved once at module load.
const env = getVoiceAssistantServerEnv()

// LiveKit connection details handed to the persistence layer.
const livekitCredentials = {
  livekitUrl: env.livekitUrl,
  livekitApiKey: env.livekitApiKey,
  livekitApiSecret: env.livekitApiSecret,
}
const persistenceServices = getVoicePersistenceServices(livekitCredentials)
/**
 * Per-session context attached to the agent session as `userData` and read
 * back by the tool handlers through `ctx.userData`.
 */
type WorkerUserData = {
  /** Site base URL, taken from the server env. */
  siteUrl: string
  /** `pathname` from the visitor metadata (defaults to "/"). */
  pathname: string
  /** `pageUrl` from the visitor metadata (defaults to the site URL). */
  pageUrl: string
  /** Session start timestamp string from the visitor metadata. */
  startedAt: string
}
/**
 * JSON envelope read from the site's POST endpoints. A response is treated as
 * successful only when `success` is truthy; `error` / `message` feed the thrown
 * error text otherwise.
 */
type ApiSuccess = {
  success: boolean
  /** Optional human-readable message returned by the endpoint. */
  message?: string
  /** Optional error description returned by the endpoint. */
  error?: string
}
/** Lifecycle values tracked for a session's recording. */
type VoiceRecordingStatus = "pending" | "starting" | "recording" | "completed" | "failed"

/**
 * Mutable bookkeeping for one voice session's persistence: the stored session
 * id plus the recording id/status/url. Fields stay `null` until known.
 */
type VoiceSessionPersistenceState = {
  recordingId: string | null
  recordingStatus: VoiceRecordingStatus
  recordingUrl: string | null
  sessionId: string | null
}
/**
 * Reduces an arbitrary thrown value to a small plain object suitable for logs.
 *
 * @param error - Anything caught in a `catch` clause.
 * @returns `{ name, message }` for `Error` instances, otherwise `{ message }`
 *   holding the value's string form.
 */
function serializeError(error: unknown) {
  if (!(error instanceof Error)) {
    return {
      message: String(error),
    }
  }

  const { name, message } = error
  return { name, message }
}
/**
 * Collapses whitespace runs to single spaces, trims, and shortens the result so
 * it never exceeds `maxLength` characters.
 *
 * When truncation is needed the text ends with "..." and the ellipsis counts
 * toward the limit (previously the output overshot `maxLength` by two
 * characters).
 *
 * @param value - Text to normalize and shorten.
 * @param maxLength - Maximum length of the returned string (default 160).
 * @returns The normalized text, truncated with a trailing "..." if necessary.
 */
function truncateText(value: string, maxLength = 160) {
  const ellipsis = "..."
  const normalized = value.replace(/\s+/g, " ").trim()
  if (normalized.length <= maxLength) {
    return normalized
  }
  // Not enough room for any text plus the ellipsis: hard-cut instead.
  if (maxLength <= ellipsis.length) {
    return normalized.slice(0, Math.max(0, maxLength))
  }
  return `${normalized.slice(0, maxLength - ellipsis.length)}${ellipsis}`
}
/**
 * Writes a structured info-level log line tagged with the worker prefix.
 *
 * @param event - Short event name.
 * @param payload - Extra fields to log alongside the event (defaults to `{}`).
 */
function logWorker(event: string, payload: Record<string, unknown> = {}): void {
  const prefix = "[voice-agent]"
  console.info(prefix, event, payload)
}
/**
 * Parses the participant's metadata string into visitor metadata.
 *
 * Missing metadata, invalid JSON, or missing/empty/non-string fields fall back
 * to defaults: pathname "/", the configured site URL, and the current time.
 * `source` is always set to the voice-assistant source constant.
 *
 * @param metadata - Raw metadata string from the participant, if any.
 * @returns Fully populated visitor metadata.
 */
function parseVisitorMetadata(metadata: string | undefined): VoiceAssistantVisitorMetadata {
  const buildFallback = (): VoiceAssistantVisitorMetadata => ({
    source: VOICE_ASSISTANT_SOURCE,
    pathname: "/",
    pageUrl: env.siteUrl,
    startedAt: new Date().toISOString(),
  })
  // Returns the candidate only when it is a non-empty string.
  const nonEmptyString = (candidate: unknown): string | undefined =>
    typeof candidate === "string" && candidate ? candidate : undefined

  if (!metadata) {
    return buildFallback()
  }

  try {
    const parsed = JSON.parse(metadata) as Partial<VoiceAssistantVisitorMetadata>
    return {
      source: VOICE_ASSISTANT_SOURCE,
      pathname: nonEmptyString(parsed.pathname) ?? "/",
      pageUrl: nonEmptyString(parsed.pageUrl) ?? env.siteUrl,
      startedAt: nonEmptyString(parsed.startedAt) ?? new Date().toISOString(),
    }
  } catch {
    // Invalid JSON (or a `null` payload, whose property access throws) ends up here.
    return buildFallback()
  }
}
// Phrases that count as an affirmative confirmation (regex alternation source).
const CONFIRMATION_PHRASES = "yes|yeah|yep|correct|confirm|confirmed|go ahead|submit|sounds good|please do"

// Matches any confirmation phrase as a whole word/phrase, case-insensitively.
const CONFIRMATION_PATTERN = new RegExp(`\\b(?:${CONFIRMATION_PHRASES})\\b`, "i")

// Matches a negation followed (within at most two intervening words) by a
// confirmation phrase, e.g. "don't submit", "that is not correct",
// "I never said yes". Both straight and curly apostrophes are accepted.
const NEGATED_CONFIRMATION_PATTERN = new RegExp(
  `\\b(?:not|never|cannot|(?:do|does|is|was|wo|ca)n['\u2019]?t)(?:\\s+\\w+){0,2}?\\s+(?:${CONFIRMATION_PHRASES})\\b`,
  "i",
)

/**
 * Reports whether the visitor's words contain a clear, un-negated confirmation.
 *
 * The check is deliberately conservative: a message such as "no, don't submit
 * that" contains the word "submit" but must not count as consent, so any
 * negated confirmation phrase makes the whole message non-confirming.
 *
 * @param confirmationMessage - The visitor's confirmation words.
 * @returns `true` only when a confirmation phrase is present and none is negated.
 */
function hasExplicitConfirmation(confirmationMessage: string) {
  if (NEGATED_CONFIRMATION_PATTERN.test(confirmationMessage)) {
    return false
  }
  return CONFIRMATION_PATTERN.test(confirmationMessage)
}
/**
 * Guard used by the submission tools: throws unless the visitor's words pass
 * `hasExplicitConfirmation`.
 *
 * @param confirmationMessage - The visitor's confirmation words.
 * @throws Error when no clear confirmation is detected.
 */
function assertExplicitConfirmation(confirmationMessage: string): void {
  const isConfirmed = hasExplicitConfirmation(confirmationMessage)
  if (isConfirmed) {
    return
  }
  throw new Error("The visitor has not clearly confirmed the submission yet.")
}
/**
 * POSTs a JSON payload to a path on the configured site and returns the parsed
 * response envelope.
 *
 * @param pathname - Path resolved against `env.siteUrl`.
 * @param payload - Object serialized as the JSON request body.
 * @returns The parsed `ApiSuccess` envelope.
 * @throws Error when the HTTP status is not OK or the envelope's `success` is
 *   falsy; the message prefers the envelope's `error`, then `message`.
 */
async function postJson<TPayload extends object>(pathname: string, payload: TPayload) {
  logWorker("api_request_start", { pathname })

  const endpoint = new URL(pathname, env.siteUrl)
  const requestInit = {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify(payload),
  }
  const response = await fetch(endpoint, requestInit)

  // A body that is not valid JSON is treated as an empty envelope.
  let data: ApiSuccess
  try {
    data = (await response.json()) as ApiSuccess
  } catch {
    data = {} as ApiSuccess
  }

  const succeeded = response.ok && data.success
  if (!succeeded) {
    throw new Error(data.error || data.message || `Request to ${pathname} failed.`)
  }

  logWorker("api_request_success", { pathname })
  return data
}
/**
 * Builds the request body for the contact endpoint: the tool arguments plus
 * source, originating page, page URL, and a submission timestamp.
 *
 * @param args - Contact lead fields collected by the assistant.
 * @param userData - Session context supplying the page path and URL.
 */
function buildContactPayload(args: AssistantContactLeadPayload, userData: WorkerUserData) {
  const { pathname: page, pageUrl: url } = userData
  const timestamp = new Date().toISOString()
  return {
    ...args,
    source: VOICE_ASSISTANT_SOURCE,
    page,
    timestamp,
    url,
  }
}
/**
 * Builds the request body for the machine-request endpoint.
 *
 * `machineType` is normalized: split on commas, each entry trimmed, empty
 * entries dropped, then re-joined with "," (no spaces). Source, page, page URL
 * and a submission timestamp are added.
 *
 * @param args - Machine request fields collected by the assistant.
 * @param userData - Session context supplying the page path and URL.
 */
function buildMachineRequestPayload(args: AssistantMachineRequestPayload, userData: WorkerUserData) {
  const machineTypes: string[] = []
  for (const entry of args.machineType.split(",")) {
    const trimmed = entry.trim()
    if (trimmed) {
      machineTypes.push(trimmed)
    }
  }

  const { pathname: page, pageUrl: url } = userData
  return {
    ...args,
    machineType: machineTypes.join(","),
    source: VOICE_ASSISTANT_SOURCE,
    page,
    timestamp: new Date().toISOString(),
    url,
  }
}
/**
 * Produces the SMS-consent audit fields attached to each lead submission: the
 * consent text version, the capture time (now), and the page path the consent
 * was given on.
 *
 * @param userData - Session context supplying the page path.
 */
function buildConsentMetadata(userData: WorkerUserData) {
  const consentCapturedAt = new Date().toISOString()
  const { pathname: consentSourcePage } = userData
  return {
    consentVersion: SMS_CONSENT_VERSION,
    consentCapturedAt,
    consentSourcePage,
  }
}
/**
 * Pulls display text out of a conversation item of unknown shape.
 *
 * Lookup order: a plain string; then a string `text` property; then a string
 * `transcript` property; then `content`, which may be a string or an array of
 * nested items that are extracted recursively and joined with spaces.
 * Whitespace runs are collapsed and the result trimmed.
 *
 * @param item - Conversation item, content part, or plain string.
 * @returns The extracted text, or "" when nothing usable is found.
 */
function extractConversationItemText(item: unknown): string {
  const collapse = (text: string) => text.replace(/\s+/g, " ").trim()

  if (typeof item === "string") {
    return collapse(item)
  }
  if (item === null || typeof item !== "object") {
    return ""
  }

  const fields = item as { text?: unknown; transcript?: unknown; content?: unknown }
  if (typeof fields.text === "string") {
    return collapse(fields.text)
  }
  if (typeof fields.transcript === "string") {
    return collapse(fields.transcript)
  }

  const { content } = fields
  if (typeof content === "string") {
    return collapse(content)
  }
  if (!Array.isArray(content)) {
    return ""
  }

  // Recurse into each part and keep only the parts that yielded text.
  const pieces: string[] = []
  for (const part of content) {
    const piece = extractConversationItemText(part)
    if (piece) {
      pieces.push(piece)
    }
  }
  return collapse(pieces.join(" "))
}
/**
 * Shared submit flow for the lead tools: logs `tool_start`, POSTs the payload,
 * logs `tool_success`, and returns the result handed back to the model. On
 * failure it logs `tool_failure` with the serialized error and rethrows.
 */
async function runLeadSubmission<TPayload extends object>(options: {
  endpoint: string
  payload: TPayload
  logContext: Record<string, unknown>
  fallbackMessage: string
}) {
  const { endpoint, payload, logContext, fallbackMessage } = options
  logWorker("tool_start", logContext)
  try {
    const result = await postJson(endpoint, payload)
    logWorker("tool_success", logContext)
    return {
      success: true,
      message: result.message || fallbackMessage,
    }
  } catch (error) {
    console.error("[voice-agent] tool_failure", {
      ...logContext,
      error: serializeError(error),
    })
    throw error
  }
}

/**
 * Builds the function tools exposed to the voice agent:
 *
 * - `submit_contact_lead` posts a contact/callback request to `/api/contact`.
 * - `submit_machine_request` posts a machine request to `/api/request-machine`.
 * - `handoff_to_human` returns the human-handoff message with the given reason.
 *
 * Both submit tools require `confirmed: true`, `serviceTextConsent: true`, and
 * a `confirmationMessage` that passes `assertExplicitConfirmation` before
 * anything is sent.
 */
function createTools() {
  // Schema fragments shared by both lead tools; key order matches the original schemas.
  const visitorIdentityFields = {
    firstName: z.string().min(1).describe("Visitor first name"),
    lastName: z.string().min(1).describe("Visitor last name"),
    email: z.string().email().describe("Visitor email address"),
    phone: z.string().min(7).describe("Visitor phone number"),
  }
  const consentFields = {
    serviceTextConsent: z
      .literal(true)
      .describe("Set to true only if the visitor explicitly agrees to service-related SMS follow-up."),
    marketingTextConsent: z
      .boolean()
      .optional()
      .describe("Optional marketing SMS consent when the visitor clearly opts in."),
    confirmed: z.literal(true).describe("Set to true only after the visitor explicitly confirms submission."),
  }

  const contactLeadParameters = z.object({
    ...visitorIdentityFields,
    company: z.string().optional().describe("Visitor company name if they shared one"),
    message: z.string().min(10).describe("Short summary of what the visitor needs"),
    ...consentFields,
    confirmationMessage: z
      .string()
      .min(2)
      .describe("The visitor's confirmation words, such as 'yes, go ahead and submit that'."),
  })

  const machineRequestParameters = z.object({
    ...visitorIdentityFields,
    company: z.string().min(1).describe("Business or organization name"),
    employeeCount: z
      .string()
      .min(1)
      .describe("Approximate employee count or audience size as a plain number string"),
    machineType: z
      .string()
      .min(1)
      .describe("Desired machine type or a comma-separated list like snack, beverage, combo"),
    machineCount: z.string().min(1).describe("Desired number of machines as a plain number string"),
    message: z.string().optional().describe("Optional visitor notes or placement details"),
    ...consentFields,
    confirmationMessage: z
      .string()
      .min(2)
      .describe("The visitor's confirmation words, such as 'yes, please submit it'."),
  })

  const handoffParameters = z.object({
    reason: z.string().min(2).describe("Short reason for the human handoff"),
  })

  return {
    submit_contact_lead: llm.tool({
      description:
        "Submit a general contact or callback request after the visitor clearly confirms the details.",
      parameters: contactLeadParameters,
      execute: async ({ confirmed: _confirmed, confirmationMessage, ...lead }, { ctx }) => {
        // Refuse to submit unless the quoted words really are a confirmation.
        assertExplicitConfirmation(confirmationMessage)
        const userData = ctx.userData as WorkerUserData
        const payload = buildContactPayload({ ...lead, ...buildConsentMetadata(userData) }, userData)
        return runLeadSubmission({
          endpoint: "/api/contact",
          payload,
          logContext: {
            tool: "submit_contact_lead",
            pathname: userData.pathname,
            email: lead.email,
          },
          fallbackMessage: "Thanks, your contact request has been sent to Rocky Mountain Vending.",
        })
      },
    }),
    submit_machine_request: llm.tool({
      description:
        "Submit a free machine placement or machine request lead after the visitor explicitly confirms and agrees to service-related follow-up.",
      parameters: machineRequestParameters,
      execute: async ({ confirmed: _confirmed, confirmationMessage, ...lead }, { ctx }) => {
        // Refuse to submit unless the quoted words really are a confirmation.
        assertExplicitConfirmation(confirmationMessage)
        const userData = ctx.userData as WorkerUserData
        const payload = buildMachineRequestPayload({ ...lead, ...buildConsentMetadata(userData) }, userData)
        return runLeadSubmission({
          endpoint: "/api/request-machine",
          payload,
          logContext: {
            tool: "submit_machine_request",
            pathname: userData.pathname,
            email: lead.email,
            company: lead.company,
          },
          fallbackMessage: "Thanks, your machine request has been sent to Rocky Mountain Vending.",
        })
      },
    }),
    handoff_to_human: llm.tool({
      description:
        "Use this when the visitor wants a human right away or asks for something uncertain, legal, pricing-related, or highly specific.",
      parameters: handoffParameters,
      execute: async ({ reason }) => {
        logWorker("tool_success", {
          tool: "handoff_to_human",
          reason: truncateText(reason, 120),
        })
        return {
          success: true,
          message: `${buildHandoffMessage()} Reason: ${reason}`,
        }
      },
    }),
  }
}
/**
 * Requests subscription to the visitor's published tracks and logs track
 * publication/subscription events. Events from other participants are ignored.
 */
function registerVisitorTrackHandlers(room: JobContext["room"], visitorIdentity: string): void {
  room.on("trackPublished", (publication, remoteParticipant) => {
    if (remoteParticipant.identity !== visitorIdentity) {
      return
    }
    const describeTrack = () => ({
      participantIdentity: remoteParticipant.identity,
      source: publication.source,
      kind: publication.kind,
      trackSid: publication.sid,
    })
    try {
      publication.setSubscribed(true)
      logWorker("remote_track_subscription_requested", describeTrack())
    } catch (error) {
      console.error("[voice-agent] remote_track_subscription_failed", {
        ...describeTrack(),
        error: serializeError(error),
      })
    }
    logWorker("remote_track_published", describeTrack())
  })

  room.on("trackSubscribed", (_track, publication, remoteParticipant) => {
    if (remoteParticipant.identity !== visitorIdentity) {
      return
    }
    logWorker("remote_track_subscribed", {
      participantIdentity: remoteParticipant.identity,
      source: publication.source,
      kind: publication.kind,
      trackSid: publication.sid,
    })
  })
}

/**
 * Creates the persisted voice-session record. Failures are logged, not thrown,
 * so the live conversation can proceed without persistence.
 *
 * @returns The stored session id, or `null` when nothing was persisted.
 */
async function persistVoiceSessionStart(details: {
  roomName: string
  participantIdentity: string
  userData: WorkerUserData
  visitor: VoiceAssistantVisitorMetadata
}): Promise<string | null> {
  const { roomName, participantIdentity, userData, visitor } = details
  try {
    const sessionId = await createVoiceSessionRecord(persistenceServices, {
      roomName,
      participantIdentity,
      siteUrl: userData.siteUrl,
      pathname: userData.pathname,
      pageUrl: userData.pageUrl,
      source: VOICE_ASSISTANT_SOURCE,
      metadata: JSON.stringify(visitor),
      recordingDisclosureAt: Date.now(),
      recordingStatus: persistenceServices.recording ? "pending" : undefined,
      // Fall back to "now" when the visitor-supplied timestamp does not parse.
      startedAt: Date.parse(userData.startedAt) || Date.now(),
    })
    const persistedId = typeof sessionId === "string" ? sessionId : null
    logWorker("voice_session_persisted", {
      room: roomName,
      participantIdentity,
      persisted: Boolean(persistedId),
    })
    return persistedId
  } catch (error) {
    console.error("[voice-agent] session_persistence_failed", {
      room: roomName,
      participantIdentity,
      error: serializeError(error),
    })
    return null
  }
}

/**
 * Runs when the agent session closes: stops the recording (if one was started)
 * and marks the persisted session as complete. Each step logs its own failure
 * so one failing does not prevent the other.
 */
async function finalizeVoiceSession(
  persistenceState: VoiceSessionPersistenceState,
  closeError: unknown,
): Promise<void> {
  try {
    if (persistenceState.recordingId) {
      await stopVoiceRecording(persistenceServices, persistenceState.recordingId)
    }
  } catch (error) {
    console.error("[voice-agent] recording_stop_failed", {
      recordingId: persistenceState.recordingId,
      error: serializeError(error),
    })
  }

  if (!persistenceState.sessionId) {
    return
  }

  try {
    await completeVoiceSession(persistenceServices, {
      sessionId: persistenceState.sessionId,
      endedAt: Date.now(),
      recordingId: persistenceState.recordingId ?? undefined,
      recordingStatus: persistenceState.recordingId ? "completed" : persistenceState.recordingStatus,
      recordingUrl: persistenceState.recordingUrl ?? undefined,
      recordingError: closeError ? serializeError(closeError).message : undefined,
    })
  } catch (error) {
    console.error("[voice-agent] session_complete_failed", {
      sessionId: persistenceState.sessionId,
      error: serializeError(error),
    })
  }
}

/**
 * Wires logging and transcript persistence onto the agent session's events.
 * Transcript turns are stored only when a session record exists; persistence
 * is fire-and-forget with failures logged.
 */
function registerSessionEventHandlers(
  session: voice.AgentSession<WorkerUserData>,
  context: {
    roomName: string
    participantIdentity: string
    persistenceState: VoiceSessionPersistenceState
  },
): void {
  const { roomName, participantIdentity, persistenceState } = context

  session.on(voice.AgentSessionEventTypes.AgentStateChanged, (event) => {
    logWorker("agent_state_changed", {
      oldState: event.oldState,
      newState: event.newState,
    })
  })

  session.on(voice.AgentSessionEventTypes.UserStateChanged, (event) => {
    logWorker("user_state_changed", {
      oldState: event.oldState,
      newState: event.newState,
    })
  })

  session.on(voice.AgentSessionEventTypes.UserInputTranscribed, (event) => {
    logWorker("user_input_transcribed", {
      isFinal: event.isFinal,
      language: event.language,
      transcript: truncateText(event.transcript, 160),
    })
    // Only final transcripts are persisted; interim ones are just logged.
    if (!event.isFinal || !persistenceState.sessionId) {
      return
    }
    void addVoiceTranscriptTurn(persistenceServices, {
      sessionId: persistenceState.sessionId,
      roomName,
      participantIdentity,
      role: "user",
      text: event.transcript,
      kind: "transcription",
      isFinal: event.isFinal,
      language: event.language ?? undefined,
    }).catch((error) => {
      console.error("[voice-agent] transcript_persist_failed", {
        role: "user",
        error: serializeError(error),
      })
    })
  })

  session.on(voice.AgentSessionEventTypes.ConversationItemAdded, (event) => {
    const role = "role" in event.item ? event.item.role : "unknown"
    const text = extractConversationItemText(event.item)
    logWorker("conversation_item_added", {
      role,
      preview: truncateText(JSON.stringify(event.item), 180),
    })
    // User turns are stored by the transcription handler; only assistant text is stored here.
    if (role !== "assistant" || !text || !persistenceState.sessionId) {
      return
    }
    void addVoiceTranscriptTurn(persistenceServices, {
      sessionId: persistenceState.sessionId,
      roomName,
      participantIdentity,
      role: "assistant",
      text,
      kind: "response",
      source: VOICE_ASSISTANT_SOURCE,
    }).catch((error) => {
      console.error("[voice-agent] transcript_persist_failed", {
        role: "assistant",
        error: serializeError(error),
      })
    })
  })

  session.on(voice.AgentSessionEventTypes.FunctionToolsExecuted, (event) => {
    logWorker("function_tools_executed", {
      toolNames: event.functionCalls.map((call) => call.name),
      outputs: event.functionCallOutputs.length,
    })
  })

  session.on(voice.AgentSessionEventTypes.SpeechCreated, (event) => {
    logWorker("speech_created", {
      source: event.source,
      userInitiated: event.userInitiated,
    })
  })

  session.on(voice.AgentSessionEventTypes.MetricsCollected, (event) => {
    logWorker("metrics_collected", {
      metricType: event.metrics.type,
    })
  })

  session.on(voice.AgentSessionEventTypes.Error, (event) => {
    console.error("[voice-agent] session_error", {
      source: event.source?.constructor?.name || "unknown",
      error: serializeError(event.error),
    })
  })

  session.on(voice.AgentSessionEventTypes.Close, (event) => {
    logWorker("session_closed", {
      reason: event.reason,
      error: event.error ? serializeError(event.error) : null,
    })
    // Cleanup is asynchronous; it handles and logs its own failures.
    void finalizeVoiceSession(persistenceState, event.error)
  })
}

/**
 * Starts the room recording and mirrors its status into the persisted session.
 * Does nothing when recording is not configured or no session record exists.
 * Any failure marks the recording as "failed" (locally and, best-effort, in
 * persistence) without interrupting the conversation.
 */
async function startSessionRecording(details: {
  roomName: string
  participantIdentity: string
  persistenceState: VoiceSessionPersistenceState
}): Promise<void> {
  const { roomName, participantIdentity, persistenceState } = details
  const { sessionId } = persistenceState
  if (!persistenceServices.recording || !sessionId) {
    return
  }

  try {
    persistenceState.recordingStatus = "starting"
    await updateVoiceRecording(persistenceServices, {
      sessionId,
      recordingStatus: "starting",
    })
    const recording = await startVoiceRecording(persistenceServices, roomName)
    if (recording) {
      persistenceState.recordingId = recording.recordingId
      persistenceState.recordingStatus = recording.recordingStatus
      persistenceState.recordingUrl = recording.recordingUrl
      await updateVoiceRecording(persistenceServices, {
        sessionId,
        recordingStatus: recording.recordingStatus,
        recordingId: recording.recordingId,
        recordingUrl: recording.recordingUrl,
      })
      logWorker("recording_started", {
        room: roomName,
        participantIdentity,
        recordingId: recording.recordingId,
        recordingUrl: recording.recordingUrl,
      })
    }
  } catch (error) {
    persistenceState.recordingStatus = "failed"
    console.error("[voice-agent] recording_start_failed", {
      room: roomName,
      participantIdentity,
      error: serializeError(error),
    })
    await updateVoiceRecording(persistenceServices, {
      sessionId,
      recordingStatus: "failed",
      recordingError: error instanceof Error ? error.message : String(error),
    }).catch((persistError) => {
      console.error("[voice-agent] recording_failure_persist_failed", {
        error: serializeError(persistError),
      })
    })
  }
}

/**
 * Job entry point: connects to the room, waits for the visitor, persists the
 * session record, starts the realtime agent session, and finally starts the
 * recording when recording is configured.
 *
 * @param ctx - LiveKit job context for this room.
 */
async function startSession(ctx: JobContext) {
  const fallbackRoomName = `rmv-voice-${ctx.workerId}`
  logWorker("job_connect_start", {
    room: ctx.room.name || fallbackRoomName,
    workerId: ctx.workerId,
  })
  await ctx.connect(undefined, AutoSubscribe.AUDIO_ONLY)

  // Resolve the room name only after connecting: the room's name may not be
  // populated before the connection is established, in which case the fallback
  // would be persisted and used for recording instead of the real room.
  // NOTE(review): confirm against the installed LiveKit SDK version.
  const roomName = ctx.room.name || fallbackRoomName
  logWorker("job_connect_ready", {
    room: roomName,
    workerId: ctx.workerId,
  })

  const participant = await ctx.waitForParticipant()
  const visitor = parseVisitorMetadata(participant.metadata)
  const userData: WorkerUserData = {
    siteUrl: env.siteUrl,
    pathname: visitor.pathname,
    pageUrl: visitor.pageUrl,
    startedAt: visitor.startedAt,
  }
  const persistenceState: VoiceSessionPersistenceState = {
    sessionId: null,
    recordingId: null,
    recordingStatus: persistenceServices.recording ? "pending" : "failed",
    recordingUrl: null,
  }
  logWorker("participant_joined", {
    identity: participant.identity,
    room: roomName,
    pathname: userData.pathname,
    pageUrl: userData.pageUrl,
  })

  registerVisitorTrackHandlers(ctx.room, participant.identity)

  persistenceState.sessionId = await persistVoiceSessionStart({
    roomName,
    participantIdentity: participant.identity,
    userData,
    visitor,
  })

  const session = new voice.AgentSession<WorkerUserData>({
    llm: new xai.realtime.RealtimeModel({
      apiKey: env.xaiApiKey,
      model: env.realtimeModel,
      voice: XAI_REALTIME_VOICE,
    }),
    userData,
    turnHandling: {
      interruption: {
        enabled: true,
      },
    },
  })

  // Handlers are attached before `session.start` so no early event is missed.
  registerSessionEventHandlers(session, {
    roomName,
    participantIdentity: participant.identity,
    persistenceState,
  })

  const agent = new voice.Agent<WorkerUserData>({
    instructions: VOICE_ASSISTANT_SYSTEM_PROMPT,
    tools: createTools(),
  })
  logWorker("session_starting", {
    room: roomName,
    participantIdentity: participant.identity,
    pathname: userData.pathname,
    model: env.realtimeModel,
  })
  await session.start({ agent, room: ctx.room })
  logWorker("session_started", {
    room: roomName,
    participantIdentity: participant.identity,
  })

  await startSessionRecording({
    roomName,
    participantIdentity: participant.identity,
    persistenceState,
  })
}
// Agent definition exported as the module default; the entry simply delegates
// to `startSession`.
export default defineAgent({
  entry: (ctx: JobContext) => startSession(ctx),
})
// Start the worker CLI only when this module is the script being executed,
// not when it is imported for its default export.
const workerModulePath = fileURLToPath(import.meta.url)
if (process.argv[1] === workerModulePath) {
  const workerOptions = new WorkerOptions({
    agent: workerModulePath,
    agentName: VOICE_ASSISTANT_AGENT_NAME,
    wsURL: env.livekitUrl,
    apiKey: env.livekitApiKey,
    apiSecret: env.livekitApiSecret,
  })
  cli.runApp(workerOptions)
}