Skip to content

Commit 36fc2fb

Browse files
committed
Merge remote-tracking branch 'origin/bugfix/eric/fix-upgrade-room' into bugfix/eric/upgrade-room-deduplication
2 parents f6b0e8d + d586f64 commit 36fc2fb

File tree

3 files changed

+93
-22
lines changed

3 files changed

+93
-22
lines changed

vector/src/main/java/im/vector/app/core/utils/DataSource.kt

+4-12
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ interface DataSource<T> {
2626

2727
interface MutableDataSource<T> : DataSource<T> {
2828

29-
suspend fun set(value: T)
30-
3129
fun post(value: T)
3230
}
3331

@@ -45,30 +43,24 @@ open class BehaviorDataSource<T>(private val defaultValue: T? = null) : MutableD
4543
return mutableFlow
4644
}
4745

48-
override suspend fun set(value: T) {
49-
mutableFlow.emit(value)
50-
}
51-
5246
override fun post(value: T) {
5347
mutableFlow.tryEmit(value)
5448
}
5549
}
5650

5751
/**
5852
* This datasource only emits all subsequent observed values to each subscriber.
53+
*
54+
* bufferSize - number of buffered items before it starts dropping oldest. Should be at least 1
5955
*/
60-
open class PublishDataSource<T> : MutableDataSource<T> {
56+
open class PublishDataSource<T>(bufferSize: Int = 10) : MutableDataSource<T> {
6157

62-
private val mutableFlow = MutableSharedFlow<T>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
58+
private val mutableFlow = MutableSharedFlow<T>(replay = 0, extraBufferCapacity = bufferSize, onBufferOverflow = BufferOverflow.DROP_OLDEST)
6359

6460
override fun stream(): Flow<T> {
6561
return mutableFlow
6662
}
6763

68-
override suspend fun set(value: T) {
69-
mutableFlow.emit(value)
70-
}
71-
7264
override fun post(value: T) {
7365
mutableFlow.tryEmit(value)
7466
}

vector/src/main/java/im/vector/app/features/home/room/detail/composer/MessageComposerViewModel.kt

+8-10
Original file line numberDiff line numberDiff line change
@@ -472,16 +472,14 @@ class MessageComposerViewModel @AssistedInject constructor(
472472
Unit
473473
}
474474
is ParsedCommand.UpgradeRoom -> {
475-
viewModelScope.launch {
476-
_viewEvents.set(
477-
MessageComposerViewEvents.ShowRoomUpgradeDialog(
478-
parsedCommand.newVersion,
479-
room.roomSummary()?.isPublic ?: false
480-
)
481-
)
482-
_viewEvents.set(MessageComposerViewEvents.SlashCommandResultOk(parsedCommand))
483-
popDraft()
484-
}
475+
_viewEvents.post(
476+
MessageComposerViewEvents.ShowRoomUpgradeDialog(
477+
parsedCommand.newVersion,
478+
room.roomSummary()?.isPublic ?: false
479+
)
480+
)
481+
_viewEvents.post(MessageComposerViewEvents.SlashCommandResultOk(parsedCommand))
482+
popDraft()
485483
}
486484
}
487485
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) 2022 New Vector Ltd
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package im.vector.app.core.utils
18+
19+
import im.vector.app.test.test
20+
import kotlinx.coroutines.CoroutineScope
21+
import kotlinx.coroutines.Dispatchers
22+
import kotlinx.coroutines.delay
23+
import kotlinx.coroutines.flow.launchIn
24+
import kotlinx.coroutines.flow.onEach
25+
import kotlinx.coroutines.test.TestCoroutineScheduler
26+
import kotlinx.coroutines.test.UnconfinedTestDispatcher
27+
import kotlinx.coroutines.test.runTest
28+
import kotlinx.coroutines.withContext
29+
import org.amshove.kluent.shouldContainSame
30+
import org.junit.Test
31+
32+
class DataSourceTest {
33+
34+
@Test
35+
fun `given PublishDataSource, when posting values before observing, then no value is observed`() = runTest {
36+
val publishDataSource = PublishDataSource<Int>()
37+
publishDataSource.post(0)
38+
publishDataSource.post(1)
39+
40+
publishDataSource.stream()
41+
.test(this)
42+
.assertNoValues()
43+
.finish()
44+
}
45+
46+
@Test
47+
fun `given PublishDataSource with a large enough buffer size, when posting values after observing, then only the latest values are observed`() = runTest {
48+
val valuesToPost = listOf(2, 3, 4, 5, 6, 7, 8, 9)
49+
val publishDataSource = PublishDataSource<Int>(bufferSize = valuesToPost.size)
50+
publishDataSource.test(testScheduler, valuesToPost, valuesToPost)
51+
}
52+
53+
@Test
54+
fun `given PublishDataSource with a too small buffer size, when posting values after observing, then we are missing some values`() = runTest {
55+
val valuesToPost = listOf(2, 3, 4, 5, 6, 7, 8, 9)
56+
val expectedValues = listOf(2, 9)
57+
val publishDataSource = PublishDataSource<Int>(bufferSize = 1)
58+
publishDataSource.test(testScheduler, valuesToPost, expectedValues)
59+
}
60+
61+
private suspend fun PublishDataSource<Int>.test(testScheduler: TestCoroutineScheduler, valuesToPost: List<Int>, expectedValues: List<Int>) {
62+
val values = ArrayList<Int>()
63+
val job = stream()
64+
.onEach {
65+
// Artificial delay to make consumption longer than production
66+
delay(10)
67+
values.add(it)
68+
}
69+
.launchIn(CoroutineScope(UnconfinedTestDispatcher(testScheduler)))
70+
71+
valuesToPost.forEach {
72+
post(it)
73+
}
74+
withContext(Dispatchers.Default) {
75+
delay(11L * valuesToPost.size)
76+
}
77+
job.cancel()
78+
79+
values shouldContainSame expectedValues
80+
}
81+
}

0 commit comments

Comments
 (0)