Improving concurrency handling by switching some classes to `actor` and remove usage of `AtomicArray` and `AtomicDictionary` in those types

pull/7067/head
Hwee-Boon Yar 1 year ago
parent 04e5dde571
commit 5be750ea7d
  1. 8
      AlphaWallet/ActiveWalletCoordinator.swift
  2. 6
      AlphaWallet/Activities/ViewControllers/ActivityViewController.swift
  3. 4
      AlphaWallet/Tokens/Collectibles/ViewModels/NFTCollectionViewModel.swift
  4. 4
      AlphaWallet/Tokens/Coordinators/FungibleTokenCoordinator.swift
  5. 25
      AlphaWallet/Tokens/Coordinators/SingleChainTokenCoordinator.swift
  6. 32
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Activities/ActivitiesGenerator.swift
  7. 17
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Activities/ActivitiesPipeLine.swift
  8. 96
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Activities/ActivitiesService.swift
  9. 34
      modules/AlphaWalletFoundation/AlphaWalletFoundation/CoinTicker/CoinTickers.swift
  10. 6
      modules/AlphaWalletFoundation/AlphaWalletFoundation/CoinTicker/Types/CoinTickersFetcher.swift
  11. 14
      modules/AlphaWalletFoundation/AlphaWalletFoundation/NFT/Enjin/EnjinUserManagementInterceptor.swift
  12. 4
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Tokens/TokensProcessingPipeline.swift
  13. 28
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/EtherscanSingleChainTransactionProvider.swift
  14. 41
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/PendingTransaction/PendingTransactionProvider.swift
  15. 6
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/SingleChainTransactionProvider.swift
  16. 22
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/TransactionProvider/TransactionProvider.swift
  17. 51
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Transactions/TransactionsService.swift
  18. 10
      modules/AlphaWalletTokenScript/AlphaWalletTokenScript/Views/TokenScriptWebView.swift

