From 994bd9204a22f3e89bb79bdd12b204c59d53e4ee Mon Sep 17 00:00:00 2001 From: Hwee-Boon Yar Date: Sun, 22 Oct 2023 18:29:45 +0800 Subject: [PATCH] Improve concurrency handling by switching some classes to `actor` and remove usage of `AtomicArray` and `AtomicDictionary` in those types --- ...erscanSingleChainTransactionProvider.swift | 70 +++++++++++-------- .../SingleChainTransactionProvider.swift | 4 +- .../Transactions/TransactionsService.swift | 7 +- .../Notifications/NotificationHandler.swift | 2 +- 4 files changed, 48 insertions(+), 35 deletions(-) diff --git a/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/EtherscanSingleChainTransactionProvider.swift b/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/EtherscanSingleChainTransactionProvider.swift index 553f45583..d09e6ecb2 100644 --- a/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/EtherscanSingleChainTransactionProvider.swift +++ b/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/EtherscanSingleChainTransactionProvider.swift @@ -5,12 +5,12 @@ import BigInt import Combine import AlphaWalletCore -class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { +actor EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { private let transactionDataStore: TransactionDataStore private let session: WalletSession private let ercTokenDetector: ErcTokenDetector - private let schedulerProviders: AtomicArray - private lazy var pendingTransactionProvider: PendingTransactionProvider = { + private var schedulerProviders: [SchedulerProviderData] + private nonisolated lazy var pendingTransactionProvider: PendingTransactionProvider = { return PendingTransactionProvider( session: session, transactionDataStore: transactionDataStore, @@ -20,10 +20,15 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { private let oldestTransferTransactionsScheduler: Scheduler private let queue = DispatchQueue(label: "com.transactionProvider.updateQueue") - public var completeTransaction: AnyPublisher, Never> { + public nonisolated var completeTransaction: AnyPublisher, Never> { pendingTransactionProvider.completeTransaction } - public private (set) var state: TransactionProviderState = .pending + private var _state: TransactionProviderState = .pending + public var state: TransactionProviderState { + get async { + return _state + } + } init(session: WalletSession, analytics: AnalyticsLogger, @@ -78,12 +83,20 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { schedulerProviders.forEach { data in data.schedulerProvider .publisher - .sink { [weak self] in self?.handle(response: $0, provider: data.schedulerProvider) } + .sink { transactions in + Task { [weak self] in + await self?.handle(response: transactions, provider: data.schedulerProvider) + } + } .store(in: &cancellable) } oldestTransactionsProvider.publisher - .sink { [weak self] in self?.handle(response: $0, provider: oldestTransactionsProvider) } + .sink { transactions in + Task { [weak self] in + await self?.handle(response: transactions, provider: oldestTransactionsProvider) + } + } .store(in: &cancellable) /* @@ -100,7 +113,11 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { pendingTransactionProvider.completeTransaction .compactMap { try? $0.get() } - .sink { [weak self] in self?.forceFetchLatestTransactions(transaction: $0) } + .sink { transaction in + Task { [weak self] in + await self?.forceFetchLatestTransactions(transaction: transaction) + } + } .store(in: &cancellable) } @@ -113,39 +130,36 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { } func resume() async { - guard state == .stopped else { return } + guard _state == .stopped else { return } await pendingTransactionProvider.resumeScheduler() schedulerProviders.forEach { $0.restart() } oldestTransferTransactionsScheduler.restart() - state = .running + _state = .running } func pause() async { - guard state == .running || state == .pending else { return } + guard _state == .running || _state == .pending else { return } await pendingTransactionProvider.cancelScheduler() schedulerProviders.forEach { $0.cancel() } oldestTransferTransactionsScheduler.cancel() - state = .stopped + _state = .stopped } func start() async { - guard state == .pending else { return } + guard _state == .pending else { return } await pendingTransactionProvider.start() schedulerProviders.forEach { $0.start() } oldestTransferTransactionsScheduler.start() queue.async { [weak self] in self?.removeUnknownTransactions() } - state = .running + _state = .running } - public func isServer(_ server: RPCServer) -> Bool { + public nonisolated func isServer(_ server: RPCServer) -> Bool { return session.server == server } //TODO: this method doesn't work right for now - public func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) -> AnyPublisher<[Transaction], PromiseError> { - - func fetchLatestTransactions(transactions: [Transaction] = [], - schedulerProvider: SchedulerProvider & LatestTransactionProvidable) -> AnyPublisher<[Transaction], PromiseError> { - + public func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) async -> AnyPublisher<[Transaction], PromiseError> { + func fetchLatestTransactions(transactions: [Transaction] = [], schedulerProvider: SchedulerProvider & LatestTransactionProvidable) -> AnyPublisher<[Transaction], PromiseError> { var transactions = transactions return schedulerProvider.fetchPublisher() .flatMap { response -> AnyPublisher<[Transaction], PromiseError> in @@ -162,7 +176,7 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { }.eraseToAnyPublisher() } - let publishers = fetchTypes.compactMap { getSchedulerProvider(fetchType: $0) } + let publishers = await fetchTypes.asyncCompactMap { getSchedulerProvider(fetchType: $0) } .map { fetchLatestTransactions(schedulerProvider: $0.schedulerProvider) .replaceError(with: []) @@ -178,7 +192,7 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { .eraseToAnyPublisher() } - private func removeUnknownTransactions() { + private nonisolated func removeUnknownTransactions() { //TODO: why do we remove such transactions? especially `.failed` and `.unknown`? transactionDataStore.removeTransactions(for: [.unknown], servers: [session.server]) } @@ -196,7 +210,7 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { } } - private func addOrUpdate(transactions: [Transaction]) { + private nonisolated func addOrUpdate(transactions: [Transaction]) { guard !transactions.isEmpty else { return } Task { @MainActor in @@ -226,20 +240,20 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider { if let operation = transaction.operation { switch operation.operationType { case .erc1155TokenTransfer: - guard let service = self.getSchedulerProvider(fetchType: .erc1155) else { return } + guard let service = getSchedulerProvider(fetchType: .erc1155) else { return } service.restart(force: true) case .erc20TokenTransfer: - guard let service = self.getSchedulerProvider(fetchType: .erc20) else { return } + guard let service = getSchedulerProvider(fetchType: .erc20) else { return } service.restart(force: true) case .erc721TokenTransfer: - guard let service = self.getSchedulerProvider(fetchType: .erc721) else { return } + guard let service = getSchedulerProvider(fetchType: .erc721) else { return } service.restart(force: true) default: - guard let service = self.getSchedulerProvider(fetchType: .normal) else { return } + guard let service = getSchedulerProvider(fetchType: .normal) else { return } service.restart(force: true) } } else { - guard let service = self.getSchedulerProvider(fetchType: .normal) else { return } + guard let service = getSchedulerProvider(fetchType: .normal) else { return } service.restart(force: true) } } diff --git a/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/SingleChainTransactionProvider.swift b/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/SingleChainTransactionProvider.swift index 2eef5129b..8d338ad5d 100644 --- a/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/SingleChainTransactionProvider.swift +++ b/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/SingleChainTransactionProvider.swift @@ -26,12 +26,12 @@ public enum TransactionFetchType: String, CaseIterable { } public protocol SingleChainTransactionProvider: AnyObject { - var state: TransactionProviderState { get } + var state: TransactionProviderState { get async } var completeTransaction: AnyPublisher, Never> { get } func start() async func resume() async func pause() async func isServer(_ server: RPCServer) -> Bool - func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) -> AnyPublisher<[Transaction], PromiseError> + func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) async -> AnyPublisher<[Transaction], PromiseError> } diff --git a/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/TransactionsService.swift b/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/TransactionsService.swift index 2a7f15b99..9d02c0a2f 100644 --- a/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/TransactionsService.swift +++ b/modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/TransactionsService.swift @@ -141,16 +141,15 @@ public class TransactionsService { } // when we receive a push notification in background we want to fetch latest transactions, - public func fetchLatestTransactions(server: RPCServer) -> AnyPublisher<[Transaction], PromiseError> { + public func fetchLatestTransactions(server: RPCServer) async -> AnyPublisher<[Transaction], PromiseError> { guard let provider = providers[server] else { return .empty() } - return provider.fetchLatestTransactions(fetchTypes: TransactionFetchType.allCases) + return await provider.fetchLatestTransactions(fetchTypes: TransactionFetchType.allCases) } public func forceResumeOrStart(server: RPCServer) async { guard let provider = providers[server] else { return } - - switch provider.state { + switch await provider.state { case .pending: await provider.start() case .running: diff --git a/modules/AlphaWalletNotifications/AlphaWalletNotifications/Notifications/NotificationHandler.swift b/modules/AlphaWalletNotifications/AlphaWalletNotifications/Notifications/NotificationHandler.swift index 6c4017299..065fb65f7 100644 --- a/modules/AlphaWalletNotifications/AlphaWalletNotifications/Notifications/NotificationHandler.swift +++ b/modules/AlphaWalletNotifications/AlphaWalletNotifications/Notifications/NotificationHandler.swift @@ -102,7 +102,7 @@ public class AlphaWalletNotificationHandler: NotificationHandler { guard keystore.currentWallet?.address == walletData.wallet else { return .noData } if let dep = walletsDependencies.walletDependencies(walletAddress: walletData.wallet) { - dep.transactionsService.forceResumeOrStart(server: walletData.rpcServer) + await dep.transactionsService.forceResumeOrStart(server: walletData.rpcServer) return .newData }