Make polling subscriber pause when its provider pauses.
This commit is contained in:
parent
90a8a2c80d
commit
785093b276
@ -2,9 +2,9 @@ import { PollingEventSubscriber } from "./subscriber-polling.js";
|
||||
|
||||
import type { Frozen } from "@ethersproject/properties";
|
||||
|
||||
import type { Subscriber } from "./abstract-provider.js";
|
||||
import type { AbstractProvider, Subscriber } from "./abstract-provider.js";
|
||||
import type { Network } from "./network.js";
|
||||
import type { EventFilter, Provider } from "./provider.js";
|
||||
import type { EventFilter } from "./provider.js";
|
||||
import type { JsonRpcApiProvider } from "./provider-jsonrpc.js";
|
||||
|
||||
|
||||
@ -33,11 +33,11 @@ export class FilterIdSubscriber implements Subscriber {
|
||||
throw new Error("subclasses must override this");
|
||||
}
|
||||
|
||||
_emitResults(provider: Provider, result: Array<any>): Promise<void> {
|
||||
_emitResults(provider: AbstractProvider, result: Array<any>): Promise<void> {
|
||||
throw new Error("subclasses must override this");
|
||||
}
|
||||
|
||||
_recover(provider: Provider): Subscriber {
|
||||
_recover(provider: AbstractProvider): Subscriber {
|
||||
throw new Error("subclasses must override this");
|
||||
}
|
||||
|
||||
@ -100,7 +100,7 @@ export class FilterIdEventSubscriber extends FilterIdSubscriber {
|
||||
this.#event = copy(filter);
|
||||
}
|
||||
|
||||
_recover(provider: Provider): Subscriber {
|
||||
_recover(provider: AbstractProvider): Subscriber {
|
||||
return new PollingEventSubscriber(provider, this.#event);
|
||||
}
|
||||
|
||||
|
@ -1,15 +1,14 @@
|
||||
import { isHexString } from "@ethersproject/bytes";
|
||||
|
||||
import type { Subscriber } from "./abstract-provider.js";
|
||||
import type { EventFilter, OrphanFilter, Provider, ProviderEvent } from "./provider.js";
|
||||
|
||||
import type { AbstractProvider, Subscriber } from "./abstract-provider.js";
|
||||
import type { EventFilter, OrphanFilter, ProviderEvent } from "./provider.js";
|
||||
import { logger } from "./logger.js";
|
||||
|
||||
function copy(obj: any): any {
|
||||
return JSON.parse(JSON.stringify(obj));
|
||||
}
|
||||
|
||||
export function getPollingSubscriber(provider: Provider, event: ProviderEvent): Subscriber {
|
||||
export function getPollingSubscriber(provider: AbstractProvider, event: ProviderEvent): Subscriber {
|
||||
if (event === "block") { return new PollingBlockSubscriber(provider); }
|
||||
if (isHexString(event, 32)) { return new PollingTransactionSubscriber(provider, event); }
|
||||
|
||||
@ -21,8 +20,8 @@ export function getPollingSubscriber(provider: Provider, event: ProviderEvent):
|
||||
// @TODO: refactor this
|
||||
|
||||
export class PollingBlockSubscriber implements Subscriber{
|
||||
#provider: Provider;
|
||||
#poller: null | NodeJS.Timer;
|
||||
#provider: AbstractProvider;
|
||||
#poller: null | number;
|
||||
|
||||
#interval: number;
|
||||
|
||||
@ -30,7 +29,7 @@ export class PollingBlockSubscriber implements Subscriber{
|
||||
// indicates we still need to fetch an initial block number
|
||||
#blockNumber: number;
|
||||
|
||||
constructor(provider: Provider) {
|
||||
constructor(provider: AbstractProvider) {
|
||||
this.#provider = provider;
|
||||
this.#poller = null;
|
||||
this.#interval = 4000;
|
||||
@ -58,18 +57,18 @@ export class PollingBlockSubscriber implements Subscriber{
|
||||
this.#blockNumber = blockNumber;
|
||||
}
|
||||
|
||||
this.#poller = setTimeout(this.#poll.bind(this), this.#interval);
|
||||
this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval);
|
||||
}
|
||||
|
||||
start(): void {
|
||||
if (this.#poller) { throw new Error("subscriber already running"); }
|
||||
this.#poll();
|
||||
this.#poller = setTimeout(this.#poll.bind(this), this.#interval);
|
||||
this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval);
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
if (!this.#poller) { throw new Error("subscriber not running"); }
|
||||
clearTimeout(this.#poller);
|
||||
this.#provider._clearTimeout(this.#poller);
|
||||
this.#poller = null;
|
||||
}
|
||||
|
||||
@ -84,17 +83,17 @@ export class PollingBlockSubscriber implements Subscriber{
|
||||
}
|
||||
|
||||
export class OnBlockSubscriber implements Subscriber {
|
||||
#provider: Provider;
|
||||
#provider: AbstractProvider;
|
||||
#poll: (b: number) => void;
|
||||
|
||||
constructor(provider: Provider) {
|
||||
constructor(provider: AbstractProvider) {
|
||||
this.#provider = provider;
|
||||
this.#poll = (blockNumber: number) => {
|
||||
this._poll(blockNumber, this.#provider);
|
||||
}
|
||||
}
|
||||
|
||||
async _poll(blockNumber: number, provider: Provider): Promise<void> {
|
||||
async _poll(blockNumber: number, provider: AbstractProvider): Promise<void> {
|
||||
throw new Error("sub-classes must override this");
|
||||
}
|
||||
|
||||
@ -114,12 +113,12 @@ export class OnBlockSubscriber implements Subscriber {
|
||||
export class PollingOrphanSubscriber extends OnBlockSubscriber {
|
||||
#filter: OrphanFilter;
|
||||
|
||||
constructor(provider: Provider, filter: OrphanFilter) {
|
||||
constructor(provider: AbstractProvider, filter: OrphanFilter) {
|
||||
super(provider);
|
||||
this.#filter = copy(filter);
|
||||
}
|
||||
|
||||
async _poll(blockNumber: number, provider: Provider): Promise<void> {
|
||||
async _poll(blockNumber: number, provider: AbstractProvider): Promise<void> {
|
||||
throw new Error("@TODO");
|
||||
console.log(this.#filter);
|
||||
}
|
||||
@ -128,19 +127,19 @@ export class PollingOrphanSubscriber extends OnBlockSubscriber {
|
||||
export class PollingTransactionSubscriber extends OnBlockSubscriber {
|
||||
#hash: string;
|
||||
|
||||
constructor(provider: Provider, hash: string) {
|
||||
constructor(provider: AbstractProvider, hash: string) {
|
||||
super(provider);
|
||||
this.#hash = hash;
|
||||
}
|
||||
|
||||
async _poll(blockNumber: number, provider: Provider): Promise<void> {
|
||||
async _poll(blockNumber: number, provider: AbstractProvider): Promise<void> {
|
||||
const tx = await provider.getTransactionReceipt(this.#hash);
|
||||
if (tx) { provider.emit(this.#hash, tx); }
|
||||
}
|
||||
}
|
||||
|
||||
export class PollingEventSubscriber implements Subscriber {
|
||||
#provider: Provider;
|
||||
#provider: AbstractProvider;
|
||||
#filter: EventFilter;
|
||||
#poller: (b: number) => void;
|
||||
|
||||
@ -148,7 +147,7 @@ export class PollingEventSubscriber implements Subscriber {
|
||||
// indicates we still need to fetch an initial block number
|
||||
#blockNumber: number;
|
||||
|
||||
constructor(provider: Provider, filter: EventFilter) {
|
||||
constructor(provider: AbstractProvider, filter: EventFilter) {
|
||||
this.#provider = provider;
|
||||
this.#filter = copy(filter);
|
||||
this.#poller = this.#poll.bind(this);
|
||||
|
Loading…
Reference in New Issue
Block a user