import { ConvexHttpClient } from "convex/browser"
import { makeFunctionReference } from "convex/server"
import {
  EgressClient,
  EncodedFileOutput,
  EncodedFileType,
  S3Upload,
} from "livekit-server-sdk"
import { VOICE_ASSISTANT_SOURCE } from "@/lib/voice-assistant/types"

/** Recording lifecycle states passed to the `voiceSessions` Convex mutations. */
type VoiceRecordingStatus = "pending" | "starting" | "recording" | "completed" | "failed"

type CreateVoiceSessionArgs = {
  roomName: string
  participantIdentity: string
  siteUrl?: string
  pathname?: string
  pageUrl?: string
  source?: string
  metadata?: string
  startedAt?: number
  recordingDisclosureAt?: number
  recordingStatus?: VoiceRecordingStatus
}

type AddTranscriptTurnArgs = {
  sessionId: string
  roomName: string
  participantIdentity: string
  role: "user" | "assistant" | "system"
  text: string
  kind?: string
  isFinal?: boolean
  language?: string
  source?: string
  createdAt?: number
}

type UpdateVoiceRecordingArgs = {
  sessionId: string
  recordingStatus?: VoiceRecordingStatus
  recordingId?: string
  recordingUrl?: string
  recordingError?: string
}

type CompleteVoiceSessionArgs = UpdateVoiceRecordingArgs & {
  endedAt?: number
}

/** Storage and egress settings for LiveKit recordings, built by `getVoicePersistenceServices`. */
type VoiceRecordingConfig = {
  bucket: string
  egressClient: EgressClient
  endpoint: string
  forcePathStyle: boolean
  pathPrefix: string
  publicBaseUrl?: string
  region: string
  secret: string
  accessKey: string
}

/**
 * Optional persistence backends. A member is `null` when its environment
 * configuration is missing; the functions below then return `null` without doing anything.
 */
type VoicePersistenceServices = {
  convexClient: ConvexHttpClient | null
  recording: VoiceRecordingConfig | null
}

const CREATE_VOICE_SESSION = makeFunctionReference<"mutation">("voiceSessions:createSession")
const ADD_VOICE_TRANSCRIPT_TURN = makeFunctionReference<"mutation">("voiceSessions:addTranscriptTurn")
const UPDATE_VOICE_RECORDING = makeFunctionReference<"mutation">("voiceSessions:updateRecording")
const COMPLETE_VOICE_SESSION = makeFunctionReference<"mutation">("voiceSessions:completeSession")

/** Type shared by the mutation references above (they are all created the same way). */
type MutationReference = typeof CREATE_VOICE_SESSION

const DEFAULT_RECORDING_PATH_PREFIX = "livekit-recordings"

/** Lower-cased env values that `readBooleanEnv` treats as `true`. */
const TRUTHY_ENV_VALUES = new Set(["1", "true", "yes", "on"])

/** Returns the trimmed value of an environment variable, or `""` when it is unset or blank. */
function readOptionalEnv(name: string): string {
  const value = process.env[name]
  return typeof value === "string" ? value.trim() : ""
}

/**
 * Parses a boolean-ish environment variable.
 *
 * @returns `undefined` when unset or blank, `true` for `1`/`true`/`yes`/`on`
 *   (case-insensitive), otherwise `false`.
 */
function readBooleanEnv(name: string): boolean | undefined {
  const value = readOptionalEnv(name).toLowerCase()
  if (!value) {
    return undefined
  }
  return TRUTHY_ENV_VALUES.has(value)
}

/**
 * Reduces `value` to a single storage-key segment. Each run of characters outside
 * `[A-Za-z0-9_-]` (including `/` and `.`) becomes one `-`, repeated `-` collapse,
 * and a leading or trailing `-` is dropped.
 */
function sanitizePathSegment(value: string): string {
  return value
    .replace(/[^a-zA-Z0-9_-]+/g, "-")
    .replace(/-+/g, "-")
    .replace(/^-|-$/g, "")
}

/**
 * Sanitizes a multi-segment prefix such as `"recordings/voice/"` segment by segment.
 * Empty segments are dropped, so the result never starts or ends with `/` and never
 * contains `//`.
 */
function sanitizePathPrefix(value: string): string {
  return value
    .split("/")
    .map(sanitizePathSegment)
    .filter((segment) => segment.length > 0)
    .join("/")
}

/**
 * Builds the object key for a room recording, using UTC dates:
 * `<prefix>/<yyyy>/<mm>/<dd>/<room>-<ISO timestamp with ":" and "." replaced by "-">.mp3`.
 *
 * @param roomName LiveKit room name; sanitized to a single path segment.
 * @param pathPrefix Storage prefix. Defaults to `VOICE_RECORDING_PATH_PREFIX`, or
 *   `"livekit-recordings"` when that is unset. Also falls back to `"livekit-recordings"`
 *   if the prefix sanitizes to an empty string.
 * @param now Time used for the date folders and the timestamp.
 */
