Skip to content

Commit 3835755

Browse files
committed
Merge remote-tracking branch 'origin/bugfix/eric/fix-upgrade-room' into feature/bca/fix_slow_space_switch_regression
2 parents 89e528d + d586f64 commit 3835755

File tree

3 files changed

+87
-2
lines changed

3 files changed

+87
-2
lines changed

changelog.d/6154.bugfix

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed /upgraderoom command not doing anything

vector/src/main/java/im/vector/app/core/utils/DataSource.kt

+5-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ interface DataSource<T> {
2525
}
2626

2727
interface MutableDataSource<T> : DataSource<T> {
28+
2829
fun post(value: T)
2930
}
3031

@@ -49,10 +50,12 @@ open class BehaviorDataSource<T>(private val defaultValue: T? = null) : MutableD
4950

5051
/**
5152
* This datasource only emits all subsequent observed values to each subscriber.
53+
*
54+
* bufferSize - number of buffered items before it starts dropping oldest. Should be at least 1
5255
*/
53-
open class PublishDataSource<T> : MutableDataSource<T> {
56+
open class PublishDataSource<T>(bufferSize: Int = 10) : MutableDataSource<T> {
5457

55-
private val mutableFlow = MutableSharedFlow<T>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
58+
private val mutableFlow = MutableSharedFlow<T>(replay = 0, extraBufferCapacity = bufferSize, onBufferOverflow = BufferOverflow.DROP_OLDEST)
5659

5760
override fun stream(): Flow<T> {
5861
return mutableFlow
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) 2022 New Vector Ltd
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package im.vector.app.core.utils
18+
19+
import im.vector.app.test.test
20+
import kotlinx.coroutines.CoroutineScope
21+
import kotlinx.coroutines.Dispatchers
22+
import kotlinx.coroutines.delay
23+
import kotlinx.coroutines.flow.launchIn
24+
import kotlinx.coroutines.flow.onEach
25+
import kotlinx.coroutines.test.TestCoroutineScheduler
26+
import kotlinx.coroutines.test.UnconfinedTestDispatcher
27+
import kotlinx.coroutines.test.runTest
28+
import kotlinx.coroutines.withContext
29+
import org.amshove.kluent.shouldContainSame
30+
import org.junit.Test
31+
32+
class DataSourceTest {
33+
34+
@Test
35+
fun `given PublishDataSource, when posting values before observing, then no value is observed`() = runTest {
36+
val publishDataSource = PublishDataSource<Int>()
37+
publishDataSource.post(0)
38+
publishDataSource.post(1)
39+
40+
publishDataSource.stream()
41+
.test(this)
42+
.assertNoValues()
43+
.finish()
44+
}
45+
46+
@Test
47+
fun `given PublishDataSource with a large enough buffer size, when posting values after observing, then only the latest values are observed`() = runTest {
48+
val valuesToPost = listOf(2, 3, 4, 5, 6, 7, 8, 9)
49+
val publishDataSource = PublishDataSource<Int>(bufferSize = valuesToPost.size)
50+
publishDataSource.test(testScheduler, valuesToPost, valuesToPost)
51+
}
52+
53+
@Test
54+
fun `given PublishDataSource with a too small buffer size, when posting values after observing, then we are missing some values`() = runTest {
55+
val valuesToPost = listOf(2, 3, 4, 5, 6, 7, 8, 9)
56+
val expectedValues = listOf(2, 9)
57+
val publishDataSource = PublishDataSource<Int>(bufferSize = 1)
58+
publishDataSource.test(testScheduler, valuesToPost, expectedValues)
59+
}
60+
61+
private suspend fun PublishDataSource<Int>.test(testScheduler: TestCoroutineScheduler, valuesToPost: List<Int>, expectedValues: List<Int>) {
62+
val values = ArrayList<Int>()
63+
val job = stream()
64+
.onEach {
65+
// Artificial delay to make consumption longer than production
66+
delay(10)
67+
values.add(it)
68+
}
69+
.launchIn(CoroutineScope(UnconfinedTestDispatcher(testScheduler)))
70+
71+
valuesToPost.forEach {
72+
post(it)
73+
}
74+
withContext(Dispatchers.Default) {
75+
delay(11L * valuesToPost.size)
76+
}
77+
job.cancel()
78+
79+
values shouldContainSame expectedValues
80+
}
81+
}

0 commit comments

Comments
 (0)