[chore] update subscribe transaction confirmation flow

@types
neeboo 6 years ago
parent a66bfe3503
commit fa290a6689
  1. 14
      examples/testContract.js
  2. 7
      examples/testGanache.js
  3. 3
      packages/harmony-core/src/blockchain.ts
  4. 16
      packages/harmony-network/src/messenger/messenger.ts
  5. 1
      packages/harmony-network/src/providers/emitter.ts
  6. 3
      packages/harmony-network/src/tracker/baseTracker.ts
  7. 4
      packages/harmony-network/src/tracker/subscribeTracker.ts
  8. 6
      packages/harmony-network/src/types.ts
  9. 108
      packages/harmony-transaction/src/transaction.ts

@ -6,6 +6,10 @@
const { Harmony } = require('../packages/harmony-core/dist');
// You can import BN from `@harmony-js/crypto` or use `Harmony.utils.BN`instead
const { BN } = require('../packages/harmony-crypto/dist');
const {
SubscribeBlockTracker,
RPCMethod,
} = require('../packages/harmony-network/dist');
// import more utils
const {
isArray,
@ -108,6 +112,14 @@ const acc1 = harmony.wallet.addByMnemonic(mne, 0);
// now we create contract using extracted abi
const myContract = harmony.contracts.createContract(abi);
// harmony.messenger.send(RPCMethod.BlockNumber, []).then(console.log);
// harmony.messenger.subscribe(RPCMethod.Subscribe, ['newHeads']).then((res) => {
// res.onData((data) => {
// console.log(data);
// });
// });
// first we get the account's balance to see if we have enough token on the testnet
acc1.getBalance().then((res) => {
console.log(`-- hint: account balance of ${acc1.address}`);
@ -198,8 +210,6 @@ deployContract().then((deployed) => {
console.log(`${res.result}`);
console.log(``);
console.log(``);
console.log(toUtf8String(res.result));
deployed.methods
.myFunction()
.call()

@ -241,11 +241,6 @@ server.listen(port, function(err, blockchain) {
// console.log(txn);
// });
// });
const blockTracker = new SubscribeBlockTracker(harmony.messenger);
console.log(`--------------------asdfasdfasfasdfasdf----------------`);
const blocks = [];
blockTracker.on('latest', (block) => blocks.push(block));
console.log(blockTracker.isRunning());
console.log(`--------------------asdfasdfasfasdfasdf----------------`);
main();
});

@ -3,6 +3,7 @@ import {
Messenger,
ResponseMiddleware,
WSProvider,
SubscribeReturns,
} from '@harmony-js/network';
import {
@ -323,6 +324,7 @@ class Blockchain extends HarmonyCore {
return this.messenger.subscribe(
RPCMethod.Subscribe,
['newPendingTransactions'],
SubscribeReturns.method,
this.chainPrefix,
);
} else {
@ -335,6 +337,7 @@ class Blockchain extends HarmonyCore {
return this.messenger.subscribe(
RPCMethod.Subscribe,
['newHeads'],
SubscribeReturns.method,
this.chainPrefix,
);
} else {

@ -5,6 +5,7 @@ import { HttpProvider } from '../providers/http';
import { WSProvider } from '../providers/ws';
// import { getResultForData } from '../util';
import { RPCMethod } from '../rpcMethod/rpc';
import { SubscribeReturns } from '../types';
const defaultConfig = {
Default: {
@ -182,6 +183,7 @@ class Messenger extends HarmonyCore {
subscribe = async (
method: RPCMethod | string,
params?: string | any[] | undefined,
returnType: SubscribeReturns = SubscribeReturns.all,
rpcPrefix: string = this.chainPrefix,
) => {
let rpcMethod = method;
@ -190,11 +192,13 @@ class Messenger extends HarmonyCore {
} else if (!rpcPrefix || rpcPrefix === this.chainPrefix) {
rpcMethod = this.setRPCPrefix(method, this.chainPrefix);
}
let id: any = null;
if (this.provider instanceof WSProvider) {
const provider = this.provider;
try {
const payload = this.JsonRpc.toPayload(rpcMethod, params);
const id = await provider.subscribe(payload);
id = await provider.subscribe(payload);
provider.on(id, (result: any) => {
provider.emitter.emit('data', result);
});
@ -207,7 +211,15 @@ class Messenger extends HarmonyCore {
provider.emitter.emit('error', error);
provider.removeEventListener('*');
}
return provider;
if (returnType === SubscribeReturns.all) {
return [provider, id];
} else if (returnType === SubscribeReturns.method) {
return provider;
} else if (returnType === SubscribeReturns.id) {
return id;
} else {
throw new Error('Invalid returns');
}
} else {
throw new Error('HttpProvider does not support this');
}

@ -19,7 +19,6 @@ class Emitter {
this.reject = reject;
});
this.then = this.promise.then.bind(this.promise);
this.emit('newListener', 'newListener');
}
resetHandlers() {

@ -37,7 +37,8 @@ export class BaseBlockTracker extends Emitter {
// this._onRemoveListener = this._onRemoveListener.bind(this);
// this._resetCurrentBlock = this._resetCurrentBlock.bind(this);
// listen for handler changes
this._setupInternalEvents();
// this._setupInternalEvents();
this._maybeStart();
}
isRunning() {

@ -31,7 +31,7 @@ export class SubscribeBlockTracker extends BaseBlockTracker {
async _start() {
try {
const blockNumber = await this.messenger.send(RPCMethod.BlockNumber, []);
console.log({ blockNumber });
if (blockNumber.isError()) {
throw blockNumber.message;
} else if (blockNumber.isResult()) {
@ -39,7 +39,7 @@ export class SubscribeBlockTracker extends BaseBlockTracker {
'newHeads',
]);
this.subscriptionId = subs;
subs.onData(this._handleSubData);
subs[0].onData(this._handleSubData);
this._newPotentialLatest(blockNumber);
}

@ -8,6 +8,12 @@ export const enum MiddlewareType {
RES,
}
export const enum SubscribeReturns {
all = 'all',
id = 'id',
method = 'method',
}
export interface Middleware {
request: object;
response: object;

@ -8,7 +8,13 @@ import {
splitSignature,
} from '@harmony-js/crypto';
import { add0xToString, numberToHex } from '@harmony-js/utils';
import { Messenger, RPCMethod, Emitter } from '@harmony-js/network';
import {
Messenger,
RPCMethod,
Emitter,
HttpProvider,
SubscribeReturns,
} from '@harmony-js/network';
import { TxParams, TxStatus, TransasctionReceipt } from './types';
import {
recover,
@ -22,6 +28,8 @@ class Transaction {
emitter: Emitter;
messenger: Messenger;
txStatus: TxStatus;
blockNumbers: string[] = [];
confirmationCheck: number = 0;
receipt?: TransasctionReceipt;
private id: string;
private from: string;
@ -290,27 +298,91 @@ class Transaction {
maxAttempts: number = 20,
interval: number = 1000,
) {
this.txStatus = TxStatus.PENDING;
for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
try {
if (await this.trackTx(txHash)) {
if (this.messenger.provider instanceof HttpProvider) {
this.txStatus = TxStatus.PENDING;
for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
try {
if (await this.trackTx(txHash)) {
this.emitConfirm(this.txStatus);
return this;
}
} catch (err) {
this.txStatus = TxStatus.REJECTED;
this.emitConfirm(this.txStatus);
return this;
throw err;
}
if (attempt + 1 < maxAttempts) {
await sleep(interval * attempt);
}
} catch (err) {
this.txStatus = TxStatus.REJECTED;
this.emitConfirm(this.txStatus);
throw err;
}
if (attempt + 1 < maxAttempts) {
await sleep(interval * attempt);
}
this.txStatus = TxStatus.REJECTED;
this.emitConfirm(this.txStatus);
throw new Error(
`The transaction is still not confirmed after ${maxAttempts} attempts.`,
);
} else {
await this.socketConfirm(txHash, maxAttempts, interval);
return this;
}
}
async socketConfirm(
txHash: string,
maxAttempts: number = 20,
interval: number = 1000,
) {
try {
const [newHeads, subscriptionId] = await this.messenger.subscribe(
RPCMethod.Subscribe,
['newHeads'],
SubscribeReturns.all,
);
newHeads
.onData(async (data: any) => {
const currentBlock = await this.messenger.send(
RPCMethod.BlockNumber,
[],
);
if (currentBlock.isError()) {
throw currentBlock.message;
}
if (!this.blockNumbers.includes(data.number)) {
const tracker = await this.trackTx(txHash);
if (tracker) {
this.emitConfirm(this.txStatus);
await this.messenger.unsubscribe(RPCMethod.UnSubscribe, [
subscriptionId,
]);
} else {
this.blockNumbers.push(currentBlock.result);
this.confirmationCheck += 1;
if (this.confirmationCheck === maxAttempts * interval) {
this.txStatus = TxStatus.REJECTED;
this.emitConfirm(this.txStatus);
await this.messenger.unsubscribe(RPCMethod.UnSubscribe, [
subscriptionId,
]);
}
}
}
})
.onError(async (error: any) => {
this.txStatus = TxStatus.REJECTED;
this.emitConfirm(this.txStatus);
this.emitError(error);
await this.messenger.unsubscribe(RPCMethod.UnSubscribe, [
subscriptionId,
]);
throw new Error(error);
});
} catch (error) {
throw error;
}
this.txStatus = TxStatus.REJECTED;
this.emitConfirm(this.txStatus);
throw new Error(
`The transaction is still not confirmed after ${maxAttempts} attempts.`,
);
}
emitTransactionHash(transactionHash: string) {

Loading…
Cancel
Save