[feat] update websocket provider

@types
neeboo 6 years ago
parent 53ac439fb7
commit fcf21c534f
  1. 37
      examples/temp.html
  2. 42
      examples/testGanache.js
  3. 31
      packages/harmony-core/src/blockchain.ts
  4. 61
      packages/harmony-network/src/messenger/messenger.ts
  5. 42
      packages/harmony-network/src/providers/baseSocket.ts
  6. 213
      packages/harmony-network/src/providers/ws.ts
  7. 2
      packages/harmony-network/src/rpcMethod/rpc.ts
  8. 2
      packages/harmony-network/src/rpcMethod/rpcBuilder.ts

@ -36,6 +36,41 @@
</div>
<script src="../dist/HarmonyJs.browser.js"></script>
<script>
const harmony= new HarmonyJs.Harmony('ws://localhost:18545',1);
const diu = async () => {
const p= await harmony.blockchain.newPendingTransactions()
// harmony.blockchain.newBlockHeaders()
p.onData(res=>{
console.log(res)
}).on('error',error=>{
console.log(error)
})
setTimeout(()=>{
if(p.subscriptions!=={}){
p.clearSubscriptions('eth_unsubscribe').then(res=>{
if(res){
console.log('Successfully unsubscribed!')
}
})
}
},50000)
}
setTimeout(diu, 100);
</script>
</body>
</html>

