-
Notifications
You must be signed in to change notification settings - Fork 782
Feature/bma/incr sync perf #6917
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
13f7a9f
9a0ea7b
a7666e2
94a8774
e3f5d15
c9e76f5
5c02290
1a79828
aa750cc
f668be5
a8eb7d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix long incremental sync. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,32 +16,39 @@ | |
|
||
package org.matrix.android.sdk.internal.session.sync.handler | ||
|
||
import com.zhuinden.monarchy.Monarchy | ||
import androidx.work.BackoffPolicy | ||
import androidx.work.ExistingWorkPolicy | ||
import org.matrix.android.sdk.api.MatrixPatterns | ||
import org.matrix.android.sdk.api.extensions.tryOrNull | ||
import org.matrix.android.sdk.api.session.user.model.User | ||
import org.matrix.android.sdk.internal.di.SessionDatabase | ||
import org.matrix.android.sdk.internal.session.profile.GetProfileInfoTask | ||
import org.matrix.android.sdk.internal.crypto.crosssigning.DefaultCrossSigningService | ||
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorker | ||
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorkerDataRepository | ||
import org.matrix.android.sdk.internal.di.SessionId | ||
import org.matrix.android.sdk.internal.di.WorkManagerProvider | ||
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore | ||
import org.matrix.android.sdk.internal.session.sync.SyncResponsePostTreatmentAggregator | ||
import org.matrix.android.sdk.internal.session.sync.model.accountdata.toMutable | ||
import org.matrix.android.sdk.internal.session.user.UserEntityFactory | ||
import org.matrix.android.sdk.internal.session.user.accountdata.DirectChatsHelper | ||
import org.matrix.android.sdk.internal.session.user.accountdata.UpdateUserAccountDataTask | ||
import org.matrix.android.sdk.internal.util.awaitTransaction | ||
import org.matrix.android.sdk.internal.util.logLimit | ||
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory | ||
import timber.log.Timber | ||
import java.util.concurrent.TimeUnit | ||
import javax.inject.Inject | ||
|
||
internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor( | ||
private val directChatsHelper: DirectChatsHelper, | ||
private val ephemeralTemporaryStore: RoomSyncEphemeralTemporaryStore, | ||
private val updateUserAccountDataTask: UpdateUserAccountDataTask, | ||
private val getProfileInfoTask: GetProfileInfoTask, | ||
@SessionDatabase private val monarchy: Monarchy, | ||
private val crossSigningService: DefaultCrossSigningService, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we inject the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The arch is not really clean, we have exception for the crypto code. |
||
private val updateTrustWorkerDataRepository: UpdateTrustWorkerDataRepository, | ||
private val workManagerProvider: WorkManagerProvider, | ||
@SessionId private val sessionId: String, | ||
) { | ||
suspend fun handle(aggregator: SyncResponsePostTreatmentAggregator) { | ||
cleanupEphemeralFiles(aggregator.ephemeralFilesToDelete) | ||
updateDirectUserIds(aggregator.directChatsToCheck) | ||
fetchAndUpdateUsers(aggregator.userIdsToFetch) | ||
handleUserIdsForCheckingTrustAndAffectedRoomShields(aggregator.userIdsForCheckingTrustAndAffectedRoomShields) | ||
} | ||
|
||
private fun cleanupEphemeralFiles(ephemeralFilesToDelete: List<String>) { | ||
|
@@ -79,23 +86,26 @@ internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor( | |
} | ||
} | ||
|
||
private suspend fun fetchAndUpdateUsers(userIdsToFetch: List<String>) { | ||
fetchUsers(userIdsToFetch) | ||
.takeIf { it.isNotEmpty() } | ||
?.saveLocally() | ||
} | ||
private fun fetchAndUpdateUsers(userIdsToFetch: Collection<String>) { | ||
if (userIdsToFetch.isEmpty()) return | ||
Timber.d("## Configure Worker to update users: ${userIdsToFetch.logLimit()}") | ||
val workerParams = UpdateTrustWorker.Params( | ||
sessionId = sessionId, | ||
filename = updateTrustWorkerDataRepository.createParam(userIdsToFetch.toList()) | ||
) | ||
val workerData = WorkerParamsFactory.toData(workerParams) | ||
|
||
private suspend fun fetchUsers(userIdsToFetch: List<String>) = userIdsToFetch.mapNotNull { | ||
tryOrNull { | ||
val profileJson = getProfileInfoTask.execute(GetProfileInfoTask.Params(it)) | ||
User.fromJson(it, profileJson) | ||
} | ||
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<UpdateUserWorker>() | ||
.setInputData(workerData) | ||
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS) | ||
.build() | ||
|
||
workManagerProvider.workManager | ||
.beginUniqueWork("USER_UPDATE_QUEUE", ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest) | ||
.enqueue() | ||
} | ||
|
||
private suspend fun List<User>.saveLocally() { | ||
val userEntities = map { user -> UserEntityFactory.create(user) } | ||
monarchy.awaitTransaction { | ||
it.insertOrUpdate(userEntities) | ||
} | ||
private fun handleUserIdsForCheckingTrustAndAffectedRoomShields(userIdsWithDeviceUpdate: Iterable<String>) { | ||
crossSigningService.checkTrustAndAffectedRoomShields(userIdsWithDeviceUpdate.toList()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* Copyright 2022 The Matrix.org Foundation C.I.C. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.matrix.android.sdk.internal.session.sync.handler | ||
|
||
import android.content.Context | ||
import androidx.work.WorkerParameters | ||
import com.zhuinden.monarchy.Monarchy | ||
import org.matrix.android.sdk.api.extensions.tryOrNull | ||
import org.matrix.android.sdk.api.session.user.model.User | ||
import org.matrix.android.sdk.internal.SessionManager | ||
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorker | ||
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorkerDataRepository | ||
import org.matrix.android.sdk.internal.di.SessionDatabase | ||
import org.matrix.android.sdk.internal.session.SessionComponent | ||
import org.matrix.android.sdk.internal.session.profile.GetProfileInfoTask | ||
import org.matrix.android.sdk.internal.session.user.UserEntityFactory | ||
import org.matrix.android.sdk.internal.util.awaitTransaction | ||
import org.matrix.android.sdk.internal.util.logLimit | ||
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker | ||
import timber.log.Timber | ||
import javax.inject.Inject | ||
|
||
/** | ||
* Note: We reuse the same type [UpdateTrustWorker.Params], since the input data are the same. | ||
*/ | ||
internal class UpdateUserWorker(context: Context, params: WorkerParameters, sessionManager: SessionManager) : | ||
SessionSafeCoroutineWorker<UpdateTrustWorker.Params>(context, params, sessionManager, UpdateTrustWorker.Params::class.java) { | ||
|
||
@SessionDatabase | ||
@Inject lateinit var monarchy: Monarchy | ||
@Inject lateinit var updateTrustWorkerDataRepository: UpdateTrustWorkerDataRepository | ||
@Inject lateinit var getProfileInfoTask: GetProfileInfoTask | ||
|
||
override fun injectWith(injector: SessionComponent) { | ||
injector.inject(this) | ||
} | ||
|
||
override suspend fun doSafeWork(params: UpdateTrustWorker.Params): Result { | ||
val userList = params.filename | ||
?.let { updateTrustWorkerDataRepository.getParam(it) } | ||
?.userIds | ||
?: params.updatedUserIds.orEmpty() | ||
|
||
// List should not be empty, but let's avoid go further in case of empty list | ||
if (userList.isNotEmpty()) { | ||
Timber.v("## UpdateUserWorker - updating users: ${userList.logLimit()}") | ||
fetchAndUpdateUsers(userList) | ||
} | ||
|
||
cleanup(params) | ||
return Result.success() | ||
} | ||
|
||
private suspend fun fetchAndUpdateUsers(userIdsToFetch: Collection<String>) { | ||
fetchUsers(userIdsToFetch) | ||
.takeIf { it.isNotEmpty() } | ||
?.saveLocally() | ||
} | ||
|
||
private suspend fun fetchUsers(userIdsToFetch: Collection<String>) = userIdsToFetch.mapNotNull { | ||
tryOrNull { | ||
val profileJson = getProfileInfoTask.execute(GetProfileInfoTask.Params(it)) | ||
User.fromJson(it, profileJson) | ||
} | ||
} | ||
|
||
private suspend fun List<User>.saveLocally() { | ||
val userEntities = map { user -> UserEntityFactory.create(user) } | ||
Timber.d("## saveLocally()") | ||
monarchy.awaitTransaction { | ||
Timber.d("## saveLocally() - in transaction") | ||
it.insertOrUpdate(userEntities) | ||
} | ||
Timber.d("## saveLocally() - END") | ||
} | ||
|
||
private fun cleanup(params: UpdateTrustWorker.Params) { | ||
params.filename | ||
?.let { updateTrustWorkerDataRepository.delete(it) } | ||
} | ||
|
||
override fun buildErrorParams(params: UpdateTrustWorker.Params, message: String): UpdateTrustWorker.Params { | ||
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I struggle to understand why using the
aggregator
changes something here. Seeing the implementation ofonUsersDeviceUpdate
, I see we only schedule a worker we should not take so much time, right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the current code,
crossSigningService.onUsersDeviceUpdate(otherRoomMembers)
is called once per room.With the new code this is called once per sync response (which can contains many rooms)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay thanks for explaining.