@ -805,8 +805,8 @@ extension ActiveWalletCoordinator: ActivityViewControllerDelegate {
.publisher(queue: .main)
}
func reinject(viewController: ActivityViewController) {
activitiesPipeLine.reinject(activity: viewController.viewModel.activity)
func reinject(viewController: ActivityViewController) async {
await activitiesPipeLine.reinject(activity: viewController.viewModel.activity)
}
func goToToken(viewController: ActivityViewController) {
@ -914,7 +914,9 @@ extension ActiveWalletCoordinator: TokensCoordinatorDelegate {
func viewWillAppearOnce(in coordinator: TokensCoordinator) {
tokensService.refreshBalance(updatePolicy: .all)
activitiesPipeLine.start()
Task {
await activitiesPipeLine.start()
}
}
func blockieSelected(in coordinator: TokensCoordinator) {

@ -7,7 +7,7 @@ import AlphaWalletFoundation
import AlphaWalletTokenScript
protocol ActivityViewControllerDelegate: AnyObject, RequestSignMessageDelegate {
func reinject(viewController: ActivityViewController)
func reinject(viewController: ActivityViewController) async
func goToToken(viewController: ActivityViewController)
func speedupTransaction(transactionId: String, server: RPCServer, viewController: ActivityViewController)
func cancelTransaction(transactionId: String, server: RPCServer, viewController: ActivityViewController)
@ -254,7 +254,7 @@ extension ActivityViewController: TokenScriptWebViewDelegate {
//no-op
}
func reinject(tokenScriptWebView: TokenScriptWebView) {
delegate?.reinject(viewController: self)
func reinject(tokenScriptWebView: TokenScriptWebView) async {
await delegate?.reinject(viewController: self)
}
}

@ -83,7 +83,9 @@ final class NFTCollectionViewModel {
}
func transform(input: NFTCollectionViewModelInput) -> NFTCollectionViewModelOutput {
activitiesService.start()
Task {
await activitiesService.start()
}
tokensService.tokenHoldersPublisher(for: token)
.assign(to: \.value, on: tokenHolders)

@ -102,7 +102,9 @@ class FungibleTokenCoordinator: Coordinator {
viewController.configure(viewModel: $0.activitiesViewModel)
}.store(in: &cancelable)
activitiesService.start()
Task {
await activitiesService.start()
}
return viewController
}

@ -87,32 +87,20 @@ class SingleChainTokenCoordinator: Coordinator {
}
let activitiesFilterStrategy = token.activitiesFilterStrategy
let activitiesService = self.activitiesService.copy(activitiesFilterStrategy: activitiesFilterStrategy, transactionsFilterStrategy: TransactionDataStore.functional.transactionsFilter(for: activitiesFilterStrategy, token: token))
let coordinator = NFTCollectionCoordinator(
session: session,
navigationController: navigationController,
keystore: keystore,
token: token,
assetDefinitionStore: assetDefinitionStore,
analytics: analytics,
nftProvider: nftProvider,
activitiesService: activitiesService,
tokensService: tokensPipeline,
sessionsProvider: sessionsProvider,
currencyService: currencyService,
tokenImageFetcher: tokenImageFetcher,
tokenActionsProvider: tokenActionsProvider)
Task {
let activitiesService = await self.activitiesService.copy(activitiesFilterStrategy: activitiesFilterStrategy, transactionsFilterStrategy: TransactionDataStore.functional.transactionsFilter(for: activitiesFilterStrategy, token: token))
let coordinator = NFTCollectionCoordinator(session: session, navigationController: navigationController, keystore: keystore, token: token, assetDefinitionStore: assetDefinitionStore, analytics: analytics, nftProvider: nftProvider, activitiesService: activitiesService, tokensService: tokensPipeline, sessionsProvider: sessionsProvider, currencyService: currencyService, tokenImageFetcher: tokenImageFetcher, tokenActionsProvider: tokenActionsProvider)
addCoordinator(coordinator)
coordinator.delegate = self
coordinator.start()
}
}
func show(fungibleToken token: Token, navigationController: UINavigationController) {
//NOTE: create half mutable copy of `activitiesService` to configure it for fetching activities for specific token
let activitiesFilterStrategy = token.activitiesFilterStrategy
let activitiesService = self.activitiesService.copy(activitiesFilterStrategy: activitiesFilterStrategy, transactionsFilterStrategy: TransactionDataStore.functional.transactionsFilter(for: activitiesFilterStrategy, token: token))
Task {
let activitiesService = await self.activitiesService.copy(activitiesFilterStrategy: activitiesFilterStrategy, transactionsFilterStrategy: TransactionDataStore.functional.transactionsFilter(for: activitiesFilterStrategy, token: token))
Task { @MainActor in
let coordinator = await FungibleTokenCoordinator(token: token, navigationController: navigationController, session: session, keystore: keystore, assetDefinitionStore: assetDefinitionStore, analytics: analytics, tokenActionsProvider: tokenActionsProvider, coinTickersProvider: coinTickersProvider, activitiesService: activitiesService, alertService: alertService, tokensPipeline: tokensPipeline, sessionsProvider: sessionsProvider, currencyService: currencyService, tokenImageFetcher: tokenImageFetcher, tokensService: tokensService)
@ -121,6 +109,7 @@ class SingleChainTokenCoordinator: Coordinator {
coordinator.start()
}
}
}
private func showTokenInstanceActionView(forAction action: TokenInstanceAction, fungibleTokenObject token: Token, navigationController: UINavigationController) {
let tokenHolder = session.tokenAdaptor.getTokenHolder(token: token)

@ -12,14 +12,14 @@ import AlphaWalletCore
import AlphaWalletTokenScript
import CombineExt
class ActivitiesGenerator {
actor ActivitiesGenerator {
private let sessionsProvider: SessionsProvider
private let transactionsFilterStrategy: TransactionsFilterStrategy
private let tokensService: TokensService
private let activitiesFilterStrategy: ActivitiesFilterStrategy
private let eventsActivityDataStore: EventsActivityDataStoreProtocol
var tokensAndTokenHolders: AtomicDictionary<AddressAndRPCServer, [TokenHolder]> = .init()
var tokensAndTokenHolders: [AddressAndRPCServer: [TokenHolder]] = [:]
init(sessionsProvider: SessionsProvider,
transactionsFilterStrategy: TransactionsFilterStrategy,
@ -143,20 +143,12 @@ class ActivitiesGenerator {
}
}
private func getActivities(contract: AlphaWallet.Address, server: RPCServer, card: TokenScriptCard, interpolatedFilter: String) async -> [ActivityTokenObjectTokenHolder] {
//NOTE: eventsActivityDataStore. getRecentEvents() returns only 100 events, that could cause error with creating activities (missing events)
//replace with fetching only filtered event instances,
let events = await eventsActivityDataStore.getRecentEventsSortedByBlockNumber(for: card.eventOrigin.contract, server: server, eventName: card.eventOrigin.eventName, interpolatedFilter: interpolatedFilter)
let activitiesForThisCard: [ActivityTokenObjectTokenHolder] = await events.asyncCompactMap { eachEvent -> ActivityTokenObjectTokenHolder? in
guard let token = await tokensService.token(for: contract, server: server) else { return nil }
guard let session = sessionsProvider.session(for: token.server) else { return nil }
let implicitAttributes = generateImplicitAttributesForToken(contract: contract, server: server, symbol: token.symbol)
private func getActivityForEvent(token: Token, session: WalletSession, card: TokenScriptCard, event: EventActivityInstance) async -> ActivityTokenObjectTokenHolder? {
let implicitAttributes = generateImplicitAttributesForToken(contract: token.contractAddress, server: session.server, symbol: token.symbol)
let tokenAttributes = implicitAttributes
var cardAttributes = functional.generateImplicitAttributesForCard(forContract: contract, server: server, event: eachEvent)
var cardAttributes = functional.generateImplicitAttributesForCard(forContract: token.contractAddress, server: session.server, event: event)
cardAttributes.merge(eachEvent.data) { _, new in new }
cardAttributes.merge(event.data) { _, new in new }
for parameter in card.eventOrigin.parameters {
guard let originalValue = cardAttributes[parameter.name] else { continue }
@ -196,11 +188,21 @@ class ActivitiesGenerator {
//no-op
}
let activity = Activity(id: Int.random(in: 0..<Int.max), rowType: .standalone, token: token, server: eachEvent.server, name: card.name, eventName: eachEvent.eventName, blockNumber: eachEvent.blockNumber, transactionId: eachEvent.transactionId, transactionIndex: eachEvent.transactionIndex, logIndex: eachEvent.logIndex, date: eachEvent.date, values: (token: tokenAttributes, card: cardAttributes), view: card.view, itemView: card.itemView, isBaseCard: card.isBase, state: .completed)
let activity = Activity(id: Int.random(in: 0..<Int.max), rowType: .standalone, token: token, server: event.server, name: card.name, eventName: event.eventName, blockNumber: event.blockNumber, transactionId: event.transactionId, transactionIndex: event.transactionIndex, logIndex: event.logIndex, date: event.date, values: (token: tokenAttributes, card: cardAttributes), view: card.view, itemView: card.itemView, isBaseCard: card.isBase, state: .completed)
return (activity: activity, tokenObject: token, tokenHolder: tokenHolder)
}
private func getActivities(contract: AlphaWallet.Address, server: RPCServer, card: TokenScriptCard, interpolatedFilter: String) async -> [ActivityTokenObjectTokenHolder] {
guard let token: Token = await tokensService.token(for: contract, server: server) else { return [] }
guard let session: WalletSession = sessionsProvider.session(for: token.server) else { return [] }
//NOTE: eventsActivityDataStore. getRecentEvents() returns only 100 events, that could cause error with creating activities (missing events)
//replace with fetching only filtered event instances,
let events = await eventsActivityDataStore.getRecentEventsSortedByBlockNumber(for: card.eventOrigin.contract, server: server, eventName: card.eventOrigin.eventName, interpolatedFilter: interpolatedFilter)
let activitiesForThisCard: [ActivityTokenObjectTokenHolder] = await events.asyncCompactMap { eachEvent -> ActivityTokenObjectTokenHolder? in
return await self.getActivityForEvent(token: token, session: session, card: card, event: eachEvent)
}
return activitiesForThisCard
}

@ -72,25 +72,24 @@ public final class ActivitiesPipeLine: ActivitiesServiceType {
self.sessionsProvider = sessionsProvider
}
public func start() {
public func start() async {
//NOTE: need to figure out creating xml handlers, object creating takes a lot of resources
eventSource.start()
eventSourceForActivities?.start()
activitiesSubService.start()
await activitiesSubService.start()
}
public func stop() {
activitiesSubService.stop()
public func stop() async {
await activitiesSubService.stop()
eventSource.stop()
eventSourceForActivities?.stop()
}
public func reinject(activity: Activity) {
activitiesSubService.reinject(activity: activity)
public func reinject(activity: Activity) async {
await activitiesSubService.reinject(activity: activity)
}
public func copy(activitiesFilterStrategy: ActivitiesFilterStrategy, transactionsFilterStrategy: TransactionsFilterStrategy) -> ActivitiesServiceType {
activitiesSubService.copy(activitiesFilterStrategy: activitiesFilterStrategy, transactionsFilterStrategy: transactionsFilterStrategy)
public func copy(activitiesFilterStrategy: ActivitiesFilterStrategy, transactionsFilterStrategy: TransactionsFilterStrategy) async -> ActivitiesServiceType {
return await activitiesSubService.copy(activitiesFilterStrategy: activitiesFilterStrategy, transactionsFilterStrategy: transactionsFilterStrategy)
}
}

@ -17,24 +17,24 @@ public protocol ActivitiesServiceType: AnyObject {
var activitiesPublisher: AnyPublisher<[ActivityCollection.MappedToDateActivityOrTransaction], Never> { get }
var didUpdateActivityPublisher: AnyPublisher<Activity, Never> { get }
func start()
func stop()
func reinject(activity: Activity)
func copy(activitiesFilterStrategy: ActivitiesFilterStrategy, transactionsFilterStrategy: TransactionsFilterStrategy) -> ActivitiesServiceType
func start() async
func stop() async
func reinject(activity: Activity) async
func copy(activitiesFilterStrategy: ActivitiesFilterStrategy, transactionsFilterStrategy: TransactionsFilterStrategy) async -> ActivitiesServiceType
}
typealias ContractsAndCards = [(contract: AlphaWallet.Address, server: RPCServer, card: TokenScriptCard, interpolatedFilter: String)]
typealias ActivityTokenObjectTokenHolder = (activity: Activity, tokenObject: Token, tokenHolder: TokenHolder)
typealias TokenObjectsAndXMLHandlers = [(contract: AlphaWallet.Address, server: RPCServer, xmlHandler: XMLHandler)]
public class ActivitiesService: ActivitiesServiceType {
public actor ActivitiesService: ActivitiesServiceType {
let sessionsProvider: SessionsProvider
private let tokensService: TokensService
private let eventsActivityDataStore: EventsActivityDataStoreProtocol
//Dictionary for lookup. Using `.firstIndex` too many times is too slow (60s for 10k events)
private var activitiesIndexLookup: AtomicDictionary<Int, (index: Int, activity: Activity)> = .init()
private var cancellableSet: AtomicDictionary<AddressAndRPCServer, AnyCancellable> = .init()
private var activities: AtomicArray<Activity> = .init()
private var activitiesIndexLookup: [Int: (index: Int, activity: Activity)] = [:]
private var cancellableSet: [AddressAndRPCServer: AnyCancellable] = [:]
private var activities: [Activity] = []
private let didUpdateActivitySubject: PassthroughSubject<Activity, Never> = .init()
private let activitiesSubject: CurrentValueSubject<[ActivityCollection.MappedToDateActivityOrTransaction], Never> = .init([])
private var cancellable = Set<AnyCancellable>()
@ -47,11 +47,11 @@ public class ActivitiesService: ActivitiesServiceType {
private let transactionDataStore: TransactionDataStore
private let transactionsFilterStrategy: TransactionsFilterStrategy
public var activitiesPublisher: AnyPublisher<[ActivityCollection.MappedToDateActivityOrTransaction], Never> {
public nonisolated var activitiesPublisher: AnyPublisher<[ActivityCollection.MappedToDateActivityOrTransaction], Never> {
activitiesSubject.eraseToAnyPublisher()
}
public var didUpdateActivityPublisher: AnyPublisher<Activity, Never> {
public nonisolated var didUpdateActivityPublisher: AnyPublisher<Activity, Never> {
didUpdateActivitySubject.eraseToAnyPublisher()
}
@ -79,7 +79,7 @@ public class ActivitiesService: ActivitiesServiceType {
eventsActivityDataStore: eventsActivityDataStore)
}
public func start() {
public func start() async {
let transactionsChangeset = sessionsProvider.sessions
.receive(on: DispatchQueue.main)
.handleEvents(receiveOutput: { [activitiesSubject] _ in activitiesSubject.send([]) })
@ -88,39 +88,36 @@ public class ActivitiesService: ActivitiesServiceType {
transactionDataStore.transactionsChangeset(filter: transactionsFilterStrategy, servers: $0)
}
let activities = activitiesGenerator.generateActivities()
let activities = await activitiesGenerator.generateActivities()
Publishers.CombineLatest(transactionsChangeset, activities)
.sink { [weak self] data in self?.createActivities(activitiesAndTokens: data.1) }
.store(in: &cancellable)
.sink { data in
Task { [weak self] in
await self?.createActivities(activitiesAndTokens: data.1)
}
}.store(in: &cancellable)
didUpdateActivitySubject
.debounce(for: .seconds(5), scheduler: RunLoop.main)
.receive(on: DispatchQueue.global())
.sink { [weak self] _ in
self?.combineActivitiesWithTransactions()
.sink { _ in
Task { [weak self] in
await self?.combineActivitiesWithTransactions()
}
}.store(in: &cancellable)
}
public func stop() {
public func stop() async {
cancellable.cancellAll()
}
public func copy(activitiesFilterStrategy: ActivitiesFilterStrategy,
transactionsFilterStrategy: TransactionsFilterStrategy) -> ActivitiesServiceType {
return ActivitiesService(
sessionsProvider: sessionsProvider,
eventsActivityDataStore: eventsActivityDataStore,
transactionDataStore: transactionDataStore,
activitiesFilterStrategy: activitiesFilterStrategy,
transactionsFilterStrategy: transactionsFilterStrategy,
tokensService: tokensService)
public func copy(activitiesFilterStrategy: ActivitiesFilterStrategy, transactionsFilterStrategy: TransactionsFilterStrategy) async -> ActivitiesServiceType {
return ActivitiesService(sessionsProvider: sessionsProvider, eventsActivityDataStore: eventsActivityDataStore, transactionDataStore: transactionDataStore, activitiesFilterStrategy: activitiesFilterStrategy, transactionsFilterStrategy: transactionsFilterStrategy, tokensService: tokensService)
}
private func createActivities(activitiesAndTokens: [ActivityTokenObjectTokenHolder]) {
activities.set(array: activitiesAndTokens.compactMap { $0.activity }.sorted { $0.blockNumber > $1.blockNumber })
updateActivitiesIndexLookup(with: activities.all)
activities = activitiesAndTokens.compactMap { $0.activity }.sorted { $0.blockNumber > $1.blockNumber }
updateActivitiesIndexLookup(with: activities)
combineActivitiesWithTransactions()
@ -129,16 +126,15 @@ public class ActivitiesService: ActivitiesServiceType {
}
}
public func reinject(activity: Activity) {
guard let tokenHolders = activitiesGenerator.tokensAndTokenHolders[activity.token.addressAndRPCServer] else { return }
public func reinject(activity: Activity) async {
guard let tokenHolders = await activitiesGenerator.tokensAndTokenHolders[activity.token.addressAndRPCServer] else { return }
refreshActivity(token: activity.token, tokenHolder: tokenHolders[0], activity: activity)
}
private func combineActivitiesWithTransactions() {
Task { @MainActor in
let transactions = await transactionDataStore.transactions(forFilter: transactionsFilterStrategy, servers: Array(sessionsProvider.activeSessions.keys), oldestBlockNumber: activities.last?.blockNumber)
let items = await combine(activities: activities.all, with: transactions)
let items = await combine(activities: activities, with: transactions)
let activities = ActivityCollection.sorted(activities: items)
activitiesSubject.send(activities)
}
@ -204,7 +200,7 @@ public class ActivitiesService: ActivitiesServiceType {
} else if transaction.localizedOperations.count == 1 {
return [.standaloneTransaction(transaction: transaction, activity: activity)]
} else {
let isSwap = self.isSwap(activities: activities.all, operations: transaction.localizedOperations, wallet: wallet)
let isSwap = self.isSwap(activities: activities, operations: transaction.localizedOperations, wallet: wallet)
var results: [ActivityRowModel] = .init()
results.append(.parentTransaction(transaction: transaction, isSwap: isSwap, activities: .init()))
results.append(contentsOf: await transaction.localizedOperations.asyncMap {
@ -227,26 +223,30 @@ public class ActivitiesService: ActivitiesServiceType {
return hasSend && hasReceive
}
//Important to pass in the `TokenHolder` instance and not re-create so that we don't override the subscribable values for the token with ones that are not resolved yet
private func refreshActivity(token: Token, tokenHolder: TokenHolder, activity: Activity) {
let attributeValues = AssetAttributeValues(attributeValues: tokenHolder.values)
cancellableSet[token.addressAndRPCServer] = attributeValues.resolveAllAttributes()
.sink(receiveValue: { [weak self] resolvedAttributeNameValues in
guard let stronSelf = self else { return }
//NOTE: Fix crush when element with index out of range
if let (index, oldActivity) = stronSelf.activitiesIndexLookup[activity.id] {
private func updatedActivityWithResolvedValues(token: Token, tokenHolder: TokenHolder, activity: Activity, resolvedAttributeNameValues: [AttributeId: AssetInternalValue]) {
//NOTE: Fix crash when element with index out of range
if let (index, oldActivity) = activitiesIndexLookup[activity.id] {
let updatedValues = (token: oldActivity.values.token.merging(resolvedAttributeNameValues) { _, new in new }, card: oldActivity.values.card)
let updatedActivity: Activity = .init(id: oldActivity.id, rowType: oldActivity.rowType, token: token, server: oldActivity.server, name: oldActivity.name, eventName: oldActivity.eventName, blockNumber: oldActivity.blockNumber, transactionId: oldActivity.transactionId, transactionIndex: oldActivity.transactionIndex, logIndex: oldActivity.logIndex, date: oldActivity.date, values: updatedValues, view: oldActivity.view, itemView: oldActivity.itemView, isBaseCard: oldActivity.isBaseCard, state: oldActivity.state)
if stronSelf.activities.contains(index: index) {
stronSelf.activities[index] = updatedActivity
stronSelf.didUpdateActivitySubject.send(updatedActivity)
if activities.indices.contains(index) {
activities[index] = updatedActivity
didUpdateActivitySubject.send(updatedActivity)
}
} else {
//no-op. We should be able to find it unless the list of activities has changed
}
}
//Important to pass in the `TokenHolder` instance and not re-create so that we don't override the subscribable values for the token with ones that are not resolved yet
private func refreshActivity(token: Token, tokenHolder: TokenHolder, activity: Activity) {
let attributeValues = AssetAttributeValues(attributeValues: tokenHolder.values)
cancellableSet[token.addressAndRPCServer] = attributeValues.resolveAllAttributes()
.sink(receiveValue: { [weak self] resolvedAttributeNameValues in
guard let strongSelf = self else { return }
Task { [weak self] in
await self?.updatedActivityWithResolvedValues(token: token, tokenHolder: tokenHolder, activity: activity, resolvedAttributeNameValues: resolvedAttributeNameValues)
}
})
}
@ -256,7 +256,7 @@ public class ActivitiesService: ActivitiesServiceType {
for (index, each) in activities.enumerated() {
newValue[each.id] = (index, each)
}
activitiesIndexLookup.set(value: newValue)
activitiesIndexLookup = newValue
}
}

@ -4,14 +4,14 @@ import Foundation
import Combine
import AlphaWalletCore
public final class CoinTickers {
private let fetchers: AtomicArray<CoinTickersFetcher> = .init()
public final actor CoinTickers {
private var fetchers: [CoinTickersFetcher] = []
private let storage: CoinTickersStorage & ChartHistoryStorage & TickerIdsStorage
private var chartHistories: [TokenMappedToTicker: Task<[ChartHistoryPeriod: ChartHistory], Never>] = .init()
private var cancelable = Set<AnyCancellable>()
public init(fetchers: [CoinTickersFetcher], storage: CoinTickersStorage & ChartHistoryStorage & TickerIdsStorage) {
self.fetchers.set(array: fetchers)
self.fetchers = fetchers
self.storage = storage
}
@ -31,22 +31,22 @@ public final class CoinTickers {
}
extension CoinTickers: CoinTickersFetcher {
public func fetchTickers(for tokens: [TokenMappedToTicker], force: Bool, currency: Currency) {
public func fetchTickers(for tokens: [TokenMappedToTicker], force: Bool, currency: Currency) async {
for each in functional.createFetcherToTokenMappedToTickerPairs(for: tokens, fetchers: fetchers) {
guard !each.tokenMappedToTickers.isEmpty else { continue }
each.fetcher.fetchTickers(for: each.tokenMappedToTickers, force: force, currency: currency)
await each.fetcher.fetchTickers(for: each.tokenMappedToTickers, force: force, currency: currency)
}
}
public func resolveTickerIds(for tokens: [TokenMappedToTicker]) {
public func resolveTickerIds(for tokens: [TokenMappedToTicker]) async {
for each in functional.createFetcherToTokenMappedToTickerPairs(for: tokens, fetchers: fetchers) {
guard !each.tokenMappedToTickers.isEmpty else { continue }
each.fetcher.resolveTickerIds(for: each.tokenMappedToTickers)
await each.fetcher.resolveTickerIds(for: each.tokenMappedToTickers)
}
}
public func fetchChartHistories(for token: TokenMappedToTicker, force: Bool, periods: [ChartHistoryPeriod], currency: Currency) async -> [ChartHistoryPeriod: ChartHistory] {
if let fetcher = functional.getFetcher(forTokenMappedToTicker: token, fetchers: fetchers) {
public nonisolated func fetchChartHistories(for token: TokenMappedToTicker, force: Bool, periods: [ChartHistoryPeriod], currency: Currency) async -> [ChartHistoryPeriod: ChartHistory] {
if let fetcher = functional.getFetcher(forTokenMappedToTicker: token, fetchers: await fetchers) {
return await fetcher.fetchChartHistories(for: token, force: force, periods: periods, currency: currency)
} else {
return [:]
@ -54,17 +54,19 @@ extension CoinTickers: CoinTickersFetcher {
}
//TODO this isn't called?
public func cancel() {
fetchers.forEach { $0.cancel() }
public func cancel() async {
for each in fetchers {
await each.cancel()
}
}
}
extension CoinTickers: CoinTickersProvider {
public var tickersDidUpdate: AnyPublisher<Void, Never> {
public nonisolated var tickersDidUpdate: AnyPublisher<Void, Never> {
return storage.tickersDidUpdate
}
public var updateTickerIds: AnyPublisher<[(tickerId: TickerIdString, key: AddressAndRPCServer)], Never> {
public nonisolated var updateTickerIds: AnyPublisher<[(tickerId: TickerIdString, key: AddressAndRPCServer)], Never> {
storage.updateTickerIds
}
@ -72,7 +74,7 @@ extension CoinTickers: CoinTickersProvider {
return await storage.ticker(for: key, currency: currency)
}
public func addOrUpdateTestsOnly(ticker: CoinTicker?, for token: TokenMappedToTicker) -> Task<Void, Never> {
public nonisolated func addOrUpdateTestsOnly(ticker: CoinTicker?, for token: TokenMappedToTicker) -> Task<Void, Never> {
let tickers: [AssignedCoinTickerId: CoinTicker] = ticker.flatMap { ticker in
let tickerId = AssignedCoinTickerId(tickerId: "tickerId-\(token.contractAddress)-\(token.server.chainID)", token: token)
return [tickerId: ticker]
@ -101,11 +103,11 @@ fileprivate extension CoinTickers.functional {
let tokenMappedToTickers: [TokenMappedToTicker]
}
static func getFetcher(forTokenMappedToTicker tokenMappedToTicker: TokenMappedToTicker, fetchers: AtomicArray<CoinTickersFetcher>) -> CoinTickersFetcher? {
static func getFetcher(forTokenMappedToTicker tokenMappedToTicker: TokenMappedToTicker, fetchers: [CoinTickersFetcher]) -> CoinTickersFetcher? {
createFetcherToTokenMappedToTickerPairs(for: [tokenMappedToTicker], fetchers: fetchers).first?.fetcher
}
static func createFetcherToTokenMappedToTickerPairs(for tokenMappedToTickers: [TokenMappedToTicker], fetchers: AtomicArray<CoinTickersFetcher>) -> [FetcherTokenMappedToTickerPair] {
static func createFetcherToTokenMappedToTickerPairs(for tokenMappedToTickers: [TokenMappedToTicker], fetchers: [CoinTickersFetcher]) -> [FetcherTokenMappedToTickerPair] {
var mappedToProvidersTypeTokens: [String: [TokenMappedToTicker]] = [:]
for each in tokenMappedToTickers {
//TODO fragile

@ -4,8 +4,8 @@ import Foundation
import Combine
public protocol CoinTickersFetcher {
func fetchTickers(for tokens: [TokenMappedToTicker], force: Bool, currency: Currency)
func resolveTickerIds(for tokens: [TokenMappedToTicker])
func fetchTickers(for tokens: [TokenMappedToTicker], force: Bool, currency: Currency) async
func resolveTickerIds(for tokens: [TokenMappedToTicker]) async
func fetchChartHistories(for token: TokenMappedToTicker, force: Bool, periods: [ChartHistoryPeriod], currency: Currency) async -> [ChartHistoryPeriod: ChartHistory]
func cancel()
func cancel() async
}

@ -182,7 +182,7 @@ struct FallbackJSONResponseParsingInterceptor: ApolloInterceptor {
}
final class EnjinUserManagementInterceptor: ApolloInterceptor {
final actor EnjinUserManagementInterceptor: ApolloInterceptor {
enum UserError: Error {
case noUserLoggedIn
@ -190,18 +190,20 @@ final class EnjinUserManagementInterceptor: ApolloInterceptor {
}
private let userManager: EnjinUserManager
private var pending: AtomicArray<() -> Void> = .init()
private var pending: [() -> Void] = []
private var inFlightPromise: Promise<EnjinAccessToken>?
init(userManager: EnjinUserManager) {
self.userManager = userManager
}
func interceptAsync<Operation: GraphQLOperation>(chain: RequestChain,
request: HTTPRequest<Operation>,
response: HTTPResponse<Operation>?,
completion: @escaping (Swift.Result<GraphQLResult<Operation.Data>, Error>) -> Void) {
nonisolated func interceptAsync<Operation: GraphQLOperation>(chain: RequestChain, request: HTTPRequest<Operation>, response: HTTPResponse<Operation>?, completion: @escaping (Swift.Result<GraphQLResult<Operation.Data>, Error>) -> Void) {
Task {
await _interceptAsync(chain: chain, request: request, response: response, completion: completion)
}
}
private func _interceptAsync<Operation: GraphQLOperation>(chain: RequestChain, request: HTTPRequest<Operation>, response: HTTPResponse<Operation>?, completion: @escaping (Swift.Result<GraphQLResult<Operation.Data>, Error>) -> Void) {
func addTokenAndProceed<Operation: GraphQLOperation>(_ token: EnjinAccessToken, to request: HTTPRequest<Operation>, chain: RequestChain, response: HTTPResponse<Operation>?, completion: @escaping (Swift.Result<GraphQLResult<Operation.Data>, Error>) -> Void) {
request.addHeader(name: "Authorization", value: "Bearer \(token)")

@ -204,6 +204,7 @@ public final class WalletDataProcessingPipeline: TokensProcessingPipeline {
Publishers.CombineLatest(tokens, currencyService.$currency)
.sink { [coinTickersFetcher, tokensService] tokens, currency in
Task {
let nativeCryptoForAllChains = RPCServer.allCases.map { MultipleChainsTokensDataStore.functional.etherToken(forServer: $0) }
//NOTE: remove type type filtering when add support for nonfungibles
let tokens = (tokens + nativeCryptoForAllChains).filter { !$0.server.isTestnet && ($0.type == .nativeCryptocurrency || $0.type == .erc20) }
@ -216,8 +217,9 @@ public final class WalletDataProcessingPipeline: TokensProcessingPipeline {
coinGeckoId: $0.info.coinGeckoId)
}
coinTickersFetcher.fetchTickers(for: uniqueTokens, force: false, currency: currency)
await coinTickersFetcher.fetchTickers(for: uniqueTokens, force: false, currency: currency)
tokensService.refreshBalance(updatePolicy: .tokens(tokens: tokens))
}
}.store(in: &cancelable)
coinTickersProvider.updateTickerIds

@ -107,46 +107,36 @@ class EtherscanSingleChainTransactionProvider: SingleChainTransactionProvider {
deinit {
schedulerProviders.forEach { $0.cancel() }
oldestTransferTransactionsScheduler.cancel()
pendingTransactionProvider.cancelScheduler()
Task {
await pendingTransactionProvider.cancelScheduler()
}
}
func resume() {
func resume() async {
guard state == .stopped else { return }
pendingTransactionProvider.resumeScheduler()
await pendingTransactionProvider.resumeScheduler()
schedulerProviders.forEach { $0.restart() }
oldestTransferTransactionsScheduler.restart()
state = .running
}
func pause() {
func pause() async {
guard state == .running || state == .pending else { return }
pendingTransactionProvider.cancelScheduler()
await pendingTransactionProvider.cancelScheduler()
schedulerProviders.forEach { $0.cancel() }
oldestTransferTransactionsScheduler.cancel()
state = .stopped
}
func start() {
func start() async {
guard state == .pending else { return }
pendingTransactionProvider.start()
await pendingTransactionProvider.start()
schedulerProviders.forEach { $0.start() }
oldestTransferTransactionsScheduler.start()
queue.async { [weak self] in self?.removeUnknownTransactions() }
state = .running
}
public func stop() {
pendingTransactionProvider.cancelScheduler()
schedulerProviders.forEach { $0.cancel() }
oldestTransferTransactionsScheduler.cancel()
}
public func isServer(_ server: RPCServer) -> Bool {
return session.server == server
}

@ -10,8 +10,7 @@ import AlphaWalletCore
import BigInt
import Combine
public final class PendingTransactionProvider {
public final actor PendingTransactionProvider {
public enum PendingTransactionProviderError: Error {
case `internal`(Error)
case failureToRetrieveTransaction(hash: String, error: Error)
@ -21,7 +20,6 @@ public final class PendingTransactionProvider {
private let transactionDataStore: TransactionDataStore
private let ercTokenDetector: ErcTokenDetector
private var cancelable = Set<AnyCancellable>()
private let queue = DispatchQueue(label: "com.PendingTransactionProvider.updateQueue")
private let fetchPendingTransactionsQueue: OperationQueue = {
let queue = OperationQueue()
queue.name = "Auto-update Pending Transactions"
@ -30,9 +28,9 @@ public final class PendingTransactionProvider {
return queue
}()
private let completeTransactionSubject = PassthroughSubject<Result<Transaction, PendingTransactionProviderError>, Never>()
private lazy var store: AtomicDictionary<String, SchedulerProtocol> = .init()
private lazy var store: [String: SchedulerProtocol] = [:]
public var completeTransaction: AnyPublisher<Result<Transaction, PendingTransactionProviderError>, Never> {
public nonisolated var completeTransaction: AnyPublisher<Result<Transaction, PendingTransactionProviderError>, Never> {
completeTransactionSubject.eraseToAnyPublisher()
}
@ -48,30 +46,28 @@ public final class PendingTransactionProvider {
public func start() {
transactionDataStore
.initialOrNewTransactionsPublisher(forServer: session.server, transactionState: .pending)
.receive(on: queue)
.sink { [weak self] transactions in self?.runPendingTransactionWatchers(transactions: transactions) }
.store(in: &cancelable)
.sink { transactions in
Task { [weak self] in
await self?.runPendingTransactionWatchers(transactions: transactions)
}
}.store(in: &cancelable)
}
public func cancelScheduler() {
queue.async {
for each in self.store.values {
each.value.cancel()
}
for each in store.values {
each.cancel()
}
}
public func resumeScheduler() {
queue.async {
for each in self.store.values {
each.value.restart()
}
for each in store.values {
each.restart()
}
}
deinit {
for each in self.store.values {
each.value.cancel()
each.cancel()
}
}
@ -86,8 +82,11 @@ public final class PendingTransactionProvider {
fetchPendingTransactionsQueue: fetchPendingTransactionsQueue)
provider.responsePublisher
.receive(on: queue)
.sink { [weak self] in self?.handle(response: $0, transaction: transaction) }
.sink { response in
Task { [weak self] in
await self?.handle(response: response, transaction: transaction)
}
}
.store(in: &cancelable)
let scheduler = Scheduler(provider: provider)
@ -116,7 +115,7 @@ public final class PendingTransactionProvider {
completeTransactionSubject.send(.success(transaction))
}
cancelScheduler(transaction: transaction)
await cancelScheduler(transaction: transaction)
}
}
@ -139,7 +138,7 @@ public final class PendingTransactionProvider {
Task { @MainActor in
guard await transactionDataStore.hasCompletedTransaction(withNonce: transaction.nonce, forServer: session.server) else { return }
transactionDataStore.delete(transactions: [transaction])
cancelScheduler(transaction: transaction)
await cancelScheduler(transaction: transaction)
}
//The transaction might not be posted to this node yet (ie. it doesn't even think that this transaction is pending). Especially common if we post a transaction to Ethermine and fetch pending status through Etherscan
case .responseNotFound, .errorObjectParseError, .unsupportedVersion, .unexpectedTypeObject, .missingBothResultAndError, .nonArrayResponse, .none:

@ -29,9 +29,9 @@ public protocol SingleChainTransactionProvider: AnyObject {
var state: TransactionProviderState { get }
var completeTransaction: AnyPublisher<Result<Transaction, PendingTransactionProvider.PendingTransactionProviderError>, Never> { get }
func start()
func resume()
func pause()
func start() async
func resume() async
func pause() async
func isServer(_ server: RPCServer) -> Bool
func fetchLatestTransactions(fetchTypes: [TransactionFetchType]) -> AnyPublisher<[Transaction], PromiseError>
}

@ -85,7 +85,9 @@ public class TransactionProvider: SingleChainTransactionProvider {
deinit {
schedulerProviders.forEach { $0.cancel() }
pendingTransactionProvider.cancelScheduler()
Task {
await pendingTransactionProvider.cancelScheduler()
}
}
private func handle(response: Result<[Transaction], PromiseError>, provider: SchedulerProvider) {
@ -132,30 +134,24 @@ public class TransactionProvider: SingleChainTransactionProvider {
}
//Don't worry about start method and pending state once object created we first call method `start`
public func start() {
public func start() async {
guard state == .pending else { return }
pendingTransactionProvider.start()
await pendingTransactionProvider.start()
schedulerProviders.forEach { $0.start() }
queue.async { [weak self] in self?.removeUnknownTransactions() }
state = .running
}
public func resume() {
public func resume() async {
guard state == .stopped else { return }
pendingTransactionProvider.resumeScheduler()
await pendingTransactionProvider.resumeScheduler()
schedulerProviders.forEach { $0.restart() }
state = .running
}
public func pause() {
public func pause() async {
guard state == .running || state == .pending else { return }
pendingTransactionProvider.cancelScheduler()
await pendingTransactionProvider.cancelScheduler()
schedulerProviders.forEach { $0.cancel() }
state = .stopped
}

@ -51,17 +51,20 @@ public class TransactionsService {
NotificationCenter.default.applicationState
.receive(on: RunLoop.main)
.sink { [weak self] state in
.sink { state in
Task { [weak self] in
switch state {
case .didEnterBackground:
self?.pause()
await self?.pause()
case .willEnterForeground:
self?.resume()
await self?.resume()
}
}
}.store(in: &cancelable)
sessionsProvider.sessions
.map { [weak self] sessions -> [RPCServer: SingleChainTransactionProvider] in
.flatMap { [weak self] sessions -> Future<[RPCServer: SingleChainTransactionProvider], Never> in
asFuture {
guard let strongSelf = self else { return [:] }
var providers: [RPCServer: SingleChainTransactionProvider] = [:]
@ -69,21 +72,27 @@ public class TransactionsService {
if let provider = strongSelf.providers[session.key] {
providers[session.key] = provider
} else {
providers[session.key] = strongSelf.buildTransactionProvider(for: session.value)
providers[session.key] = await strongSelf.buildTransactionProvider(for: session.value)
}
}
return providers
}.handleEvents(receiveOutput: { [weak self] in self?.pauseDeleted(except: $0) })
.assign(to: \.providers, on: self)
}
}.handleEvents(receiveOutput: { providers in
Task { [weak self] in
await self?.pauseDeleted(except: providers)
}
}).assign(to: \.providers, on: self)
.store(in: &cancelable)
}
private func pauseDeleted(except providers: [RPCServer: SingleChainTransactionProvider]) {
private func pauseDeleted(except providers: [RPCServer: SingleChainTransactionProvider]) async {
let providersToStop = self.providers.keys.filter { !providers.keys.contains($0) }.compactMap { self.providers[$0] }
providersToStop.forEach { $0.pause() }
for each in providersToStop {
await each.pause()
}
}
private func buildTransactionProvider(for session: WalletSession) -> SingleChainTransactionProvider {
private func buildTransactionProvider(for session: WalletSession) async -> SingleChainTransactionProvider {
let ercTokenDetector = ErcTokenDetector(
tokensService: tokensService,
server: session.server,
@ -100,7 +109,7 @@ public class TransactionsService {
ercTokenDetector: ercTokenDetector,
blockchainExplorer: session.blockchainExplorer)
provider.start()
await provider.start()
return provider
case .covalent, .oklink, .unknown:
@ -111,23 +120,23 @@ public class TransactionsService {
ercTokenDetector: ercTokenDetector,
networking: session.blockchainExplorer)
provider.start()
await provider.start()
return provider
}
}
@objc private func pause() {
private func pause() async {
for each in providers {
each.value.pause()
await each.value.pause()
}
}
@objc private func resume() {
private func resume() async {
guard !config.development.isAutoFetchingDisabled else { return }
for each in providers {
each.value.resume()
await each.value.resume()
}
}
@ -138,17 +147,17 @@ public class TransactionsService {
return provider.fetchLatestTransactions(fetchTypes: TransactionFetchType.allCases)
}
public func forceResumeOrStart(server: RPCServer) {
public func forceResumeOrStart(server: RPCServer) async {
guard let provider = providers[server] else { return }
switch provider.state {
case .pending:
provider.start()
await provider.start()
case .running:
provider.pause()
provider.resume()
await provider.pause()
await provider.resume()
case .stopped:
provider.resume()
await provider.resume()
}
}

@ -16,7 +16,7 @@ import PromiseKit
public protocol TokenScriptWebViewDelegate: AnyObject {
func shouldClose(tokenScriptWebView: TokenScriptWebView)
func reinject(tokenScriptWebView: TokenScriptWebView)
func reinject(tokenScriptWebView: TokenScriptWebView) async
func requestSignMessage(message: SignMessageType, server: RPCServer, account: AlphaWallet.Address, inTokenScriptWebView tokenScriptWebView: TokenScriptWebView) -> AnyPublisher<Data, PromiseError>
}
@ -356,13 +356,15 @@ extension TokenScriptWebView: WKScriptMessageHandler {
case .dappAction(let command):
handleCommandForDappAction(command)
case .setActionProps(.action(let id, let changedProperties)):
handleSetActionProperties(id: id, changedProperties: changedProperties)
Task {
await handleSetActionProperties(id: id, changedProperties: changedProperties)
}
case .none:
break
}
}
private func handleSetActionProperties(id: Int, changedProperties: SetProperties.Properties) {
private func handleSetActionProperties(id: Int, changedProperties: SetProperties.Properties) async {
guard !changedProperties.isEmpty else { return }
let oldProperties = actionProperties
@ -377,7 +379,7 @@ extension TokenScriptWebView: WKScriptMessageHandler {
guard let oldJsonString = oldProperties.jsonString, let newJsonString = actionProperties.jsonString, oldJsonString != newJsonString else { return }
if lastCardLevelAttributeValues != nil {
delegate?.reinject(tokenScriptWebView: self)
await delegate?.reinject(tokenScriptWebView: self)
}
}

Loading…
Cancel
Save