Improving concurrency handling by switching some classes to `actor` and remove usage of `AtomicArray` and `AtomicDictionary` in those types

pull/7065/head
Hwee-Boon Yar 1 year ago
parent c4871380df
commit c399ef3aa0
  1. 4
      AlphaWallet/ActiveWalletCoordinator.swift
  2. 5
      AlphaWallet/Common/Services/Application.swift
  3. 1
      AlphaWallet/Common/Views/TokenImageView.swift
  4. 9
      AlphaWalletTests/Core/Helpers/LogLargeNftJsonFilesTests.swift
  5. 9
      modules/AlphaWalletCore/AlphaWalletCore/Extensions/Sequence+Extensions.swift
  6. 1
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Extensions/UIImage+tokenSymbolBackgroundImage.swift
  7. 34
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Helpers/CrashlyticsService.swift
  8. 6
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Initializers/ReportUsersActiveChains.swift
  9. 6
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Initializers/ReportUsersWalletAddresses.swift
  10. 4
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Tokens/ClientSideTokenSourceProvider.swift
  11. 77
      modules/AlphaWalletFoundation/AlphaWalletFoundation/Tokens/TokenImageFetcher.swift
  12. 33
      modules/AlphaWalletTokenScript/AlphaWalletTokenScript/Models/TokenScriptSignatureVerifier.swift

