Improve concurrency handling by switching some classes to `actor` and remove usage of `AtomicArray` and `AtomicDictionary` in those types

pull/7068/head
Hwee-Boon Yar 1 year ago
parent 2e0546bf3e
commit 994bd9204a
  1. 70
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/EtherscanSingleChainTransactionProvider.swift
  2. 4
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/SingleChainTransactionProvider.swift
  3. 7
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/TransactionsService.swift
  4. 2
      modules/AlphaWalletNotifications/AlphaWalletNotifications/Notifications/NotificationHandler.swift

@ -5,12 +5,12 @@ import BigInt
import Combine
import AlphaWalletCore
class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
actor EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
private let transactionDataStore: TransactionDataStore
private let session: WalletSession
private let ercTokenDetector: ErcTokenDetector
private let schedulerProviders: AtomicArray<SchedulerProviderData>
private lazy var pendingTransactionProvider: PendingTransactionProvider = {
private var schedulerProviders: [SchedulerProviderData]
private nonisolated lazy var pendingTransactionProvider: PendingTransactionProvider = {
return PendingTransactionProvider(
session: session,
transactionDataStore: transactionDataStore,
@ -20,10 +20,15 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
private let oldestTransferTransactionsScheduler: Scheduler
private let queue = DispatchQueue(label: "com.transactionProvider.updateQueue")
public var completeTransaction: AnyPublisher<Result<Transaction, PendingTransactionProvider.PendingTransactionProviderError>, Never> {
public nonisolated var completeTransaction: AnyPublisher<Result<Transaction, PendingTransactionProvider.PendingTransactionProviderError>, Never> {
pendingTransactionProvider.completeTransaction
}
public private (set) var state: TransactionProviderState = .pending
private var _state: TransactionProviderState = .pending
public var state: TransactionProviderState {
get async {
return _state
}
}
init(session: WalletSession,
analytics: AnalyticsLogger,
@ -78,12 +83,20 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
schedulerProviders.forEach { data in
data.schedulerProvider
.publisher
.sink { [weak self] in self?.handle(response: $0, provider: data.schedulerProvider) }
.sink { transactions in
Task { [weak self] in
await self?.handle(response: transactions, provider: data.schedulerProvider)
}
}
.store(in: &cancellable)
}
oldestTransactionsProvider.publisher
.sink { [weak self] in self?.handle(response: $0, provider: oldestTransactionsProvider) }
.sink { transactions in
Task { [weak self] in
await self?.handle(response: transactions, provider: oldestTransactionsProvider)
}
}
.store(in: &cancellable)
/*
@ -100,7 +113,11 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
pendingTransactionProvider.completeTransaction
.compactMap { try? $0.get() }
.sink { [weak self] in self?.forceFetchLatestTransactions(transaction: $0) }
.sink { transaction in
Task { [weak self] in
await self?.forceFetchLatestTransactions(transaction: transaction)
}
}
.store(in: &cancellable)
}
@ -113,39 +130,36 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
}
func resume() async {
guard state == .stopped else { return }
guard _state == .stopped else { return }
await pendingTransactionProvider.resumeScheduler()
schedulerProviders.forEach { $0.restart() }
oldestTransferTransactionsScheduler.restart()
state = .running
_state = .running
}
func pause() async {
guard state == .running || state == .pending else { return }
guard _state == .running || _state == .pending else { return }
await pendingTransactionProvider.cancelScheduler()
schedulerProviders.forEach { $0.cancel() }
oldestTransferTransactionsScheduler.cancel()
state = .stopped
_state = .stopped
}
func start() async {
guard state == .pending else { return }
guard _state == .pending else { return }
await pendingTransactionProvider.start()
schedulerProviders.forEach { $0.start() }
oldestTransferTransactionsScheduler.start()
queue.async { [weak self] in self?.removeUnknownTransactions() }
state = .running
_state = .running
}
public func isServer(_ server: RPCServer) -> Bool {
public nonisolated func isServer(_ server: RPCServer) -> Bool {
return session.server == server
}
//TODO: this method doesn't work right for now
public func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) -> AnyPublisher<[Transaction], PromiseError> {
func fetchLatestTransactions(transactions: [Transaction] = [],
schedulerProvider: SchedulerProvider & LatestTransactionProvidable) -> AnyPublisher<[Transaction], PromiseError> {
public func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) async -> AnyPublisher<[Transaction], PromiseError> {
func fetchLatestTransactions(transactions: [Transaction] = [], schedulerProvider: SchedulerProvider & LatestTransactionProvidable) -> AnyPublisher<[Transaction], PromiseError> {
var transactions = transactions
return schedulerProvider.fetchPublisher()
.flatMap { response -> AnyPublisher<[Transaction], PromiseError> in
@ -162,7 +176,7 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
}.eraseToAnyPublisher()
}
let publishers = fetchTypes.compactMap { getSchedulerProvider(fetchType: $0) }
let publishers = await fetchTypes.asyncCompactMap { getSchedulerProvider(fetchType: $0) }
.map {
fetchLatestTransactions(schedulerProvider: $0.schedulerProvider)
.replaceError(with: [])
@ -178,7 +192,7 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
.eraseToAnyPublisher()
}
private func removeUnknownTransactions() {
private nonisolated func removeUnknownTransactions() {
//TODO: why do we remove such transactions? especially `.failed` and `.unknown`?
transactionDataStore.removeTransactions(for: [.unknown], servers: [session.server])
}
@ -196,7 +210,7 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
}
}
private func addOrUpdate(transactions: [Transaction]) {
private nonisolated func addOrUpdate(transactions: [Transaction]) {
guard !transactions.isEmpty else { return }
Task { @MainActor in
@ -226,20 +240,20 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
if let operation = transaction.operation {
switch operation.operationType {
case .erc1155TokenTransfer:
guard let service = self.getSchedulerProvider(fetchType: .erc1155) else { return }
guard let service = getSchedulerProvider(fetchType: .erc1155) else { return }
service.restart(force: true)
case .erc20TokenTransfer:
guard let service = self.getSchedulerProvider(fetchType: .erc20) else { return }
guard let service = getSchedulerProvider(fetchType: .erc20) else { return }
service.restart(force: true)
case .erc721TokenTransfer:
guard let service = self.getSchedulerProvider(fetchType: .erc721) else { return }
guard let service = getSchedulerProvider(fetchType: .erc721) else { return }
service.restart(force: true)
default:
guard let service = self.getSchedulerProvider(fetchType: .normal) else { return }
guard let service = getSchedulerProvider(fetchType: .normal) else { return }
service.restart(force: true)
}
} else {
guard let service = self.getSchedulerProvider(fetchType: .normal) else { return }
guard let service = getSchedulerProvider(fetchType: .normal) else { return }
service.restart(force: true)
}
}

@ -26,12 +26,12 @@ public enum TransactionFetchType: String, CaseIterable {
}
public protocol SingleChainTransactionProvider: AnyObject {
var state: TransactionProviderState { get }
var state: TransactionProviderState { get async }
var completeTransaction: AnyPublisher<Result<Transaction, PendingTransactionProvider.PendingTransactionProviderError>, Never> { get }
func start() async
func resume() async
func pause() async
func isServer(_ server: RPCServer) -> Bool
func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) -> AnyPublisher<[Transaction], PromiseError>
func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) async -> AnyPublisher<[Transaction], PromiseError>
}

@ -141,16 +141,15 @@ public class TransactionsService {
}
// when we receive a push notification in background we want to fetch latest transactions,
public func fetchLatestTransactions(server: RPCServer) -> AnyPublisher<[Transaction], PromiseError> {
public func fetchLatestTransactions(server: RPCServer) async -> AnyPublisher<[Transaction], PromiseError> {
guard let provider = providers[server] else { return .empty() }
return provider.fetchLatestTransactions(fetchTypes: TransactionFetchType.allCases)
return await provider.fetchLatestTransactions(fetchTypes: TransactionFetchType.allCases)
}
public func forceResumeOrStart(server: RPCServer) async {
guard let provider = providers[server] else { return }
switch provider.state {
switch await provider.state {
case .pending:
await provider.start()
case .running:

@ -102,7 +102,7 @@ public class AlphaWalletNotificationHandler: NotificationHandler {
guard keystore.currentWallet?.address == walletData.wallet else { return .noData }
if let dep = walletsDependencies.walletDependencies(walletAddress: walletData.wallet) {
dep.transactionsService.forceResumeOrStart(server: walletData.rpcServer)
await dep.transactionsService.forceResumeOrStart(server: walletData.rpcServer)
return .newData
}

Loading…
Cancel
Save