import asyncio import logging from dotenv import load_dotenv from livekit import rtc from livekit.agents import ( Agent, AgentServer, AgentSession, AutoSubscribe, JobContext, JobProcess, StopResponse, cli, inference, llm, room_io, utils, ) from livekit.plugins import silero load_dotenv() logger = logging.getLogger("transcriber") # This example demonstrates how to transcribe audio from multiple remote participants. # It creates agent sessions for each participant and transcribes their audio. class Transcriber(Agent): def __init__(self, *, participant_identity: str): super().__init__( instructions="not-needed", stt=inference.STT("deepgram/nova-3"), ) self.participant_identity = participant_identity async def on_user_turn_completed(self, chat_ctx: llm.ChatContext, new_message: llm.ChatMessage): user_transcript = new_message.text_content logger.info(f"{self.participant_identity} -> {user_transcript}") raise StopResponse() class MultiUserTranscriber: def __init__(self, ctx: JobContext): self.ctx = ctx self._sessions: dict[str, AgentSession] = {} self._tasks: set[asyncio.Task] = set() def start(self): self.ctx.room.on("participant_connected", self.on_participant_connected) self.ctx.room.on("participant_disconnected", self.on_participant_disconnected) async def aclose(self): await utils.aio.cancel_and_wait(*self._tasks) await asyncio.gather(*[self._close_session(session) for session in self._sessions.values()]) self.ctx.room.off("participant_connected", self.on_participant_connected) self.ctx.room.off("participant_disconnected", self.on_participant_disconnected) def on_participant_connected(self, participant: rtc.RemoteParticipant): if participant.identity in self._sessions: return logger.info(f"starting session for {participant.identity}") task = asyncio.create_task(self._start_session(participant)) self._tasks.add(task) def on_task_done(task: asyncio.Task): try: self._sessions[participant.identity] = task.result() finally: self._tasks.discard(task) task.add_done_callback(on_task_done) def on_participant_disconnected(self, participant: rtc.RemoteParticipant): if (session := self._sessions.pop(participant.identity)) is None: return logger.info(f"closing session for {participant.identity}") task = asyncio.create_task(self._close_session(session)) self._tasks.add(task) task.add_done_callback(lambda _: self._tasks.discard(task)) async def _start_session(self, participant: rtc.RemoteParticipant) -> AgentSession: if participant.identity in self._sessions: return self._sessions[participant.identity] session = AgentSession( vad=self.ctx.proc.userdata["vad"], ) await session.start( agent=Transcriber( participant_identity=participant.identity, ), room=self.ctx.room, room_options=room_io.RoomOptions( audio_input=True, text_output=True, audio_output=False, participant_identity=participant.identity, # text input is not supported for multiple room participants # if needed, register the text stream handler by yourself # and route the text to different sessions based on the participant identity text_input=False, ), ) return session async def _close_session(self, sess: AgentSession) -> None: await sess.drain() await sess.aclose() server = AgentServer() @server.rtc_session() async def entrypoint(ctx: JobContext): transcriber = MultiUserTranscriber(ctx) transcriber.start() await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) for participant in ctx.room.remote_participants.values(): # handle all existing participants transcriber.on_participant_connected(participant) async def cleanup(): await transcriber.aclose() ctx.add_shutdown_callback(cleanup) def prewarm(proc: JobProcess): proc.userdata["vad"] = silero.VAD.load() server.setup_fnc = prewarm if __name__ == "__main__": cli.run_app(server)