@ -210,7 +210,9 @@ class ActiveWalletCoordinator: NSObject, Coordinator {
blockscanChatService.delegate = self
self.keystore.recentlyUsedWallet = wallet
crashlytics.trackActiveWallet(wallet: wallet)
Task {
await crashlytics.trackActiveWallet(wallet: wallet)
}
caip10AccountProvidable.set(activeWallet: wallet)
localNotificationsService.register(source: transactionNotificationSource)

@ -85,7 +85,10 @@ class Application: WalletDependenciesProvidable {
static let shared: Application = try! Application()
convenience init(name: String = "Default") throws {
crashlytics.register(AlphaWallet.FirebaseCrashlyticsReporter.instance)
Task {
//This could a problem if this is not completed soon enough. Maybe not so bad because they are *all* actor-isolated, so serialized. Just crashes early (as in crash handling) would be a problem
await crashlytics.register(AlphaWallet.FirebaseCrashlyticsReporter.instance)
}
let analytics = AnalyticsService()
let walletAddressesStore: WalletAddressesStore = EtherKeystore.migratedWalletAddressesStore(userDefaults: .standardOrForTests)

@ -86,6 +86,7 @@ final class TokenImageView: UIView, ViewRoundingSupportable, ViewLoadingSupporta
chainOverlayImageView.isHidden = isChainOverlayHidden
imageSourceSubject.flatMapLatest { $0 }
.receive(on: DispatchQueue.main)
.sink(receiveValue: { [weak self] value in
self?.symbolLabel.text = ""

@ -11,18 +11,19 @@ import AlphaWalletFoundation
// swiftlint:disable all
class LogLargeNftJsonFilesTests: XCTestCase {
func testLogLargeNftJsonFiles() throws {
func testLogLargeNftJsonFiles() async throws {
let token = Token()
guard let bundlePath = Bundle(for: AddressStorageTests.self).path(forResource: "base64_image_example", ofType: "txt") else { XCTFail(); return }
let largeImage = try String(contentsOfFile: bundlePath)
let uri = URL(string: "https://www.google.com/")!
let asset_1 = NonFungibleBalance.NftAssetRawValue(json: largeImage, source: .uri(uri))
XCTAssertTrue(crashlytics.logLargeNftJsonFiles(for: [.update(token: token, field: .nonFungibleBalance(.assets([asset_1])))], fileSizeThreshold: 0.5))
let matches1 = await crashlytics.logLargeNftJsonFiles(for: [.update(token: token, field: .nonFungibleBalance(.assets([asset_1])))], fileSizeThreshold: 0.5)
XCTAssertTrue(matches1)
let asset_2 = NonFungibleBalance.NftAssetRawValue(json: "", source: .uri(uri))
XCTAssertFalse(crashlytics.logLargeNftJsonFiles(for: [.update(token: token, field: .nonFungibleBalance(.assets([asset_2])))], fileSizeThreshold: 0.5))
let matches2 = await crashlytics.logLargeNftJsonFiles(for: [.update(token: token, field: .nonFungibleBalance(.assets([asset_2])))], fileSizeThreshold: 0.5)
XCTAssertFalse(matches2)
}
}
// swiftlint:enable all

@ -28,4 +28,13 @@ public extension Sequence {
}
return values
}
func asyncContains(where predicate: (Self.Element) async throws -> Bool) async rethrows -> Bool {
for element in self {
if try await predicate(element) {
return true
}
}
return false
}
}

@ -8,6 +8,7 @@
import UIKit
import AlphaWalletCore
//TODO move contents of this extension out to an actor for protecting the dictionary
extension UIImage {
static var tokenSymbolBackgroundImageCache: AtomicDictionary<UIColor, UIImage> = .init()
static func tokenSymbolBackgroundImage(backgroundColor: UIColor, contractAddress: AlphaWallet.Address) -> UIImage {

@ -9,16 +9,16 @@ import Foundation
import AlphaWalletCore
public protocol CrashlyticsReporter: AnyObject {
func track(wallets: [Wallet])
func trackActiveWallet(wallet: Wallet)
func track(enabledServers: [RPCServer])
@discardableResult func logLargeNftJsonFiles(for actions: [AddOrUpdateTokenAction], fileSizeThreshold: Double) -> Bool
func track(wallets: [Wallet]) async
func trackActiveWallet(wallet: Wallet) async
func track(enabledServers: [RPCServer]) async
@discardableResult func logLargeNftJsonFiles(for actions: [AddOrUpdateTokenAction], fileSizeThreshold: Double) async -> Bool
}
public let crashlytics = CrashlyticsService()
public final class CrashlyticsService: NSObject, CrashlyticsReporter {
private var services: AtomicArray<CrashlyticsReporter> = .init()
public final actor CrashlyticsService: NSObject, CrashlyticsReporter {
private var services: Array<CrashlyticsReporter> = .init()
public override init() { }
@ -26,20 +26,26 @@ public final class CrashlyticsService: NSObject, CrashlyticsReporter {
services.append(service)
}
public func track(wallets: [Wallet]) {
services.forEach { $0.track(wallets: wallets) }
public func track(wallets: [Wallet]) async {
for each in services {
await each.track(wallets: wallets)
}
}
public func trackActiveWallet(wallet: Wallet) {
services.forEach { $0.trackActiveWallet(wallet: wallet) }
public func trackActiveWallet(wallet: Wallet) async {
for each in services {
await each.trackActiveWallet(wallet: wallet)
}
}
public func track(enabledServers: [RPCServer]) {
services.forEach { $0.track(enabledServers: enabledServers) }
public func track(enabledServers: [RPCServer]) async {
for each in services {
await each.track(enabledServers: enabledServers)
}
}
public func logLargeNftJsonFiles(for actions: [AddOrUpdateTokenAction], fileSizeThreshold: Double) -> Bool {
return services.contains(where: { $0.logLargeNftJsonFiles(for: actions, fileSizeThreshold: fileSizeThreshold) })
public func logLargeNftJsonFiles(for actions: [AddOrUpdateTokenAction], fileSizeThreshold: Double) async -> Bool {
return await services.asyncContains(where: { await $0.logLargeNftJsonFiles(for: actions, fileSizeThreshold: fileSizeThreshold) })
}
}

@ -16,7 +16,11 @@ public final class ReportUsersActiveChains: Service {
serversProvider.enabledServersPublisher
.delay(for: .seconds(2), scheduler: RunLoop.main)
.removeDuplicates()
.sink { crashlytics.track(enabledServers: Array($0)) }
.sink { servers in
Task {
await crashlytics.track(enabledServers: Array(servers))
}
}
.store(in: &cancelable)
}
}

@ -20,7 +20,11 @@ public final class ReportUsersWalletAddresses: Service {
//NOTE: make 2 sec delay to avoid load on launch
keystore.walletsPublisher
.delay(for: .seconds(2), scheduler: RunLoop.main)
.sink { crashlytics.track(wallets: Array($0)) }
.sink { wallets in
Task {
await crashlytics.track(wallets: Array(wallets))
}
}
.store(in: &cancelable)
}
}

@ -121,7 +121,9 @@ public class ClientSideTokenSourceProvider: TokenSourceProvider {
extension ClientSideTokenSourceProvider: TokenBalanceFetcherDelegate {
public func didUpdateBalance(value actions: [AddOrUpdateTokenAction], in fetcher: TokenBalanceFetcher) {
crashlytics.logLargeNftJsonFiles(for: actions, fileSizeThreshold: 10)
Task {
await crashlytics.logLargeNftJsonFiles(for: actions, fileSizeThreshold: 10)
}
Task {
await tokensDataStore.addOrUpdate(with: actions)
}

@ -68,12 +68,13 @@ public protocol TokenImageFetcher {
serverIconImage: UIImage?) -> TokenImagePublisher
}
public class TokenImageFetcherImpl: TokenImageFetcher {
public actor TokenImageFetcherImpl: TokenImageFetcher {
private let networking: ImageFetcher
private let tokenGroupsIdentifier: TokenGroupIdentifierProtocol
private let spamImage: UIImage
private let subscribables: AtomicDictionary<String, CurrentValueSubject<TokenImage?, Never>> = .init()
private let inFlightTasks: AtomicDictionary<String, Task<Void, Never>> = .init()
private var subscribables: [String: CurrentValueSubject<TokenImage?, Never>] = .init()
private var inFlightTasks: [String: Task<Void, Never>] = .init()
private var cancellables: Set<AnyCancellable> = .init()
enum ImageAvailabilityError: LocalizedError {
case notAvailable
@ -88,16 +89,7 @@ public class TokenImageFetcherImpl: TokenImageFetcher {
self.spamImage = spamImage
}
private nonisolated func getDefaultOrGenerateIcon(server: RPCServer,
contractAddress: AlphaWallet.Address,
type: TokenType,
name: String,
tokenImage: UIImage?,
colors: [UIColor],
staticOverlayIcon: UIImage?,
blockChainNameColor: UIColor,
serverIconImage: UIImage?) -> TokenImage? {
private nonisolated func getDefaultOrGenerateIcon(server: RPCServer, contractAddress: AlphaWallet.Address, type: TokenType, name: String, tokenImage: UIImage?, colors: [UIColor], staticOverlayIcon: UIImage?, blockChainNameColor: UIColor, serverIconImage: UIImage?) -> TokenImage? {
switch type {
case .nativeCryptocurrency:
if let img = iconImageForContractAndChainID(image: serverIconImage, address: contractAddress.eip55String, chainID: server.chainID) {
@ -119,25 +111,32 @@ public class TokenImageFetcherImpl: TokenImageFetcher {
blockChainNameColor: blockChainNameColor)
}
private func iconImageForContractAndChainID(image iconImage: UIImage?, address: String, chainID: Int) -> UIImage? {
private nonisolated func iconImageForContractAndChainID(image iconImage: UIImage?, address: String, chainID: Int) -> UIImage? {
if tokenGroupsIdentifier.isSpam(address: address, chainID: chainID) {
return spamImage
}
return iconImage
}
public func image(contractAddress: AlphaWallet.Address,
server: RPCServer,
name: String,
type: TokenType,
balance: NonFungibleFromJson?,
size: GoogleContentSize,
contractDefinedImage: UIImage?,
colors: [UIColor],
staticOverlayIcon: UIImage?,
blockChainNameColor: UIColor,
serverIconImage: UIImage?) -> TokenImagePublisher {
public nonisolated func image(contractAddress: AlphaWallet.Address, server: RPCServer, name: String, type: TokenType, balance: NonFungibleFromJson?, size: GoogleContentSize, contractDefinedImage: UIImage?, colors: [UIColor], staticOverlayIcon: UIImage?, blockChainNameColor: UIColor, serverIconImage: UIImage?) -> TokenImagePublisher {
//Cannot use PassthroughSubject because of how we use flatMapLatest downstream
let subject: CurrentValueSubject<TokenImage?, Never> = .init(nil)
Task {
let sourcePublisher = await self._image(contractAddress: contractAddress, server: server, name: name, type: type, balance: balance, size: size, contractDefinedImage: contractDefinedImage, colors: colors, staticOverlayIcon: staticOverlayIcon, blockChainNameColor: blockChainNameColor, serverIconImage: serverIconImage)
let cancellable = sourcePublisher.sink { image in
subject.send(image)
}
await storeCancellable(cancellable)
}
return subject.eraseToAnyPublisher()
}
//Need this so we can access `cancellables` in the actor's isolated context from within a `Task`
private func storeCancellable(_ cancellable: AnyCancellable) {
cancellables.insert(cancellable)
}
private func _image(contractAddress: AlphaWallet.Address, server: RPCServer, name: String, type: TokenType, balance: NonFungibleFromJson?, size: GoogleContentSize, contractDefinedImage: UIImage?, colors: [UIColor], staticOverlayIcon: UIImage?, blockChainNameColor: UIColor, serverIconImage: UIImage?) -> TokenImagePublisher {
let subject: CurrentValueSubject<TokenImage?, Never>
let key = "\(contractAddress.eip55String)-\(server.chainID)-\(size.rawValue)"
if let sub = subscribables[key] {
@ -176,7 +175,7 @@ public class TokenImageFetcherImpl: TokenImageFetcher {
}
if inFlightTasks[key] == nil {
inFlightTasks[key] = Task { @MainActor in
inFlightTasks[key] = Task {
if let image = try? await self.fetchFromAssetGitHubRepo(.alphaWallet, contractAddress: contractAddress) {
let tokenImage = TokenImage(image: .image(.loaded(image: image)), isFinal: true, overlayServerIcon: staticOverlayIcon)
subject.send(tokenImage)
@ -309,17 +308,35 @@ class GithubAssetsURLResolver {
public typealias ImagePublisher = AnyPublisher<ImageOrWebImageUrl<Image>?, Never>
public class RPCServerImageFetcher {
public actor RPCServerImageFetcher {
public static var instance = RPCServerImageFetcher()
private let subscribables: AtomicDictionary<Int, ImagePublisher> = .init()
private var subscribables: [Int: ImagePublisher] = .init()
private var cancellables: Set<AnyCancellable> = .init()
public nonisolated func image(server: RPCServer, iconImage: UIImage) -> ImagePublisher {
//Careful to not use PassthroughSubject due to how upstream works
let subject: CurrentValueSubject<ImageOrWebImageUrl<Image>?, Never> = .init(nil)
Task {
let sourcePublisher = await self._image(server: server, iconImage: iconImage)
let cancellable = sourcePublisher.sink { image in
subject.send(image)
}
await storeCancellable(cancellable)
}
return subject.eraseToAnyPublisher()
}
public func image(server: RPCServer, iconImage: UIImage) -> ImagePublisher {
//Need this so we can access `cancellables` in the actor's isolated context from within a `Task`
private func storeCancellable(_ cancellable: AnyCancellable) {
cancellables.insert(cancellable)
}
private func _image(server: RPCServer, iconImage: UIImage) -> ImagePublisher {
if let sub = subscribables[server.chainID] {
return sub
} else {
let sub = CurrentValueSubject<ImageOrWebImageUrl<Image>?, Never>(.image(iconImage))
subscribables[server.chainID] = sub.eraseToAnyPublisher()
return sub.eraseToAnyPublisher()
}
}

@ -15,14 +15,14 @@ public protocol TokenScriptSignatureVerifieble {
func verifyXMLSignatureViaAPI(xml: String, completion: @escaping (TokenScriptSignatureVerifier.VerifierResult) -> Void)
}
public final class TokenScriptSignatureVerifier: TokenScriptSignatureVerifieble {
public final actor TokenScriptSignatureVerifier: TokenScriptSignatureVerifieble {
private let baseTokenScriptFiles: BaseTokenScriptFiles
private let networking: TokenScriptSignatureNetworking
private let queue = DispatchQueue(label: "org.alphawallet.swift.TokenScriptSignatureVerifier")
private let reachability: ReachabilityManagerProtocol
private let cancellable: AtomicDictionary<String, AnyCancellable> = .init()
private var cancellable: [String: AnyCancellable] = .init()
//TODO: remove later when replace with publisher, needed to add waiting for completion of single api call, to avoid multiple uploading of same file
private let completions: AtomicDictionary<String, [((VerifierResult) -> Void)?]> = .init()
private var completions: [String: [((VerifierResult) -> Void)?]] = .init()
private let features: TokenScriptFeatures
public var retryBehavior: RetryBehavior<RunLoop> = .delayed(retries: UInt.max, time: 10)
@ -45,7 +45,22 @@ public final class TokenScriptSignatureVerifier: TokenScriptSignatureVerifieble
self.features = features
}
public func verificationType(forXml xmlString: String) -> Promise<TokenScriptSignatureVerificationType> {
public nonisolated func verificationType(forXml xmlString: String) -> Promise<TokenScriptSignatureVerificationType> {
return Promise<TokenScriptSignatureVerificationType> { seal in
Task {
let promise = await self._verificationType(forXml: xmlString)
firstly {
promise
}.done {
seal.fulfill($0)
}.catch {
seal.reject($0)
}
}
}
}
private func _verificationType(forXml xmlString: String) -> Promise<TokenScriptSignatureVerificationType> {
return Promise { seal in
if features.isActivityEnabled {
if baseTokenScriptFiles.containsBaseTokenScriptFile(for: xmlString) {
@ -73,8 +88,14 @@ public final class TokenScriptSignatureVerifier: TokenScriptSignatureVerifieble
}
}
public nonisolated func verifyXMLSignatureViaAPI(xml: String, completion: @escaping (VerifierResult) -> Void) {
Task {
await self._verifyXMLSignatureViaAPI(xml: xml, completion: completion)
}
}
//TODO log reasons for failures `completion(.failed)` as well as those that triggers retries in in-app Console
public func verifyXMLSignatureViaAPI(xml: String, completion: @escaping (VerifierResult) -> Void) {
private func _verifyXMLSignatureViaAPI(xml: String, completion: @escaping (VerifierResult) -> Void) {
add(callback: completion, xml: xml)
guard cancellable[xml] == nil else { return }
@ -94,9 +115,7 @@ public final class TokenScriptSignatureVerifier: TokenScriptSignatureVerifieble
.sink(receiveCompletion: { _ in
self.cancellable[xml] = nil
}, receiveValue: { result in
DispatchQueue.main.async {
self.fulfill(for: xml, result: result)
}
})
}

Loading…
Cancel
Save