Merge pull request #3146 from vladyslav-iosdev/#3145

move realm updates for events [part of 3125] #3145
pull/3154/head
Vladyslav Shepitko 3 years ago committed by GitHub
commit 2b0237ccaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      AlphaWallet/Activities/ActivitiesService.swift
  2. 10
      AlphaWallet/InCoordinator.swift
  3. 38
      AlphaWallet/TokenScriptClient/Coordinators/EventSourceCoordinator.swift
  4. 22
      AlphaWallet/TokenScriptClient/Coordinators/EventSourceCoordinatorForActivities.swift
  5. 109
      AlphaWallet/TokenScriptClient/Models/EventsActivityDataStore.swift
  6. 55
      AlphaWallet/TokenScriptClient/Models/EventsDataStore.swift

@ -141,7 +141,7 @@ class ActivitiesService: NSObject, ActivitiesServiceType {
private let activitiesFilterStrategy: ActivitiesFilterStrategy
private var filteredTransactionsSubscriptionKey: Subscribable<[TransactionInstance]>.SubscribableKey!
private var recentEventsSubscriptionKey: Subscribable<[EventActivity]>.SubscribableKey!
private var recentEventsSubscriptionKey: Subscribable<Void>.SubscribableKey!
private let transactionCollection: TransactionCollection
private lazy var recentEventsSubscribable = eventsActivityDataStore.recentEventsSubscribable
private lazy var filteredTransactionsSubscription = transactionCollection.subscribableFor(filter: transactionsFilterStrategy)

@ -258,12 +258,12 @@ class InCoordinator: NSObject, Coordinator {
}
private func createEventSourceCoordinator() -> EventSourceCoordinatorType {
return EventSourceCoordinator(wallet: wallet, config: config, tokensStorages: tokensStorages, assetDefinitionStore: assetDefinitionStore, eventsDataStore: eventsDataStore)
return EventSourceCoordinator(wallet: wallet, tokenCollection: tokenCollection, assetDefinitionStore: assetDefinitionStore, eventsDataStore: eventsDataStore)
}
private func setUpEventSourceCoordinatorForActivities() {
guard Features.isActivityEnabled else { return }
eventSourceCoordinatorForActivities = EventSourceCoordinatorForActivities(wallet: wallet, config: config, tokensStorages: tokensStorages, assetDefinitionStore: assetDefinitionStore, eventsDataStore: eventsActivityDataStore)
eventSourceCoordinatorForActivities = EventSourceCoordinatorForActivities(wallet: wallet, config: config, tokenCollection: tokenCollection, assetDefinitionStore: assetDefinitionStore, eventsDataStore: eventsActivityDataStore)
}
private func setupTokenDataStores() {
@ -364,9 +364,13 @@ class InCoordinator: NSObject, Coordinator {
promptBackupCoordinator.start()
}
private func createTokensCoordinator(promptBackupCoordinator: PromptBackupCoordinator, activitiesService: ActivitiesServiceType) -> TokensCoordinator {
private lazy var tokenCollection: TokenCollection = {
let tokensStoragesForEnabledServers = config.enabledServers.map { tokensStorages[$0] }
let tokenCollection = TokenCollection(filterTokensCoordinator: filterTokensCoordinator, tokenDataStores: tokensStoragesForEnabledServers)
return tokenCollection
}()
private func createTokensCoordinator(promptBackupCoordinator: PromptBackupCoordinator, activitiesService: ActivitiesServiceType) -> TokensCoordinator {
promptBackupCoordinator.listenToNativeCryptoCurrencyBalance(withWalletSessions: walletSessions)
pollEthereumEvents(tokenCollection: tokenCollection)

@ -5,6 +5,17 @@ import BigInt
import PromiseKit
import web3swift
extension PromiseKit.Result {
var optionalValue: T? {
switch self {
case .fulfilled(let value):
return value
case .rejected:
return nil
}
}
}
protocol EventSourceCoordinatorType: class {
func fetchEthereumEvents()
func fetchEventsByTokenId(forToken token: TokenObject) -> [Promise<Void>]
@ -13,18 +24,16 @@ protocol EventSourceCoordinatorType: class {
//TODO rename this generic name to reflect that it's for event instances, not for event activity
class EventSourceCoordinator: EventSourceCoordinatorType {
private var wallet: Wallet
private let config: Config
private let tokensStorages: ServerDictionary<TokensDataStore>
private let tokenCollection: TokenCollection
private let assetDefinitionStore: AssetDefinitionStore
private let eventsDataStore: EventsDataStoreProtocol
private var isFetching = false
private var rateLimitedUpdater: RateLimiter?
private let queue = DispatchQueue(label: "com.eventSourceCoordinator.updateQueue")
init(wallet: Wallet, config: Config, tokensStorages: ServerDictionary<TokensDataStore>, assetDefinitionStore: AssetDefinitionStore, eventsDataStore: EventsDataStoreProtocol) {
init(wallet: Wallet, tokenCollection: TokenCollection, assetDefinitionStore: AssetDefinitionStore, eventsDataStore: EventsDataStoreProtocol) {
self.wallet = wallet
self.config = config
self.tokensStorages = tokensStorages
self.tokenCollection = tokenCollection
self.assetDefinitionStore = assetDefinitionStore
self.eventsDataStore = eventsDataStore
}
@ -65,14 +74,19 @@ class EventSourceCoordinator: EventSourceCoordinatorType {
guard !isFetching else { return }
isFetching = true
let tokensStoragesForEnabledServers = config.enabledServers.compactMap { tokensStorages[safe: $0] }
let fetchPromises = tokensStoragesForEnabledServers.flatMap {
$0.enabledObject.flatMap { fetchEventsByTokenId(forToken: $0) }
}
when(resolved: fetchPromises).done { _ in
firstly {
tokenCollection.tokenObjects
//NOTE: calling .fetchEventsByTokenId shoul be performed on .main queue
}.then(on: .main, { tokens -> Promise<Void> in
return Promise { seal in
let promises = tokens.map { self.fetchEventsByTokenId(forToken: $0) }.flatMap { $0 }
when(resolved: promises).done { _ in
seal.fulfill(())
}
}
}).done(on: queue, { _ in
self.isFetching = false
}
}).cauterize()
}
}

@ -14,17 +14,17 @@ protocol EventSourceCoordinatorForActivitiesType: AnyObject {
class EventSourceCoordinatorForActivities: EventSourceCoordinatorForActivitiesType {
private var wallet: Wallet
private let config: Config
private let tokensStorages: ServerDictionary<TokensDataStore>
private let tokenCollection: TokenCollection
private let assetDefinitionStore: AssetDefinitionStore
private let eventsDataStore: EventsActivityDataStoreProtocol
private var isFetching = false
private var rateLimitedUpdater: RateLimiter?
private let queue = DispatchQueue(label: "com.EventSourceCoordinatorForActivities.updateQueue")
init(wallet: Wallet, config: Config, tokensStorages: ServerDictionary<TokensDataStore>, assetDefinitionStore: AssetDefinitionStore, eventsDataStore: EventsActivityDataStoreProtocol) {
init(wallet: Wallet, config: Config, tokenCollection: TokenCollection, assetDefinitionStore: AssetDefinitionStore, eventsDataStore: EventsActivityDataStoreProtocol) {
self.wallet = wallet
self.config = config
self.tokensStorages = tokensStorages
self.tokenCollection = tokenCollection
self.assetDefinitionStore = assetDefinitionStore
self.eventsDataStore = eventsDataStore
}
@ -79,16 +79,8 @@ class EventSourceCoordinatorForActivities: EventSourceCoordinatorForActivitiesTy
typealias EnabledTokenAddreses = [(contract: AlphaWallet.Address, tokenType: TokenType, server: RPCServer)]
private func tokensForEnabledRPCServers() -> Promise<EnabledTokenAddreses> {
return Promise { seal in
let tokensStoragesForEnabledServers = self.config.enabledServers.compactMap { self.tokensStorages[safe: $0] }
let data = tokensStoragesForEnabledServers.flatMap {
$0.enabledObject
}.compactMap {
(contract: $0.contractAddress, tokenType: $0.type, server: $0.server)
}
seal.fulfill(data)
tokenCollection.tokenObjects.map { tokenObjects -> EnabledTokenAddreses in
tokenObjects.compactMap { (contract: $0.contractAddress, tokenType: $0.type, server: $0.server) }
}
}
}
@ -151,8 +143,8 @@ extension EventSourceCoordinatorForActivities.functional {
})
}
return when(resolved: promises).map(on: queue, { _ -> [EventActivityInstance] in
promises.compactMap { $0.value }.compactMap { $0 }
return when(resolved: promises).map(on: queue, { values -> [EventActivityInstance] in
values.compactMap { $0.optionalValue }.compactMap { $0 }
})
}).then(on: queue, { events -> Promise<Void> in
if events.isEmpty {

@ -5,11 +5,12 @@ import RealmSwift
import PromiseKit
protocol EventsActivityDataStoreProtocol {
var recentEventsSubscribable: Subscribable<[EventActivity]> { get }
func removeSubscription(subscription: Subscribable<[EventActivity]>)
var recentEventsSubscribable: Subscribable<Void> { get }
func removeSubscription(subscription: Subscribable<Void>)
func getRecentEvents() -> [EventActivity]
func getMatchingEventsSortedByBlockNumber(forContract contract: AlphaWallet.Address, tokenContract: AlphaWallet.Address, server: RPCServer, eventName: String) -> Promise <EventActivityInstance?>
func getRecentEventsPromise() -> Promise<[EventActivity]>
func getMatchingEventsSortedByBlockNumber(forContract contract: AlphaWallet.Address, tokenContract: AlphaWallet.Address, server: RPCServer, eventName: String) -> Promise<EventActivityInstance?>
func add(events: [EventActivityInstance], forTokenContract contract: AlphaWallet.Address) -> Promise<Void>
}
@ -25,59 +26,67 @@ class EventsActivityDataStore: EventsActivityDataStoreProtocol {
self.queue = queue
}
private var cachedRecentEventsSubscribable: [Subscribable<[EventActivity]>: NotificationToken] = [:]
var recentEventsSubscribable: Subscribable<[EventActivity]> {
func getRecentEvents() -> Results<EventActivity> {
realm.threadSafe
.objects(EventActivity.self)
.sorted(byKeyPath: "date", ascending: false)
}
let notifier = Subscribable<[EventActivity]>(nil)
let subscription = getRecentEvents().observe(on: queue) { change in
switch change {
case .initial, .error:
break
case .update(let events, _, _, _):
notifier.value = events.map { $0 }
private var cachedRecentEventsSubscribable: [Subscribable<Void>: NotificationToken] = [:]
//NOTE: we are need only fact that we got events,
//its easier way to determiene that events got updated
var recentEventsSubscribable: Subscribable<Void> {
let notifier = Subscribable<Void>(nil)
let recentEvents = realm.objects(EventActivity.self)
.sorted(byKeyPath: "date", ascending: false)
let subscription = recentEvents.observe(on: queue) { _ in
self.queue.async {
notifier.value = ()
}
}
queue.async {
notifier.value = getRecentEvents().map { $0 }
}
cachedRecentEventsSubscribable[notifier] = subscription
return notifier
}
func removeSubscription(subscription: Subscribable<[EventActivity]>) {
func removeSubscription(subscription: Subscribable<Void>) {
cachedRecentEventsSubscribable[subscription] = nil
}
func getMatchingEventsSortedByBlockNumber(forContract contract: AlphaWallet.Address, tokenContract: AlphaWallet.Address, server: RPCServer, eventName: String) -> Promise <EventActivityInstance?> {
return Promise { seal in
let objects = realm.threadSafe.objects(EventActivity.self)
.filter("contract = '\(contract.eip55String)'")
.filter("tokenContract = '\(tokenContract.eip55String)'")
.filter("chainId = \(server.chainID)")
.filter("eventName = '\(eventName)'")
.sorted(byKeyPath: "blockNumber")
.last
.map { EventActivityInstance(event: $0) }
seal.fulfill(objects)
DispatchQueue.main.async { [weak self] in
guard let strongSelf = self else { return seal.reject(PMKError.cancelled) }
let objects = strongSelf.realm.objects(EventActivity.self)
.filter("contract = '\(contract.eip55String)'")
.filter("tokenContract = '\(tokenContract.eip55String)'")
.filter("chainId = \(server.chainID)")
.filter("eventName = '\(eventName)'")
.sorted(byKeyPath: "blockNumber")
.last
.map { EventActivityInstance(event: $0) }
seal.fulfill(objects)
}
}
}
func getRecentEvents() -> [EventActivity] {
return Array(realm.threadSafe.objects(EventActivity.self)
.sorted(byKeyPath: "date", ascending: false)
.prefix(Self.numberOfActivitiesToUse))
.sorted(byKeyPath: "date", ascending: false)
.prefix(Self.numberOfActivitiesToUse))
}
func getRecentEventsPromise() -> Promise<[EventActivity]> {
return Promise { seal in
DispatchQueue.main.async { [weak self] in
guard let strongSelf = self else { return seal.reject(PMKError.cancelled) }
let values = Array(strongSelf.realm.objects(EventActivity.self)
.sorted(byKeyPath: "date", ascending: false)
.prefix(Self.numberOfActivitiesToUse))
seal.fulfill(values)
}
}
}
private func delete<S: Sequence>(events: S) where S.Element: EventActivity {
@ -87,21 +96,25 @@ class EventsActivityDataStore: EventsActivityDataStoreProtocol {
}
func add(events: [EventActivityInstance], forTokenContract contract: AlphaWallet.Address) -> Promise<Void> {
if events.isEmpty {
return .value(())
} else {
return Promise { seal in
let eventsToSave = events.map { EventActivity(value: $0) }
do {
let realm = self.realm.threadSafe
try realm.write {
return Promise { seal in
DispatchQueue.main.async { [weak self] in
guard let strongSelf = self else { return seal.reject(PMKError.cancelled) }
if events.isEmpty {
seal.fulfill(())
} else {
let realm = strongSelf.realm
let eventsToSave = events.map { EventActivity(value: $0) }
do {
realm.beginWrite()
realm.add(eventsToSave, update: .all)
try realm.commitWrite()
seal.fulfill(())
} catch {
seal.reject(error)
}
} catch {
seal.reject(error)
}
}
}

@ -37,7 +37,7 @@ class EventsDataStore: EventsDataStoreProtocol {
.filter("eventName = '\(eventName)'")
//Filter stored as string, so we do a string comparison
.filter("filter = '\(filterName)=\(filterValue)'"))
}
}
func deleteEvents(forTokenContract contract: AlphaWallet.Address) {
let events = getEvents(forTokenContract: contract)
@ -57,35 +57,46 @@ class EventsDataStore: EventsDataStoreProtocol {
func getLastMatchingEventSortedByBlockNumber(forContract contract: AlphaWallet.Address, tokenContract: AlphaWallet.Address, server: RPCServer, eventName: String) -> Promise<EventInstanceValue?> {
return Promise { seal in
let event = Array(realm.threadSafe.objects(EventInstance.self)
.filter("contract = '\(contract.eip55String)'")
.filter("tokenContract = '\(tokenContract.eip55String)'")
.filter("chainId = \(server.chainID)")
.filter("eventName = '\(eventName)'")
.sorted(byKeyPath: "blockNumber"))
.map{ EventInstanceValue(event: $0) }
.last
DispatchQueue.main.async { [weak self] in
guard let strongSelf = self else { return seal.reject(PMKError.cancelled) }
let event = Array(strongSelf.realm.objects(EventInstance.self)
.filter("contract = '\(contract.eip55String)'")
.filter("tokenContract = '\(tokenContract.eip55String)'")
.filter("chainId = \(server.chainID)")
.filter("eventName = '\(eventName)'")
.sorted(byKeyPath: "blockNumber"))
.map { EventInstanceValue(event: $0) }
.last
seal.fulfill(event)
seal.fulfill(event)
}
}
}
func add(events: [EventInstanceValue], forTokenContract contract: AlphaWallet.Address) -> Promise<Void> {
if events.isEmpty {
return .value(())
}
return Promise { seal in
do {
let realm = self.realm.threadSafe
try realm.write {
let eventsToSave = events.map { EventInstance(event: $0) }
realm.add(eventsToSave, update: .all)
DispatchQueue.main.async { [weak self] in
guard let strongSelf = self else { return seal.reject(PMKError.cancelled) }
if events.isEmpty {
return seal.fulfill(())
} else {
do {
let realm = strongSelf.realm
let eventsToSave = events.map { EventInstance(event: $0) }
realm.beginWrite()
realm.add(eventsToSave, update: .all)
try realm.commitWrite()
seal.fulfill(())
} catch {
seal.reject(error)
}
seal.fulfill(())
strongSelf.triggerSubscribers(forContract: contract)
}
} catch {
seal.reject(error)
}
}
}

Loading…
Cancel
Save