From dee3fe00d1de8a927b3d5648e667f516476a8ffc Mon Sep 17 00:00:00 2001 From: Martin Kersner Date: Wed, 22 Feb 2023 11:40:34 +0900 Subject: [PATCH 1/5] feat: Push job to fixed heartbeat q & wake up single-node reporter --- core/src/worker/aggregator.ts | 43 ++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/core/src/worker/aggregator.ts b/core/src/worker/aggregator.ts index 70b6b5800..9f8df3a16 100644 --- a/core/src/worker/aggregator.ts +++ b/core/src/worker/aggregator.ts @@ -42,12 +42,30 @@ export async function aggregatorWorker(_logger: Logger) { const aggregatorsWithAdapters = mergeAggregatorsAdapters(aggregators, adapters) logger.debug(aggregatorsWithAdapters, 'aggregatorsWithAdapters') - // Launch all aggregators to be executed with random heartbeat - const heartbeatQueue = new Queue(RANDOM_HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION) + const randomHeartbeatQueue = new Queue(RANDOM_HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION) + const fixedHeartbeatQueue = new Queue(FIXED_HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION) + + // Launch all aggregators to be executed with random and fixed heartbeat for (const aggregatorAddress in aggregatorsWithAdapters) { const aggregator = aggregatorsWithAdapters[aggregatorAddress] + if (aggregator.fixedHeartbeatRate.active) { + await fixedHeartbeatQueue.add( + 'fixed-heartbeat', + { aggregatorAddress }, + { + delay: await getSynchronizedDelay( + aggregatorAddress, + aggregator.fixedHeartbeatRate.value, + _logger + ), + removeOnComplete: true, + removeOnFail: true + } + ) + } + if (aggregator.randomHeartbeatRate.active) { - await heartbeatQueue.add('random-heartbeat', addReportProperty(aggregator, undefined), { + await randomHeartbeatQueue.add('random-heartbeat', addReportProperty(aggregator, undefined), { delay: uniform(0, aggregator.randomHeartbeatRate.value), removeOnComplete: REMOVE_ON_COMPLETE, removeOnFail: REMOVE_ON_FAIL @@ -298,3 +316,22 @@ function shouldReport( function addReportProperty(o, report: boolean | undefined) { return Object.assign({}, ...[o, { report }]) } + +async function getSynchronizedDelay( + aggregatorAddress: string, + delay: number, + _logger: Logger +): Promise { + const startedAt = await getRoundStartedAt(aggregatorAddress) + const synchronizedDelay = delay - (startedAt % delay) + _logger.debug({ synchronizedDelay }, 'synchronizedDelay') + return synchronizedDelay +} + +async function getRoundStartedAt(aggregatorAddress: string): Promise { + const { _startedAt } = await oracleRoundStateCall({ + aggregatorAddress, + operatorAddress: OPERATOR_ADDRESS + }) + return _startedAt.toNumber() +} From 1ea41f71cc464273b691bc3bafc7574bcccd4624 Mon Sep 17 00:00:00 2001 From: Martin Kersner Date: Wed, 22 Feb 2023 12:10:42 +0900 Subject: [PATCH 2/5] feat: Pass fixed delay from reporter --- core/src/reporter/aggregator.ts | 2 +- core/src/types.ts | 1 + core/src/worker/aggregator.ts | 24 ++++++++++++++++++++++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/core/src/reporter/aggregator.ts b/core/src/reporter/aggregator.ts index 3cb4760ad..b6032562f 100644 --- a/core/src/reporter/aggregator.ts +++ b/core/src/reporter/aggregator.ts @@ -55,7 +55,7 @@ function job(wallet, _logger: Logger) { aggregatorAddress } await heartbeatQueue.add('fixed-heartbeat', outData, { - delay: 15_000, // FIXME + delay: inData.delay, removeOnComplete: true, removeOnFail: true, jobId: aggregatorAddress diff --git a/core/src/types.ts b/core/src/types.ts index 6bbfab6c1..2c4f838a1 100644 --- a/core/src/types.ts +++ b/core/src/types.ts @@ -248,6 +248,7 @@ export interface IAggregatorWorkerReporter { roundId: number submission: number workerSource: string + delay: number } // VRF diff --git a/core/src/worker/aggregator.ts b/core/src/worker/aggregator.ts index 9f8df3a16..a374a3dab 100644 --- a/core/src/worker/aggregator.ts +++ b/core/src/worker/aggregator.ts @@ -90,7 +90,12 @@ export async function aggregatorWorker(_logger: Logger) { // Random heartbeat worker new Worker( RANDOM_HEARTBEAT_QUEUE_NAME, - randomHeartbeatJob(RANDOM_HEARTBEAT_QUEUE_NAME, REPORTER_AGGREGATOR_QUEUE_NAME, _logger), + randomHeartbeatJob( + RANDOM_HEARTBEAT_QUEUE_NAME, + REPORTER_AGGREGATOR_QUEUE_NAME, + aggregatorsWithAdapters, + _logger + ), BULLMQ_CONNECTION ) } @@ -118,6 +123,7 @@ function aggregatorJob( const outData = await prepareDataForReporter({ data: aggregator, workerSource: inData.workerSource, + delay: aggregator.fixedHeartbeatRate.value, roundId, _logger }) @@ -178,6 +184,7 @@ function fixedHeartbeatJob(aggregatorJobQueueName: string, _logger: Logger) { function randomHeartbeatJob( heartbeatQueueName: string, reporterQueueName: string, + aggregatorsWithAdapters: IAggregatorJob[], _logger: Logger ) { const logger = _logger.child({ name: 'randomHeartbeatJob', file: FILE_NAME }) @@ -190,11 +197,17 @@ function randomHeartbeatJob( logger.debug(inData, 'inData') const aggregatorAddress = inData.address + const aggregator = aggregatorsWithAdapters[aggregatorAddress] + + if (!aggregator) { + throw new IcnError(IcnErrorCode.UndefinedAggregator) + } try { const outData = await prepareDataForReporter({ data: inData, workerSource: 'random', + delay: aggregator.fixedHeartbeatRate.value, _logger }) logger.debug(outData, 'outData') @@ -227,17 +240,23 @@ function randomHeartbeatJob( * Fetch the latest data and prepare them to be sent to reporter. * * @param {IAggregatorHeartbeatWorker} data - * @return {Promise} + * @param {string} workerSource + * @param {number} delay + * @param {number} roundId + * @param {Logger} _logger + * @return {Promise { @@ -272,6 +291,7 @@ async function prepareDataForReporter({ report, callbackAddress, workerSource, + delay, submission, roundId: roundId || oracleRoundState._roundId } From 9d77ecf91107d17ffcb676db08827a9a652391a6 Mon Sep 17 00:00:00 2001 From: Martin Kersner Date: Wed, 22 Feb 2023 15:13:49 +0900 Subject: [PATCH 3/5] fix: If the latest round hasn't been reported, use previous for sync --- core/src/worker/aggregator.ts | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/core/src/worker/aggregator.ts b/core/src/worker/aggregator.ts index a374a3dab..c2e995cc5 100644 --- a/core/src/worker/aggregator.ts +++ b/core/src/worker/aggregator.ts @@ -342,16 +342,27 @@ async function getSynchronizedDelay( delay: number, _logger: Logger ): Promise { - const startedAt = await getRoundStartedAt(aggregatorAddress) - const synchronizedDelay = delay - (startedAt % delay) - _logger.debug({ synchronizedDelay }, 'synchronizedDelay') - return synchronizedDelay -} + // FIXME modify aggregator to use single contract call -async function getRoundStartedAt(aggregatorAddress: string): Promise { - const { _startedAt } = await oracleRoundStateCall({ + let startedAt: number = 0 + let { _startedAt, _roundId } = await oracleRoundStateCall({ aggregatorAddress, operatorAddress: OPERATOR_ADDRESS }) - return _startedAt.toNumber() + + if (_startedAt.toNumber() != 0) { + startedAt = _startedAt.toNumber() + } else { + const { _startedAt } = await oracleRoundStateCall({ + aggregatorAddress, + operatorAddress: OPERATOR_ADDRESS, + roundId: Math.max(0, _roundId - 1) + }) + startedAt = _startedAt.toNumber() + } + + _logger.debug({ startedAt }, 'synchronizedDelay') + const synchronizedDelay = delay - (startedAt % delay) + _logger.debug({ synchronizedDelay }, 'synchronizedDelay') + return synchronizedDelay } From cfbea6fbac74ad7b67f61fd02b34968535a1c68d Mon Sep 17 00:00:00 2001 From: Martin Kersner Date: Wed, 22 Feb 2023 15:16:46 +0900 Subject: [PATCH 4/5] fix: eslint --- core/src/worker/aggregator.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/worker/aggregator.ts b/core/src/worker/aggregator.ts index c2e995cc5..b72623928 100644 --- a/core/src/worker/aggregator.ts +++ b/core/src/worker/aggregator.ts @@ -344,8 +344,8 @@ async function getSynchronizedDelay( ): Promise { // FIXME modify aggregator to use single contract call - let startedAt: number = 0 - let { _startedAt, _roundId } = await oracleRoundStateCall({ + const startedAt = 0 + const { _startedAt, _roundId } = await oracleRoundStateCall({ aggregatorAddress, operatorAddress: OPERATOR_ADDRESS }) From e35ff4b856dde4f94e0d2980a30fb0bfdb34b823 Mon Sep 17 00:00:00 2001 From: Martin Kersner Date: Wed, 22 Feb 2023 15:20:33 +0900 Subject: [PATCH 5/5] fix: const -> let --- core/src/worker/aggregator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/worker/aggregator.ts b/core/src/worker/aggregator.ts index b72623928..0d2cf041d 100644 --- a/core/src/worker/aggregator.ts +++ b/core/src/worker/aggregator.ts @@ -344,7 +344,7 @@ async function getSynchronizedDelay( ): Promise { // FIXME modify aggregator to use single contract call - const startedAt = 0 + let startedAt = 0 const { _startedAt, _roundId } = await oracleRoundStateCall({ aggregatorAddress, operatorAddress: OPERATOR_ADDRESS