-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsip_lifecycle.py
169 lines (133 loc) · 6.27 KB
/
sip_lifecycle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import asyncio
import logging
import os
import uuid
from pathlib import Path
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli
from livekit import rtc
from livekit import api
from livekit.agents.llm import function_tool
from livekit.agents.voice import Agent, AgentSession, RunContext
from livekit.plugins import deepgram, openai, silero, elevenlabs
# Environment variables are read from the .env file located one directory
# above the directory that contains this script.
_dotenv_file = Path(__file__).parent.parent / ".env"
load_dotenv(dotenv_path=_dotenv_file)

# Module-wide logger; INFO level so lifecycle events are emitted.
logger = logging.getLogger("sip-lifecycle-agent")
logger.setLevel(logging.INFO)
class SIPLifecycleAgent(Agent):
    """Voice agent that manages the lifecycle of a SIP call in a room.

    Exposes three LLM-callable tools: dialing an additional SIP participant
    into the room, ending the call by deleting the room, and logging the
    participants currently in the room.

    Every tool returns a ``(None, message)`` tuple, where ``message`` is a
    short human-readable description of the outcome.
    """

    def __init__(self, job_context=None) -> None:
        """Configure the STT/LLM/TTS/VAD pipeline and store the job context.

        Args:
            job_context: Object whose ``room`` and ``api`` attributes the
                tools use. When omitted, every tool apologizes aloud and
                reports failure instead of acting.
        """
        self.job_context = job_context
        super().__init__(
            # Built from adjacent literals so the prompt text carries no
            # incidental source-code indentation.
            instructions=(
                "\n"
                "You are a helpful assistant demonstrating SIP call lifecycle management.\n"
                "You can add SIP participants and end the call when requested.\n"
            ),
            stt=deepgram.STT(),
            llm=openai.LLM(model="gpt-4o"),
            tts=elevenlabs.TTS(model="eleven_multilingual_v2"),
            vad=silero.VAD.load(),
        )

    async def _fail_without_job_context(self, apology: str, action: str):
        """Log, apologize aloud, and build the failure result for a tool.

        Args:
            apology: Sentence spoken to the caller.
            action: Verb phrase inserted into the returned failure message.

        Returns:
            ``(None, "Failed to <action>: No job context available")``.
        """
        logger.error("No job context available")
        await self.session.say(apology)
        return None, f"Failed to {action}: No job context available"

    # NOTE: the docstrings of the @function_tool methods below are kept short
    # and in their original wording on purpose; maintainer notes go in
    # comments instead.
    @function_tool
    async def add_sip_participant(self, context: RunContext, phone_number: str):
        """
        Add a SIP participant to the current call.

        Args:
            context: The call context
            phone_number: The phone number to call
        """
        if self.job_context is None:
            return await self._fail_without_job_context(
                "I'm sorry, I can't add participants at this time.",
                "add SIP participant",
            )

        # Fail early with a clear message instead of sending a request that
        # has no trunk id and relying on the API call to reject it.
        sip_trunk_id = os.environ.get("SIP_TRUNK_ID")
        if not sip_trunk_id:
            logger.error("SIP_TRUNK_ID environment variable is not set")
            await self.session.say(f"I'm sorry, I couldn't add {phone_number} to the call.")
            return None, "Failed to add SIP participant: SIP_TRUNK_ID is not configured"

        room_name = self.job_context.room.name
        # Random 8-hex-digit suffix gives each dialed participant its own identity.
        identity = f"sip_{uuid.uuid4().hex[:8]}"
        logger.info(
            "Adding SIP participant with phone number %s to room %s",
            phone_number,
            room_name,
        )

        try:
            response = await self.job_context.api.sip.create_sip_participant(
                api.CreateSIPParticipantRequest(
                    sip_trunk_id=sip_trunk_id,
                    sip_call_to=phone_number,
                    room_name=room_name,
                    participant_identity=identity,
                    participant_name=f"SIP Participant {phone_number}",
                    krisp_enabled=True,
                )
            )
        except Exception as e:
            # Tool boundary: report the failure to the caller and the LLM
            # rather than letting it propagate; keep the traceback in the log.
            logger.exception("Error adding SIP participant: %s", e)
            await self.session.say(f"I'm sorry, I couldn't add {phone_number} to the call.")
            return None, f"Failed to add SIP participant: {e}"

        logger.info("Successfully added SIP participant: %s", response)
        return None, f"Added SIP participant {phone_number} to the call."

    @function_tool
    async def end_call(self, context: RunContext):
        """
        End the current call by deleting the room.
        """
        if self.job_context is None:
            return await self._fail_without_job_context(
                "I'm sorry, I can't end the call at this time.",
                "end call",
            )

        room_name = self.job_context.room.name
        logger.info("Ending call by deleting room %s", room_name)

        try:
            await self.session.say("Thank you for your time. I'll be ending this call now. Goodbye!")
            # Pause for three seconds after the goodbye before the room is deleted.
            await asyncio.sleep(3)
            await self.job_context.api.room.delete_room(
                api.DeleteRoomRequest(room=room_name)
            )
        except Exception as e:
            logger.exception("Error ending call: %s", e)
            return None, f"Failed to end call: {e}"

        logger.info("Successfully deleted room %s", room_name)
        return None, "Call ended successfully."

    @function_tool
    async def log_participants(self, context: RunContext):
        """
        Log all participants in the current room.
        """
        if self.job_context is None:
            return await self._fail_without_job_context(
                "I'm sorry, I can't list participants at this time.",
                "list participants",
            )

        room_name = self.job_context.room.name
        logger.info("Logging participants in room %s", room_name)

        try:
            response = await self.job_context.api.room.list_participants(
                api.ListParticipantsRequest(room=room_name)
            )
            participants = response.participants
            participant_info = [
                {
                    "identity": p.identity,
                    "name": p.name,
                    "state": p.state,
                    "is_publisher": p.is_publisher,
                }
                for p in participants
            ]
            logger.info("Participants in room %s: %s", room_name, participant_info)
            await self.session.say(f"There are {len(participants)} participants in this call.")
        except Exception as e:
            logger.exception("Error listing participants: %s", e)
            return None, f"Failed to list participants: {e}"

        return None, f"Listed {len(participants)} participants in the room."

    async def on_enter(self):
        """Ask the session to generate a reply as soon as the agent enters."""
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Connect to the room, start the agent session, and greet new participants.

    Args:
        ctx: Job context; its ``connect()``, ``room`` and (through the agent)
            ``api`` members are used.
    """
    await ctx.connect()

    session = AgentSession()
    agent = SIPLifecycleAgent(job_context=ctx)
    await session.start(agent=agent, room=ctx.room)

    # The event loop only keeps weak references to tasks, so a task whose
    # result is discarded may be garbage-collected before it finishes.
    # Holding the tasks here keeps each greeting alive until it completes.
    greeting_tasks = set()

    def _on_greeting_done(task):
        # Release the finished task and surface any failure, which would
        # otherwise go unreported because nothing awaits the task.
        greeting_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Failed to greet participant: %s", exc, exc_info=exc)

    async def async_on_participant_connected(participant: rtc.RemoteParticipant):
        logger.info(f"New participant connected: {participant.identity}")
        await agent.session.say(f"Welcome, {participant.name or participant.identity}! I can help you add a participant to this call or end the call.")

    def on_participant_connected_handler(participant: rtc.RemoteParticipant):
        # The handler is synchronous, so the async greeting is scheduled as a task.
        task = asyncio.create_task(async_on_participant_connected(participant))
        greeting_tasks.add(task)
        task.add_done_callback(_on_greeting_done)

    ctx.room.on("participant_connected", on_participant_connected_handler)
# Script entry point: hand the worker configuration to the CLI runner.
if __name__ == "__main__":
    worker_options = WorkerOptions(entrypoint_fnc=entrypoint)
    cli.run_app(worker_options)