function buildRecordingFilepath(
  roomName: string,
  pathPrefix: string = readOptionalEnv("VOICE_RECORDING_PATH_PREFIX") || DEFAULT_RECORDING_PATH_PREFIX,
  now: Date = new Date(),
): string {
  const prefix = sanitizePathPrefix(pathPrefix) || DEFAULT_RECORDING_PATH_PREFIX
  const year = now.getUTCFullYear()
  const month = String(now.getUTCMonth() + 1).padStart(2, "0")
  const day = String(now.getUTCDate()).padStart(2, "0")
  const stamp = now.toISOString().replace(/[:.]/g, "-")
  // An empty room segment would otherwise leave the file name starting with "-".
  const safeRoom = sanitizePathSegment(roomName) || "room"
  return `${prefix}/${year}/${month}/${day}/${safeRoom}-${stamp}.mp3`
}

/**
 * Returns the location a recording will be written to.
 *
 * When a public base URL is configured, returns `<publicBaseUrl>/<URL-encoded path>`.
 * Otherwise returns an `s3://<bucket>/<path>` URI.
 */
function buildRecordingUrl(publicBaseUrl: string | undefined, bucket: string, filepath: string): string {
  if (publicBaseUrl) {
    const encodedPath = filepath.split("/").map(encodeURIComponent).join("/")
    return `${publicBaseUrl.replace(/\/+$/, "")}/${encodedPath}`
  }
  return `s3://${bucket}/${filepath}`
}

/**
 * Builds the persistence backends from environment variables.
 *
 * - Convex is configured from `NEXT_PUBLIC_CONVEX_URL` or `CONVEX_URL`.
 * - Recording needs an access key, a secret, an endpoint and `VOICE_RECORDING_BUCKET`.
 *   The first three accept `VOICE_RECORDING_*`, `CLOUDFLARE_R2_*` or `AWS_*` fallbacks.
 * - `VOICE_RECORDING_ENABLED=false` disables recording explicitly. When it is unset,
 *   recording is enabled whenever the storage settings are complete.
 *
 * New client instances are created on every call.
 */
export function getVoicePersistenceServices(args: {
  livekitUrl: string
  livekitApiKey: string
  livekitApiSecret: string
}): VoicePersistenceServices {
  const convexUrl = readOptionalEnv("NEXT_PUBLIC_CONVEX_URL") || readOptionalEnv("CONVEX_URL")
  const convexClient = convexUrl ? new ConvexHttpClient(convexUrl) : null

  const accessKey =
    readOptionalEnv("VOICE_RECORDING_ACCESS_KEY_ID") ||
    readOptionalEnv("CLOUDFLARE_R2_ACCESS_KEY_ID") ||
    readOptionalEnv("AWS_ACCESS_KEY_ID") ||
    readOptionalEnv("AWS_ACCESS_KEY")
  const secret =
    readOptionalEnv("VOICE_RECORDING_SECRET_ACCESS_KEY") ||
    readOptionalEnv("CLOUDFLARE_R2_SECRET_ACCESS_KEY") ||
    readOptionalEnv("AWS_SECRET_ACCESS_KEY") ||
    readOptionalEnv("AWS_SECRET_KEY")
  const endpoint = readOptionalEnv("VOICE_RECORDING_ENDPOINT") || readOptionalEnv("CLOUDFLARE_R2_ENDPOINT")
  const bucket = readOptionalEnv("VOICE_RECORDING_BUCKET")
  const region = readOptionalEnv("VOICE_RECORDING_REGION") || readOptionalEnv("AWS_DEFAULT_REGION") || "auto"
  const publicBaseUrl = readOptionalEnv("VOICE_RECORDING_PUBLIC_BASE_URL") || undefined
  const forcePathStyle = readBooleanEnv("VOICE_RECORDING_FORCE_PATH_STYLE") ?? true
  const pathPrefix = readOptionalEnv("VOICE_RECORDING_PATH_PREFIX") || DEFAULT_RECORDING_PATH_PREFIX

  const requested = readBooleanEnv("VOICE_RECORDING_ENABLED")
  const hasRecordingEnv = Boolean(accessKey && secret && endpoint && bucket)

  if (requested === true && !hasRecordingEnv) {
    // Recording was asked for explicitly but cannot work; make the misconfiguration visible.
    console.warn(
      "[voice-persistence] VOICE_RECORDING_ENABLED is set but recording storage is incomplete " +
        "(access key, secret, endpoint and VOICE_RECORDING_BUCKET are all required); recording is disabled.",
    )
  }

  // Enabled unless explicitly turned off, and only when storage is fully configured.
  const recording: VoiceRecordingConfig | null =
    hasRecordingEnv && requested !== false
      ? {
          accessKey,
          bucket,
          egressClient: new EgressClient(args.livekitUrl, args.livekitApiKey, args.livekitApiSecret),
          endpoint,
          forcePathStyle,
          pathPrefix,
          publicBaseUrl,
          region,
          secret,
        }
      : null

  return { convexClient, recording }
}

