Wake up single-node Data Feed reporters by martinkersner · Pull Request #301 · Bisonai/orakl · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Wake up single-node Data Feed reporters #301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/reporter/aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ function job(wallet, _logger: Logger) {
aggregatorAddress
}
await heartbeatQueue.add('fixed-heartbeat', outData, {
delay: 15_000, // FIXME
delay: inData.delay,
removeOnComplete: true,
removeOnFail: true,
jobId: aggregatorAddress
Expand Down
1 change: 1 addition & 0 deletions core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ export interface IAggregatorWorkerReporter {
roundId: number
submission: number
workerSource: string
delay: number
}

// VRF
Expand Down
78 changes: 73 additions & 5 deletions core/src/worker/aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,30 @@ export async function aggregatorWorker(_logger: Logger) {
const aggregatorsWithAdapters = mergeAggregatorsAdapters(aggregators, adapters)
logger.debug(aggregatorsWithAdapters, 'aggregatorsWithAdapters')

// Launch all aggregators to be executed with random heartbeat
const heartbeatQueue = new Queue(RANDOM_HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION)
const randomHeartbeatQueue = new Queue(RANDOM_HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION)
const fixedHeartbeatQueue = new Queue(FIXED_HEARTBEAT_QUEUE_NAME, BULLMQ_CONNECTION)

// Launch all aggregators to be executed with random and fixed heartbeat
for (const aggregatorAddress in aggregatorsWithAdapters) {
const aggregator = aggregatorsWithAdapters[aggregatorAddress]
if (aggregator.fixedHeartbeatRate.active) {
await fixedHeartbeatQueue.add(
'fixed-heartbeat',
{ aggregatorAddress },
{
delay: await getSynchronizedDelay(
aggregatorAddress,
aggregator.fixedHeartbeatRate.value,
_logger
),
removeOnComplete: true,
removeOnFail: true
}
)
}

if (aggregator.randomHeartbeatRate.active) {
await heartbeatQueue.add('random-heartbeat', addReportProperty(aggregator, undefined), {
await randomHeartbeatQueue.add('random-heartbeat', addReportProperty(aggregator, undefined), {
delay: uniform(0, aggregator.randomHeartbeatRate.value),
removeOnComplete: REMOVE_ON_COMPLETE,
removeOnFail: REMOVE_ON_FAIL
Expand All @@ -72,7 +90,12 @@ export async function aggregatorWorker(_logger: Logger) {
// Random heartbeat worker
new Worker(
RANDOM_HEARTBEAT_QUEUE_NAME,
randomHeartbeatJob(RANDOM_HEARTBEAT_QUEUE_NAME, REPORTER_AGGREGATOR_QUEUE_NAME, _logger),
randomHeartbeatJob(
RANDOM_HEARTBEAT_QUEUE_NAME,
REPORTER_AGGREGATOR_QUEUE_NAME,
aggregatorsWithAdapters,
_logger
),
BULLMQ_CONNECTION
)
}
Expand Down Expand Up @@ -100,6 +123,7 @@ function aggregatorJob(
const outData = await prepareDataForReporter({
data: aggregator,
workerSource: inData.workerSource,
delay: aggregator.fixedHeartbeatRate.value,
roundId,
_logger
})
Expand Down Expand Up @@ -160,6 +184,7 @@ function fixedHeartbeatJob(aggregatorJobQueueName: string, _logger: Logger) {
function randomHeartbeatJob(
heartbeatQueueName: string,
reporterQueueName: string,
aggregatorsWithAdapters: IAggregatorJob[],
_logger: Logger
) {
const logger = _logger.child({ name: 'randomHeartbeatJob', file: FILE_NAME })
Expand All @@ -172,11 +197,17 @@ function randomHeartbeatJob(
logger.debug(inData, 'inData')

const aggregatorAddress = inData.address
const aggregator = aggregatorsWithAdapters[aggregatorAddress]

if (!aggregator) {
throw new IcnError(IcnErrorCode.UndefinedAggregator)
}

try {
const outData = await prepareDataForReporter({
data: inData,
workerSource: 'random',
delay: aggregator.fixedHeartbeatRate.value,
_logger
})
logger.debug(outData, 'outData')
Expand Down Expand Up @@ -209,17 +240,23 @@ function randomHeartbeatJob(
* Fetch the latest data and prepare them to be sent to reporter.
*
* @param {IAggregatorHeartbeatWorker} data
* @return {Promise<IAggregatorWorkerReporter>}
* @param {string} workerSource
* @param {number} delay
* @param {number} roundId
* @param {Logger} _logger
* @return {Promise<IAggregatorWorkerReporter>}
* @exception {InvalidPriceFeed} raised from `fetchDataWithadapter`
*/
async function prepareDataForReporter({
data,
workerSource,
delay,
roundId,
_logger
}: {
data: IAggregatorJob
workerSource: string
delay: number
roundId?: number
_logger: Logger
}): Promise<IAggregatorWorkerReporter> {
Expand Down Expand Up @@ -254,6 +291,7 @@ async function prepareDataForReporter({
report,
callbackAddress,
workerSource,
delay,
submission,
roundId: roundId || oracleRoundState._roundId
}
Expand Down Expand Up @@ -298,3 +336,33 @@ function shouldReport(
/**
 * Build a new object that carries every own enumerable property of `o`
 * plus a `report` property. The input object is not modified; if `o`
 * already has a `report` property, the value passed here wins.
 *
 * @param {object} o source object to copy
 * @param {boolean | undefined} report value stored under the `report` key
 * @return {object} shallow copy of `o` extended with `report`
 */
function addReportProperty(o, report: boolean | undefined) {
  const reportProperty = { report }
  // Sources are applied left to right, so `reportProperty` overrides `o`.
  return Object.assign({}, o, reportProperty)
}

/**
 * Compute the initial delay of a fixed heartbeat job so that it lines up
 * with the `_startedAt` value reported by `oracleRoundStateCall`.
 *
 * The result is `delay - (startedAt % delay)`, i.e. the part of the
 * heartbeat period that is still left after `startedAt`. When the current
 * round reports `_startedAt` equal to zero, the previous round
 * (`_roundId - 1`, never below 0) is queried instead.
 *
 * A period that is zero, negative or NaN cannot be synchronized
 * (`x % 0` is NaN), so `0` is returned in that case without touching the
 * contract.
 *
 * NOTE(review): `delay` is forwarded as a queue job delay while `_startedAt`
 * is presumably an on-chain timestamp - confirm both use the same time unit.
 *
 * @param {string} aggregatorAddress address passed to `oracleRoundStateCall`
 * @param {number} delay heartbeat period to synchronize with
 * @param {Logger} _logger logger used for debug output
 * @return {Promise<number>} delay to wait before the first heartbeat
 */
async function getSynchronizedDelay(
  aggregatorAddress: string,
  delay: number,
  _logger: Logger
): Promise<number> {
  // `!(delay > 0)` also catches NaN, which a plain `delay <= 0` would miss.
  if (!(delay > 0)) {
    _logger.debug({ synchronizedDelay: 0 }, 'synchronizedDelay')
    return 0
  }

  // FIXME modify aggregator to use single contract call

  const currentRound = await oracleRoundStateCall({
    aggregatorAddress,
    operatorAddress: OPERATOR_ADDRESS
  })

  let startedAt = currentRound._startedAt.toNumber()
  if (startedAt === 0) {
    // Current round reports no start time; fall back to the previous round.
    const previousRound = await oracleRoundStateCall({
      aggregatorAddress,
      operatorAddress: OPERATOR_ADDRESS,
      roundId: Math.max(0, currentRound._roundId - 1)
    })
    startedAt = previousRound._startedAt.toNumber()
  }

  _logger.debug({ startedAt }, 'synchronizedDelay')
  const synchronizedDelay = delay - (startedAt % delay)
  _logger.debug({ synchronizedDelay }, 'synchronizedDelay')
  return synchronizedDelay
}
0