@ -1,6 +1,5 @@
const { Harmony } = require('@harmony/core');
const ganache = require('ganache-cli');
const WebSocket = require('ws');
const port = 18545;
@ -50,6 +49,8 @@ async function main() {
await createAndEncrypt(mne, 10, password);
// harmony.blockchain.newPendingTransacitons();
const latestBalance = await harmony.blockchain.getBalance({
address: acc.address,
blockNumber: 'latest',
@ -196,10 +197,45 @@ async function main() {
console.log('-------------------------------------');
console.log(getBalanceAgainObject);
console.log('-------------------------------------');
setTimeout(async () => {
const s2 = await harmony.wallet.signTransaction(txn, undefined, password);
await s2.sendTransaction();
}, 5000);
setTimeout(async () => {
const s3 = await harmony.wallet.signTransaction(txn, undefined, password);
await s3.sendTransaction();
}, 10000);
setTimeout(async () => {
const s3 = await harmony.wallet.signTransaction(txn, undefined, password);
await s3.sendTransaction();
}, 15000);
setTimeout(async () => {
const s4 = await harmony.wallet.signTransaction(txn, undefined, password);
await s4.sendTransaction();
}, 20000);
setTimeout(async () => {
const s5 = await harmony.wallet.signTransaction(txn, undefined, password);
await s5.sendTransaction();
}, 25000);
setTimeout(async () => {
const s6 = await harmony.wallet.signTransaction(txn, undefined, password);
await s6.sendTransaction();
}, 30000);
}
server.listen(port, function(err, blockchain) {
// console.log(blockchain);
main();
setTimeout(() => main(), 5000);
// console.log(wsHarmony.provider.connected);
});
// setTimeout(() => {
// harmony.blockchain.newPendingTransactions().then((emitter) => {
// emitter.on('data', (data) => {
// console.log('-------- ws data --------');
// console.log(data);
// console.log('-------- ws data --------');
// });
// });
// }, 1000);

@ -1,4 +1,9 @@
import { RPCMethod, Messenger, ResponseMiddleware } from '@harmony/network';
import {
RPCMethod,
Messenger,
ResponseMiddleware,
WSProvider,
} from '@harmony/network';
import {
assertObject,
@ -294,6 +299,30 @@ class Blockchain extends HarmonyCore {
return result;
}
}
newPendingTransactions() {
if (this.messenger.provider instanceof WSProvider) {
return this.messenger.subscribe(
RPCMethod.Subscribe,
['newPendingTransactions'],
this.chainPrefix,
);
} else {
throw new Error('HttpProvider does not support this feature');
}
}
newBlockHeaders() {
if (this.messenger.provider instanceof WSProvider) {
return this.messenger.subscribe(
RPCMethod.Subscribe,
['newBlockHeaders'],
this.chainPrefix,
);
} else {
throw new Error('HttpProvider does not support this feature');
}
}
}
export { Blockchain };

@ -156,7 +156,7 @@ class Messenger extends HarmonyCore {
* @param {String} method - method name
*/
setResMiddleware(middleware: any, method = '*') {
return this.provider.middlewares.response.use(middleware, method);
this.provider.middlewares.response.use(middleware, method);
}
/**
@ -177,5 +177,64 @@ class Messenger extends HarmonyCore {
stringArray[0] = prefix;
return stringArray.join('_');
}
subscribe = async (
method: RPCMethod | string,
params?: string | any[] | undefined,
rpcPrefix?: string,
) => {
let rpcMethod = method;
if (rpcPrefix && isString(rpcPrefix) && rpcPrefix !== this.chainPrefix) {
rpcMethod = this.setRPCPrefix(method, rpcPrefix);
} else if (!rpcPrefix || rpcPrefix === this.chainPrefix) {
rpcMethod = this.setRPCPrefix(method, this.chainPrefix);
}
if (this.provider instanceof WSProvider) {
const provider = this.provider;
try {
const payload = this.JsonRpc.toPayload(rpcMethod, params);
const id = await provider.subscribe(payload);
provider.on(id, (result: any) => {
provider.emitter.emit('data', result);
});
provider.once('error', (error) => {
provider.removeEventListener(id);
provider.emitter.emit('error', error);
provider.removeEventListener('*');
});
} catch (error) {
provider.emitter.emit('error', error);
provider.removeEventListener('*');
}
return provider;
} else {
throw new Error('HttpProvider does not support this');
}
};
unsubscribe = async (
method: RPCMethod | string,
params?: string | any[] | undefined,
rpcPrefix?: string,
) => {
let rpcMethod = method;
if (rpcPrefix && isString(rpcPrefix) && rpcPrefix !== this.chainPrefix) {
rpcMethod = this.setRPCPrefix(method, rpcPrefix);
} else if (!rpcPrefix || rpcPrefix === this.chainPrefix) {
rpcMethod = this.setRPCPrefix(method, this.chainPrefix);
}
if (this.provider instanceof WSProvider) {
const provider = this.provider;
try {
const payload = this.JsonRpc.toPayload(rpcMethod, params);
const response = await provider.unsubscribe(payload);
return response;
} catch (error) {
throw error;
}
} else {
throw new Error('HttpProvider does not support this');
}
};
}
export { Messenger };

@ -19,10 +19,14 @@ export const enum SocketState {
SOCKET_ACCOUNTS_CHANGED = 'socket_accountsChanged',
}
export const enum EmittType {
INSTANCE = 'instance',
PUBSUB = 'pubsub',
}
class BaseSocket extends BaseProvider {
url: string;
emitter: mitt.Emitter;
subscriptions: any = {};
handlers: any = {};
constructor(url: string) {
super(url);
@ -41,10 +45,14 @@ class BaseSocket extends BaseProvider {
once(type: string, handler: mitt.Handler) {
this.emitter.on(type, handler);
this.removeListener(type);
this.removeEventListener(type);
}
addEventListener(type: string, handler: mitt.Handler) {
this.emitter.on(type, handler);
}
removeListener(type?: string, handler?: mitt.Handler) {
removeEventListener(type?: string, handler?: mitt.Handler) {
if (!type) {
this.handlers = {};
return;
@ -56,32 +64,32 @@ class BaseSocket extends BaseProvider {
}
}
reset() {
this.removeListener('*');
this.removeEventListener('*');
// this.registerEventListeners();
}
removeAllSocketListeners() {
this.removeListener(SocketState.SOCKET_MESSAGE);
this.removeListener(SocketState.SOCKET_READY);
this.removeListener(SocketState.SOCKET_CLOSE);
this.removeListener(SocketState.SOCKET_ERROR);
this.removeListener(SocketState.SOCKET_CONNECT);
this.removeEventListener(SocketState.SOCKET_MESSAGE);
this.removeEventListener(SocketState.SOCKET_READY);
this.removeEventListener(SocketState.SOCKET_CLOSE);
this.removeEventListener(SocketState.SOCKET_ERROR);
this.removeEventListener(SocketState.SOCKET_CONNECT);
}
onReady(event: any) {
this.emitter.on(SocketConnection.READY, () => event);
this.emitter.on(SocketState.SOCKET_READY, () => event);
this.emitter.emit(SocketConnection.READY, event);
this.emitter.emit(SocketState.SOCKET_READY, event);
}
onError(error: any) {
this.emitter.on(SocketConnection.ERROR, () => error);
this.emitter.on(SocketState.SOCKET_ERROR, () => error);
this.emitter.emit(SocketConnection.ERROR, error);
this.emitter.emit(SocketState.SOCKET_ERROR, error);
this.removeAllSocketListeners();
this.removeListener('*');
this.removeEventListener('*');
}
onClose(error = null) {
this.emitter.on(SocketConnection.CLOSE, () => error);
this.emitter.on(SocketState.SOCKET_CLOSE, () => error);
this.emitter.emit(SocketConnection.CLOSE, error);
this.emitter.emit(SocketState.SOCKET_CLOSE, error);
this.removeAllSocketListeners();
this.removeListener('*');
this.removeEventListener('*');
}
}

@ -1,17 +1,28 @@
// TODO: implement Websocket Provider
import { w3cwebsocket as W3CWebsocket } from 'websocket';
import { BaseSocket } from './baseSocket';
import { isWs } from '@harmony/utils';
import {
BaseSocket,
SocketConnection,
SocketState,
// EmittType,
} from './baseSocket';
import { isWs, isObject, isArray } from '@harmony/utils';
import { JsonRpc } from '../rpcMethod/rpcbuilder';
import { composeMiddleware } from '../rpcMethod/net';
import { RPCRequestPayload } from '../types';
class WSProvider extends BaseSocket {
get connected() {
return this.connection.readyState === this.connection.OPEN;
}
url: string;
subscriptions: any = {};
options: any = {};
subscriptions: any;
options: any;
connection: W3CWebsocket | WebSocket;
jsonRpc: JsonRpc;
on: any;
// ws: w3cwebsocket;
constructor(url: string, options: any = {}) {
super(url);
@ -22,6 +33,14 @@ class WSProvider extends BaseSocket {
this.options = options;
this.connection = this.createWebsocketProvider(this.url, this.options);
this.jsonRpc = new JsonRpc();
this.subscriptions = {};
this.registerEventListeners();
this.on = this.emitter.on.bind(this);
}
onData(handler: any) {
this.emitter.on('data', handler);
return this;
}
createWebsocketProvider(url: string, options: any = {}) {
@ -51,10 +70,6 @@ class WSProvider extends BaseSocket {
}
}
get connected() {
return this.connection.readyState === this.connection.OPEN;
}
isConnecting() {
return this.connection.readyState === this.connection.CONNECTING;
}
@ -63,23 +78,181 @@ class WSProvider extends BaseSocket {
const [tReq, tRes] = this.getMiddleware(payload.method);
const reqMiddleware = composeMiddleware(...tReq);
const resMiddleware = composeMiddleware(...tRes);
try {
this.connection.send(reqMiddleware(JSON.stringify(payload)));
} catch (error) {
throw error;
}
return new Promise((resolve, reject) => {
this.connection.send(reqMiddleware(JSON.stringify(payload)));
this.connection.onmessage = (msg: MessageEvent) => {
if (msg && msg.data) {
let result;
try {
result = JSON.parse(msg.data);
resolve(resMiddleware(result));
} catch (error) {
reject(error);
}
this.emitter.on(`${payload.id}`, (data) => {
resolve(resMiddleware(data));
this.removeEventListener(`${payload.id}`);
});
this.emitter.on('connect', () => {
this.send(payload)
.then(resolve)
.catch(reject);
});
});
}
async subscribe(payload: RPCRequestPayload<any[]>) {
const response = await this.send(payload);
const responseValidateResult = this.validate(response);
if (responseValidateResult instanceof Error) {
throw responseValidateResult;
}
this.subscriptions[response.result] = {
id: response.result,
subscribeMethod: payload.method,
parameters: payload.params,
payload,
};
return response.result;
}
async unsubscribe(payload: RPCRequestPayload<any[]>) {
const subscriptionId = payload.params[0];
if (this.hasSubscription(subscriptionId)) {
return this.send(payload).then((response) => {
if (response) {
this.removeEventListener(this.getSubscriptionEvent(subscriptionId));
delete this.subscriptions[subscriptionId];
}
return response;
});
}
return Promise.reject(
new Error(
`Provider error: Subscription with ID ${subscriptionId} does not exist.`,
),
);
}
async clearSubscriptions(unsubscribeMethod: string) {
const unsubscribePromises: Array<Promise<any>> = [];
Object.keys(this.subscriptions).forEach((key) => {
this.removeEventListener(key);
unsubscribePromises.push(
this.unsubscribe(
this.jsonRpc.toPayload(unsubscribeMethod, this.subscriptions[key].id),
),
);
});
const results = await Promise.all(unsubscribePromises);
if (results.includes(false)) {
throw new Error(
`Could not unsubscribe all subscriptions: ${JSON.stringify(results)}`,
);
}
return true;
}
registerEventListeners() {
this.connection.onmessage = this.onMessage.bind(this);
this.connection.onopen = this.onReady.bind(this);
this.connection.onopen = this.onConnect.bind(this);
this.connection.onclose = this.onClose.bind(this);
this.connection.onerror = this.onError.bind(this);
}
onMessage(msg: MessageEvent) {
if (msg && msg.data) {
let result;
let event;
try {
result = isObject(msg.data) ? msg.data : JSON.parse(msg.data);
if (isArray(result)) {
event = result[0].id;
}
if (typeof result.id === 'undefined') {
event =
this.getSubscriptionEvent(result.params.subscription) ||
result.params.subscription;
// result = result.params;
} else {
reject('provider error');
event = result.id;
}
};
} catch (error) {
throw error;
}
this.emitter.emit(SocketState.SOCKET_MESSAGE, result);
this.emitter.emit(`${event}`, result);
} else {
throw new Error('provider error');
}
}
async onConnect() {
if (!this.subscriptions) {
this.subscriptions = {};
}
const subscriptionKeys = Object.keys(this.subscriptions);
if (subscriptionKeys.length > 0) {
for (const key of subscriptionKeys) {
const subscriptionId: any = await this.subscribe(
this.subscriptions[key].payload,
);
delete this.subscriptions[subscriptionId];
this.subscriptions[key].id = subscriptionId;
}
}
this.emitter.emit(SocketState.SOCKET_CONNECT);
this.emitter.emit(SocketConnection.CONNECT);
}
getSubscriptionEvent(subscriptionId: any) {
if (this.subscriptions[subscriptionId]) {
return subscriptionId;
}
let event;
Object.keys(this.subscriptions).forEach((key) => {
if (this.subscriptions[key].id === subscriptionId) {
event = key;
}
});
return event;
}
hasSubscription(subscriptionId: string) {
return typeof this.getSubscriptionEvent(subscriptionId) !== 'undefined';
}
validate(response: any, payload?: any) {
if (isObject(response)) {
if (response.error) {
if (response.error instanceof Error) {
return new Error(`Node error: ${response.error.message}`);
}
return new Error(`Node error: ${JSON.stringify(response.error)}`);
}
if (payload && response.id !== payload.id) {
return new Error(
`Validation error: Invalid JSON-RPC response ID (request: ${
payload.id
} / response: ${response.id})`,
);
}
if (response.result === undefined) {
return new Error('Validation error: Undefined JSON-RPC result');
}
return true;
}
return new Error('Validation error: Response should be of type Object');
}
}

@ -31,6 +31,8 @@ export const enum RPCMethod {
SendTransaction = 'hmy_sendTransaction',
// 15. hmy_sendRawTransaction
SendRawTransaction = 'hmy_sendRawTransaction',
// 16. hmy_subscribe
Subscribe = 'hmy_subscribe',
}
export const enum RPCErrorCode {

@ -27,7 +27,7 @@ class JsonRpc {
toPayload = (
method: RPCMethod | string,
params: string | undefined | any[],
): RPCRequestPayload<object> => {
): RPCRequestPayload<any> => {
// FIXME: error to be done by shared/errors
if (!method) throw new Error('jsonrpc method should be specified!');

Loading…
Cancel
Save