/**
 * Runs a Convex mutation, or resolves to `null` without doing anything when Convex
 * is not configured. Errors thrown by the mutation propagate to the caller.
 */
async function safeMutation<TArgs extends Record<string, unknown>>(
  client: ConvexHttpClient | null,
  reference: MutationReference,
  args: TArgs,
) {
  if (!client) {
    return null
  }
  return await client.mutation(reference, args)
}

/**
 * Creates a voice session via `voiceSessions:createSession`.
 * `source` defaults to `VOICE_ASSISTANT_SOURCE` when it is empty or missing.
 *
 * @returns The mutation result, or `null` when Convex is not configured.
 */
export async function createVoiceSessionRecord(
  services: VoicePersistenceServices,
  args: CreateVoiceSessionArgs,
) {
  return await safeMutation(services.convexClient, CREATE_VOICE_SESSION, {
    ...args,
    source: args.source || VOICE_ASSISTANT_SOURCE,
  })
}

/**
 * Appends a transcript turn via `voiceSessions:addTranscriptTurn`.
 *
 * Whitespace in `text` is collapsed to single spaces and trimmed, and turns that end up
 * empty are skipped. `source` defaults to `VOICE_ASSISTANT_SOURCE`.
 *
 * @returns The mutation result, or `null` when the turn was skipped or Convex is not configured.
 */
export async function addVoiceTranscriptTurn(
  services: VoicePersistenceServices,
  args: AddTranscriptTurnArgs,
) {
  const text = args.text.replace(/\s+/g, " ").trim()
  if (!text) {
    return null
  }
  return await safeMutation(services.convexClient, ADD_VOICE_TRANSCRIPT_TURN, {
    ...args,
    source: args.source || VOICE_ASSISTANT_SOURCE,
    text,
  })
}

/**
 * Updates a session's recording fields via `voiceSessions:updateRecording`.
 *
 * @returns The mutation result, or `null` when Convex is not configured.
 */
export async function updateVoiceRecording(
  services: VoicePersistenceServices,
  args: UpdateVoiceRecordingArgs,
) {
  return await safeMutation(services.convexClient, UPDATE_VOICE_RECORDING, args)
}

/**
 * Marks a session complete via `voiceSessions:completeSession`.
 *
 * @returns The mutation result, or `null` when Convex is not configured.
 */
export async function completeVoiceSession(
  services: VoicePersistenceServices,
  args: CompleteVoiceSessionArgs,
) {
  return await safeMutation(services.convexClient, COMPLETE_VOICE_SESSION, args)
}

/**
 * Starts an audio-only room-composite egress that uploads an MP3 to the configured
 * S3-compatible bucket.
 *
 * @returns The egress id, the status `"recording"` and the expected recording URL,
 *   or `null` when recording is not configured. Errors from LiveKit propagate.
 */
export async function startVoiceRecording(
  services: VoicePersistenceServices,
  roomName: string,
) {
  const { recording } = services
  if (!recording) {
    return null
  }

  const filepath = buildRecordingFilepath(roomName, recording.pathPrefix)
  const output = new EncodedFileOutput({
    fileType: EncodedFileType.MP3,
    filepath,
    output: {
      case: "s3",
      value: new S3Upload({
        accessKey: recording.accessKey,
        secret: recording.secret,
        region: recording.region,
        endpoint: recording.endpoint,
        bucket: recording.bucket,
        forcePathStyle: recording.forcePathStyle,
      }),
    },
  })

  const info = await recording.egressClient.startRoomCompositeEgress(roomName, output, {
    audioOnly: true,
  })

  return {
    recordingId: info.egressId,
    recordingStatus: "recording" as const,
    recordingUrl: buildRecordingUrl(recording.publicBaseUrl, recording.bucket, filepath),
  }
}

/**
 * Stops the egress identified by `recordingId`.
 *
 * @returns LiveKit's `stopEgress` result, or `null` when recording is not configured
 *   or no id is given.
 */
export async function stopVoiceRecording(
  services: VoicePersistenceServices,
  recordingId: string | null | undefined,
) {
  if (!services.recording || !recordingId) {
    return null
  }
  return await services.recording.egressClient.stopEgress(recordingId)
}