Fix concurrent events in PollingEventSubscriber

This commit is contained in:
Andre Staltz 2023-08-02 12:37:39 +03:00
parent 17969fe416
commit 259237f2fd

View File

@ -7,6 +7,10 @@ function copy(obj: any): any {
return JSON.parse(JSON.stringify(obj)); return JSON.parse(JSON.stringify(obj));
} }
function stall(duration: number): Promise<void> {
return new Promise((resolve) => { setTimeout(resolve, duration); });
}
/** /**
* Return the polling subscriber for common events. * Return the polling subscriber for common events.
* *
@ -219,6 +223,7 @@ export class PollingEventSubscriber implements Subscriber {
// The most recent block we have scanned for events. The value -2 // The most recent block we have scanned for events. The value -2
// indicates we still need to fetch an initial block number // indicates we still need to fetch an initial block number
#blockNumber: number; #blockNumber: number;
#concurrentBlockNumber: number;
/** /**
* Create a new **PollingTransactionSubscriber** attached to * Create a new **PollingTransactionSubscriber** attached to
@ -230,18 +235,31 @@ export class PollingEventSubscriber implements Subscriber {
this.#poller = this.#poll.bind(this); this.#poller = this.#poll.bind(this);
this.#running = false; this.#running = false;
this.#blockNumber = -2; this.#blockNumber = -2;
this.#concurrentBlockNumber = -2;
} }
async #poll(blockNumber: number): Promise<void> { async #poll(blockNumber: number): Promise<void> {
// The initial block hasn't been determined yet // The initial block hasn't been determined yet
if (this.#blockNumber === -2) { return; } if (this.#blockNumber === -2) { return; }
// Debounce concurrent calls to #poll within a 16ms time window
if (blockNumber > this.#concurrentBlockNumber) {
this.#concurrentBlockNumber = blockNumber;
}
await stall(16);
if (blockNumber !== this.#concurrentBlockNumber) {
return;
}
const filter = copy(this.#filter); const filter = copy(this.#filter);
filter.fromBlock = this.#blockNumber + 1; filter.fromBlock = this.#blockNumber + 1;
filter.toBlock = blockNumber; filter.toBlock = blockNumber;
const logs = await this.#provider.getLogs(filter); const logs = await this.#provider.getLogs(filter);
// Undo debounce state
this.#concurrentBlockNumber = -2;
// No logs could just mean the node has not indexed them yet, // No logs could just mean the node has not indexed them yet,
// so we keep a sliding window of 60 blocks to keep scanning // so we keep a sliding window of 60 blocks to keep scanning
if (logs.length === 0) { if (logs.length === 0) {