agent.py
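
"""Vision-enabled LiveKit voice agent.

Listens to a participant (Deepgram STT), answers with a multimodal LLM
(OpenAI gpt-4o-mini), and speaks via Cartesia TTS. At the end of each user
turn, the agent grabs the latest frame from the participant's video track
and attaches it to the conversation so the model can respond to what it
"sees".
"""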
import logging
from pathlib import Path

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
from livekit.agents.llm import ChatContext, ChatMessage, ImageContent
from livekit.agents.voice import Agent, AgentSession, room_io
from livekit.plugins import (
    cartesia,
    deepgram,
    noise_cancellation,
    openai,
    silero,
    turn_detector,
)
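
# The STT/LLM/TTS plugins read their API keys from the environment (typically
# OPENAI_API_KEY, DEEPGRAM_API_KEY, and CARTESIA_API_KEY), alongside the
# LiveKit credentials (LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
# used by the worker.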
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")

logger = logging.getLogger("voice-agent")


async def get_video_track(room: rtc.Room):
    """Find and return the first available remote video track in the room."""
    for participant_id, participant in room.remote_participants.items():
        for track_id, track_publication in participant.track_publications.items():
            if track_publication.track and isinstance(
                track_publication.track, rtc.RemoteVideoTrack
            ):
                logger.info(
                    f"Found video track {track_publication.track.sid} "
                    f"from participant {participant_id}"
                )
                return track_publication.track
    raise ValueError("No remote video track found in the room")


async def get_latest_image(room: rtc.Room):
    """Capture and return a single frame from the video track."""
    video_stream = None
    try:
        video_track = await get_video_track(room)
        video_stream = rtc.VideoStream(video_track)
        # The stream yields frames continuously; take the first one and return.
        async for event in video_stream:
            logger.debug("Captured latest video frame")
            return event.frame
    except Exception as e:
        logger.error(f"Failed to get latest image: {e}")
        return None
    finally:
        if video_stream:
            await video_stream.aclose()
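
# Note: this opens a fresh VideoStream for every captured frame. For a
# long-running agent it may be cheaper to keep a single stream open and buffer
# the most recent frame; the per-call approach is kept here for simplicity.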


class Assistant(Agent):
    def __init__(self, room: rtc.Room) -> None:
        super().__init__(
            instructions=(
                "You are a voice assistant created by LiveKit that can both see and hear. "
                "You should use short and concise responses, avoiding unpronounceable punctuation. "
                "When you see an image in our conversation, naturally incorporate what you see "
                "into your response. Keep visual descriptions brief but informative."
            ),
            vad=silero.VAD.load(),
            stt=deepgram.STT(),
            llm=openai.LLM(model="gpt-4o-mini"),
            tts=cartesia.TTS(),
            turn_detection=turn_detector.EOUModel(),
        )
        self._room = room

    async def on_end_of_turn(
        self, chat_ctx: ChatContext, new_message: ChatMessage, generating_reply: bool
    ) -> None:
        """
        Callback that runs right before the LLM generates a response.
        Captures the current video frame and adds it to the conversation context.
        """
        chat_ctx = chat_ctx.copy()
        latest_image = await get_latest_image(self._room)
        if latest_image:
            # Attaching the frame to the user's message lets the multimodal LLM
            # (gpt-4o-mini) ground its reply in what the camera currently sees.
            image_content = ImageContent(image=latest_image)
            new_message.content.append(image_content)
            logger.debug("Added latest frame to conversation context")
        chat_ctx.items.append(new_message)
        await self.update_chat_ctx(chat_ctx)


async def entrypoint(ctx: JobContext):
    logger.info(f"connecting to room {ctx.room.name}")
    await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)

    # Wait for the first participant to connect
    participant = await ctx.wait_for_participant()
    logger.info(f"starting voice assistant for participant {participant.identity}")

    session = AgentSession(
        min_endpointing_delay=0.5,
        max_endpointing_delay=5.0,
    )

    await session.start(
        room=ctx.room,
        agent=Assistant(ctx.room),
        room_input_options=room_io.RoomInputOptions(
            noise_cancellation=noise_cancellation.BVC(),
        ),
    )

    # The agent should be polite and greet the user when it joins :)
    await session.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
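
# The LiveKit Agents CLI is typically invoked with a subcommand, e.g.
# `python agent.py dev` during development or `python agent.py start` in
# production (both are provided by cli.run_app).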