-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathwarm_handoff.py
131 lines (108 loc) · 5.09 KB
/
warm_handoff.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import asyncio
import os
import uuid
from pathlib import Path
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli
from livekit import rtc
from livekit import api
from livekit.agents.llm import function_tool
from livekit.agents.voice import Agent, AgentSession, RunContext
from livekit.plugins import deepgram, openai, silero, elevenlabs
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
class WarmHandoffAgent(Agent):
def __init__(self, job_context=None) -> None:
self.job_context = job_context
super().__init__(
instructions="""
You are a helpful assistant communicating through voice. You're helping me test ... yourself ... since you're the AI agent.
Don't use any unpronouncable characters.
""",
stt=deepgram.STT(),
llm=openai.LLM(model="gpt-4o"),
tts=elevenlabs.TTS(
encoding="pcm_44100",
model="eleven_multilingual_v2"
),
vad=silero.VAD.load()
)
@function_tool
async def transfer_call(self, context: RunContext, phone_number: str):
"""
Transfer the current call to a human agent at the specified phone number.
Args:
context: The call context
phone_number: The phone number to transfer the call to
"""
if not self.job_context:
await self.session.say("I'm sorry, I can't transfer the call at this time.")
return None, "Failed to transfer call: No job context available"
# Get room name from environment variable
room_name = os.environ.get('LIVEKIT_ROOM_NAME', self.job_context.room.name)
# Generate a unique identity for the SIP participant
identity = f"transfer_{uuid.uuid4().hex[:8]}"
# Create LiveKit API client
livekit_url = os.environ.get('LIVEKIT_URL')
livekit_api_key = os.environ.get('LIVEKIT_API_KEY')
livekit_api_secret = os.environ.get('LIVEKIT_API_SECRET')
sip_trunk_id = os.environ.get('SIP_TRUNK_ID')
try:
print(f"Transferring call to {phone_number}")
# Using the API from the job context if available
if self.job_context and hasattr(self.job_context, 'api'):
response = await self.job_context.api.sip.create_sip_participant(
api.CreateSIPParticipantRequest(
sip_trunk_id=sip_trunk_id,
sip_call_to=phone_number,
room_name=room_name,
participant_identity=identity,
participant_name="Human Agent",
krisp_enabled=True
)
)
else:
# Fallback to creating our own API client
livekit_api = api.LiveKitAPI(
url=livekit_url,
api_key=livekit_api_key,
api_secret=livekit_api_secret
)
response = await livekit_api.sip.create_sip_participant(
api.CreateSIPParticipantRequest(
sip_trunk_id=sip_trunk_id,
sip_call_to=phone_number,
room_name=room_name,
participant_identity=identity,
participant_name="Human Agent",
krisp_enabled=True
)
)
await livekit_api.aclose()
await self.session.say(f"I'm transferring you to a human agent now. Please hold while we connect you.")
return None, f"I've transferred you to a human agent at {phone_number}. Please hold while we connect you."
except Exception as e:
print(f"Error transferring call: {e}")
await self.session.say(f"I'm sorry, I couldn't transfer the call at this time.")
return None, f"Failed to transfer call: {e}"
async def on_enter(self):
# Generate initial greeting
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
await ctx.connect()
session = AgentSession()
agent = WarmHandoffAgent(job_context=ctx)
await session.start(
agent=agent,
room=ctx.room
)
def on_participant_connected_handler(participant: rtc.RemoteParticipant):
asyncio.create_task(async_on_participant_connected(participant))
async def async_on_participant_connected(participant: rtc.RemoteParticipant):
await agent.session.say(f"Hi there! Is there anything I can help you with?")
# Handle existing participants
for participant in ctx.room.remote_participants.values():
asyncio.create_task(async_on_participant_connected(participant))
# Set up listener for new participants
ctx.room.on("participant_connected", on_participant_connected_handler)
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))