Skip to content

Commit f491c2d

Browse files
authored
Prepare 2.1.0 release (Part 1) (#303)
1 parent bf3f0ed commit f491c2d

File tree

3 files changed

+188
-29
lines changed

3 files changed

+188
-29
lines changed

src/Custom/RealtimeConversation/RealtimeConversationSession.cs

+24-26
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ public partial class RealtimeConversationSession : IDisposable
2020
private readonly RealtimeConversationClient _parentClient;
2121
private readonly Uri _endpoint;
2222
private readonly ApiKeyCredential _credential;
23-
private readonly object _sendingAudioLock = new();
24-
private bool _isSendingAudio = false;
23+
private readonly SemaphoreSlim _audioSendSemaphore = new(1, 1);
24+
private bool _isSendingAudioStream = false;
2525

2626
internal bool ShouldBufferTurnResponseData { get; set; }
2727

@@ -47,13 +47,13 @@ protected internal RealtimeConversationSession(
4747
public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken cancellationToken = default)
4848
{
4949
Argument.AssertNotNull(audio, nameof(audio));
50-
lock (_sendingAudioLock)
50+
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
5151
{
52-
if (_isSendingAudio)
52+
if (_isSendingAudioStream)
5353
{
5454
throw new InvalidOperationException($"Only one stream of audio may be sent at once.");
5555
}
56-
_isSendingAudio = true;
56+
_isSendingAudioStream = true;
5757
}
5858
try
5959
{
@@ -75,23 +75,23 @@ public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken ca
7575
}
7676
finally
7777
{
78-
lock (_sendingAudioLock)
78+
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
7979
{
80-
_isSendingAudio = false;
80+
_isSendingAudioStream = false;
8181
}
8282
}
8383
}
8484

8585
public virtual void SendInputAudio(Stream audio, CancellationToken cancellationToken = default)
8686
{
8787
Argument.AssertNotNull(audio, nameof(audio));
88-
lock (_sendingAudioLock)
88+
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
8989
{
90-
if (_isSendingAudio)
90+
if (_isSendingAudioStream)
9191
{
9292
throw new InvalidOperationException($"Only one stream of audio may be sent at once.");
9393
}
94-
_isSendingAudio = true;
94+
_isSendingAudioStream = true;
9595
}
9696
try
9797
{
@@ -113,9 +113,9 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT
113113
}
114114
finally
115115
{
116-
lock (_sendingAudioLock)
116+
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
117117
{
118-
_isSendingAudio = false;
118+
_isSendingAudioStream = false;
119119
}
120120
}
121121
}
@@ -130,18 +130,17 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT
130130
public virtual async Task SendInputAudioAsync(BinaryData audio, CancellationToken cancellationToken = default)
131131
{
132132
Argument.AssertNotNull(audio, nameof(audio));
133-
lock (_sendingAudioLock)
133+
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
134134
{
135-
if (_isSendingAudio)
135+
if (_isSendingAudioStream)
136136
{
137137
throw new InvalidOperationException($"Cannot send a standalone audio chunk while a stream is already in progress.");
138138
}
139-
_isSendingAudio = true;
139+
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
140+
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
141+
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
142+
await SendCommandAsync(requestData, cancellationToken.ToRequestOptions()).ConfigureAwait(false);
140143
}
141-
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
142-
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
143-
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
144-
await SendCommandAsync(requestData, cancellationToken.ToRequestOptions()).ConfigureAwait(false);
145144
}
146145

147146
/// <summary>
@@ -154,18 +153,17 @@ public virtual async Task SendInputAudioAsync(BinaryData audio, CancellationToke
154153
public virtual void SendInputAudio(BinaryData audio, CancellationToken cancellationToken = default)
155154
{
156155
Argument.AssertNotNull(audio, nameof(audio));
157-
lock (_sendingAudioLock)
156+
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
158157
{
159-
if (_isSendingAudio)
158+
if (_isSendingAudioStream)
160159
{
161160
throw new InvalidOperationException($"Cannot send a standalone audio chunk while a stream is already in progress.");
162161
}
163-
_isSendingAudio = true;
162+
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
163+
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
164+
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
165+
SendCommand(requestData, cancellationToken.ToRequestOptions());
164166
}
165-
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
166-
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
167-
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
168-
SendCommand(requestData, cancellationToken.ToRequestOptions());
169167
}
170168

171169
public virtual async Task ClearInputAudioAsync(CancellationToken cancellationToken = default)
+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System;
2+
using System.Diagnostics.Contracts;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace OpenAI;
7+
8+
internal static class SemaphoreSlimExtensions
9+
{
10+
public static async Task<IDisposable> AutoReleaseWaitAsync(
11+
this SemaphoreSlim semaphore,
12+
CancellationToken cancellationToken = default)
13+
{
14+
Contract.Requires(semaphore != null);
15+
var wrapper = new ReleaseableSemaphoreSlimWrapper(semaphore);
16+
await semaphore.WaitAsync(cancellationToken);
17+
return wrapper;
18+
}
19+
20+
public static IDisposable AutoReleaseWait(
21+
this SemaphoreSlim semaphore,
22+
CancellationToken cancellationToken = default)
23+
{
24+
Contract.Requires(semaphore != null);
25+
var wrapper = new ReleaseableSemaphoreSlimWrapper(semaphore);
26+
semaphore.Wait(cancellationToken);
27+
return wrapper;
28+
}
29+
30+
private class ReleaseableSemaphoreSlimWrapper
31+
: IDisposable
32+
{
33+
private readonly SemaphoreSlim semaphore;
34+
private bool alreadyDisposed = false;
35+
36+
public ReleaseableSemaphoreSlimWrapper(SemaphoreSlim semaphore)
37+
=> this.semaphore = semaphore;
38+
39+
public void Dispose()
40+
{
41+
this.Dispose(true);
42+
GC.SuppressFinalize(this);
43+
}
44+
45+
protected void Dispose(bool disposeActuallyCalled)
46+
{
47+
if (!this.alreadyDisposed)
48+
{
49+
if (disposeActuallyCalled)
50+
{
51+
this.semaphore?.Release();
52+
}
53+
54+
this.alreadyDisposed = true;
55+
}
56+
}
57+
}
58+
}

tests/RealtimeConversation/ConversationTests.cs

+106-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
using NUnit.Framework;
22
using OpenAI.RealtimeConversation;
33
using System;
4+
using System.Buffers;
45
using System.ClientModel;
56
using System.ClientModel.Primitives;
67
using System.Collections.Generic;
78
using System.IO;
89
using System.Linq;
910
using System.Numerics;
1011
using System.Text;
12+
using System.Threading;
1113
using System.Threading.Tasks;
1214

1315
namespace OpenAI.Tests.Conversation;
@@ -239,7 +241,47 @@ await session.AddItemAsync(
239241
}
240242

241243
[Test]
242-
public async Task AudioWithToolsWorks()
244+
public async Task AudioStreamConvenienceBlocksCorrectly()
245+
{
246+
RealtimeConversationClient client = GetTestClient();
247+
using RealtimeConversationSession session = await client.StartConversationSessionAsync(CancellationToken);
248+
249+
string inputAudioFilePath = Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav");
250+
using TestDelayedFileReadStream delayedStream = new(inputAudioFilePath, TimeSpan.FromMilliseconds(200), readsBeforeDelay: 2);
251+
_ = session.SendInputAudioAsync(delayedStream, CancellationToken);
252+
253+
bool gotSpeechStarted = false;
254+
255+
await foreach (ConversationUpdate update in session.ReceiveUpdatesAsync(CancellationToken))
256+
{
257+
if (update is ConversationInputSpeechStartedUpdate)
258+
{
259+
gotSpeechStarted = true;
260+
Assert.ThrowsAsync<InvalidOperationException>(
261+
async () =>
262+
{
263+
using MemoryStream dummyStream = new();
264+
await session.SendInputAudioAsync(dummyStream, CancellationToken);
265+
},
266+
"Sending a Stream while another Stream is being sent should throw!");
267+
Assert.ThrowsAsync<InvalidOperationException>(
268+
async () =>
269+
{
270+
BinaryData dummyData = BinaryData.FromString("hello, world! this isn't audio.");
271+
await session.SendInputAudioAsync(dummyData, CancellationToken);
272+
},
273+
"Sending BinaryData while a Stream is being sent should throw!");
274+
break;
275+
}
276+
}
277+
278+
Assert.That(gotSpeechStarted, Is.True);
279+
}
280+
281+
[Test]
282+
[TestCase(TestAudioSendType.WithAudioStreamHelper)]
283+
[TestCase(TestAudioSendType.WithManualAudioChunks)]
284+
public async Task AudioWithToolsWorks(TestAudioSendType audioSendType)
243285
{
244286
RealtimeConversationClient client = GetTestClient();
245287
using RealtimeConversationSession session = await client.StartConversationSessionAsync(CancellationToken);
@@ -285,8 +327,27 @@ public async Task AudioWithToolsWorks()
285327

286328
await session.ConfigureSessionAsync(options, CancellationToken);
287329

288-
using Stream audioStream = File.OpenRead(Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav"));
289-
_ = session.SendInputAudioAsync(audioStream, CancellationToken);
330+
_ = Task.Run(async () =>
331+
{
332+
string inputAudioFilePath = Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav");
333+
if (audioSendType == TestAudioSendType.WithAudioStreamHelper)
334+
{
335+
using Stream audioStream = File.OpenRead(inputAudioFilePath);
336+
await session.SendInputAudioAsync(audioStream, CancellationToken);
337+
}
338+
else if (audioSendType == TestAudioSendType.WithManualAudioChunks)
339+
{
340+
byte[] allAudioBytes = await File.ReadAllBytesAsync(inputAudioFilePath, CancellationToken);
341+
const int audioSendBufferLength = 8 * 1024;
342+
byte[] audioSendBuffer = ArrayPool<byte>.Shared.Rent(audioSendBufferLength);
343+
for (int readPos = 0; readPos < allAudioBytes.Length; readPos += audioSendBufferLength)
344+
{
345+
int nextSegmentLength = Math.Min(audioSendBufferLength, allAudioBytes.Length - readPos);
346+
ArraySegment<byte> nextSegment = new(allAudioBytes, readPos, nextSegmentLength);
347+
await session.SendInputAudioAsync(BinaryData.FromBytes(nextSegment), CancellationToken);
348+
}
349+
}
350+
});
290351

291352
string userTranscript = null;
292353

@@ -465,4 +526,46 @@ public async Task CanAddItems()
465526

466527
Assert.That(itemCreatedCount, Is.EqualTo(items.Count + 1));
467528
}
529+
530+
public enum TestAudioSendType
531+
{
532+
WithAudioStreamHelper,
533+
WithManualAudioChunks
534+
}
535+
536+
private class TestDelayedFileReadStream : FileStream
537+
{
538+
private readonly TimeSpan _delayBetweenReads;
539+
private readonly int _readsBeforeDelay;
540+
private int _readsPerformed;
541+
542+
public TestDelayedFileReadStream(
543+
string path,
544+
TimeSpan delayBetweenReads,
545+
int readsBeforeDelay = 0)
546+
: base(path, FileMode.Open, FileAccess.Read)
547+
{
548+
_delayBetweenReads = delayBetweenReads;
549+
_readsBeforeDelay = readsBeforeDelay;
550+
_readsPerformed = 0;
551+
}
552+
553+
public override int Read(byte[] buffer, int offset, int count)
554+
{
555+
if (++_readsPerformed > _readsBeforeDelay)
556+
{
557+
System.Threading.Thread.Sleep((int)_delayBetweenReads.TotalMilliseconds);
558+
}
559+
return base.Read(buffer, offset, count);
560+
}
561+
562+
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
563+
{
564+
if (++_readsPerformed > _readsBeforeDelay)
565+
{
566+
await Task.Delay(_delayBetweenReads);
567+
}
568+
return await base.ReadAsync(buffer, offset, count, cancellationToken);
569+
}
570+
}
468571
}

0 commit comments

Comments
 (0)