Sync with WebSocket by tdroxler · Pull Request #616 · alephium/explorer-backend · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Sync with WebSocket #616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions app/src/main/scala/org/alephium/explorer/ExplorerState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import slick.jdbc.PostgresProfile

import org.alephium.explorer.cache.{BlockCache, MetricCache, TransactionCache}
import org.alephium.explorer.config.{BootMode, ExplorerConfig}
import org.alephium.explorer.config.ExplorerConfig.Consensus
import org.alephium.explorer.persistence.Database
import org.alephium.explorer.service._
import org.alephium.explorer.util.Scheduler
Expand All @@ -46,6 +47,8 @@ sealed trait ExplorerState extends Service with StrictLogging {
// Database handle, created lazily from the configured boot mode with the
// execution context and database config of this state.
lazy val database: Database =
new Database(config.bootMode)(executionContext, databaseConfig)

// Consensus settings read from the configuration and made available as an implicit value.
implicit lazy val consensus: Consensus = config.consensus

implicit lazy val blockCache: BlockCache =
BlockCache(
config.cacheRowCountReloadPeriod,
Expand Down
3 changes: 3 additions & 0 deletions app/src/main/scala/org/alephium/explorer/SyncServices.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.alephium.api.model.{ChainParams, PeerAddress}
import org.alephium.explorer.RichAVector._
import org.alephium.explorer.cache.BlockCache
import org.alephium.explorer.config.{BootMode, ExplorerConfig}
import org.alephium.explorer.config.ExplorerConfig.Consensus
import org.alephium.explorer.error.ExplorerError._
import org.alephium.explorer.service._
import org.alephium.explorer.util.Scheduler
Expand All @@ -46,6 +47,7 @@ object SyncServices extends StrictLogging {
ec: ExecutionContext,
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
consensus: Consensus,
blockCache: BlockCache,
groupSetting: GroupSetting
): Future[Unit] =
Expand Down Expand Up @@ -88,6 +90,7 @@ object SyncServices extends StrictLogging {
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
blockCache: BlockCache,
consensus: Consensus,
groupSetting: GroupSetting
): Future[Unit] =
Future.fromTry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ object ExplorerError {
extends Exception(s"Cannot parse config file: $file", exception)
with ConfigError

/** Error wrapping a failure related to the WebSocket.
  *
  * The wrapped `cause` is also registered as the exception cause, so `getCause` and the
  * chained stack trace stay available to whoever logs or inspects this error.
  *
  * @param cause the underlying error
  */
final case class WebSocketError(cause: Throwable)
    extends Exception(s"WebSocket error. $cause", cause)
    with ExplorerError

/** ****** Group: [[ConfigError]] *******
*/
/** Configuration error reported for an invalid group number; per its message the value
  * should be > 0.
  *
  * @param groupNum the rejected group number
  */
final case class InvalidGroupNumber(groupNum: Int)
extends Exception(s"Invalid groupNum: $groupNum. It should be > 0")
with ConfigError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.alephium.explorer.service
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.immutable.ArraySeq
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.{Duration => ScalaDuration, FiniteDuration}
import scala.util.{Failure, Success}

Expand All @@ -31,6 +31,7 @@ import sttp.model.Uri
import org.alephium.explorer.{foldFutures, GroupSetting}
import org.alephium.explorer.api.model.Height
import org.alephium.explorer.cache.BlockCache
import org.alephium.explorer.config.ExplorerConfig.Consensus
import org.alephium.explorer.error.ExplorerError.BlocksInDifferentChains
import org.alephium.explorer.persistence.DBRunner._
import org.alephium.explorer.persistence.dao.BlockDao
Expand All @@ -53,6 +54,8 @@ import org.alephium.util.{Duration, TimeStamp}
* 5. For each last block of each chain, mark it as part of the main chain and travel
* down the parents recursively until we find a parent that is part of the main chain.
* 6. During step 5, if a parent is missing, we download it and continue the process at 5.
* 7. Once the blocks are up-to-date with the node, we switch to websocket syncing
* 8. If the websocket closes or is late, for example because of a network issue, we go back to step 1.
*
* TODO: Step 5 is costly, but it's an easy way to handle reorg. In step 3 we know we receive the current main chain
* for that timerange, so in step 4 we could directly insert them as `mainChain = true`, but we need to sync
Expand All @@ -66,13 +69,15 @@ case object BlockFlowSyncService extends StrictLogging {
// Step handed to `syncOnceWith` on every sync round.
private val defaultStep = Duration.ofMinutesUnsafe(30L)
// Back step handed to `syncOnceWith` once the initial sync round has completed.
private val defaultBackStep = Duration.ofSecondsUnsafe(10L)
// Back step used only while `initialBackStepDone` is still false, i.e. for the first sync round.
private val initialBackStep = Duration.ofMinutesUnsafe(30L)
// A synced time range counts as up to date when its upper bound is less than this far behind now.
private val upToDateDelta = Duration.ofSecondsUnsafe(30L)
// scalastyle:on magic.number

def start(nodeUris: ArraySeq[Uri], interval: FiniteDuration)(implicit
ec: ExecutionContext,
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
cache: BlockCache,
consensus: Consensus,
groupSetting: GroupSetting,
scheduler: Scheduler
): Future[Unit] =
Expand All @@ -87,14 +92,38 @@ case object BlockFlowSyncService extends StrictLogging {
ec: ExecutionContext,
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
consensus: Consensus,
cache: BlockCache,
groupSetting: GroupSetting
): Future[Unit] = {
if (initialBackStepDone.get()) {
syncOnceWith(nodeUris, defaultStep, defaultBackStep)
} else {
syncOnceWith(nodeUris, defaultStep, initialBackStep).map { _ =>
initialBackStepDone.set(true)
val syncResult =
if (initialBackStepDone.get()) {
syncOnceWith(nodeUris, defaultStep, defaultBackStep)
} else {
syncOnceWith(nodeUris, defaultStep, initialBackStep).map { result =>
initialBackStepDone.set(true)
result
}
}

syncResult.flatMap { isUpToDate =>
if (isUpToDate) {
logger.info("Blocks are up to date, switching to web socket syncing")
val stopPromise = Promise[Unit]()
// TODO Use config values
// scalastyle:off magic.number
WebSocketSyncService.sync(
stopPromise,
host = "127.0.0.1",
port = 22973,
flushInterval = Duration.ofMillisUnsafe(500)
)
// scalastyle:on magic.number
stopPromise.future.map { _ =>
logger.info("WebSocket syncing stopped, resuming http syncing")
}
} else {
Future.successful(())
}
}
}
Expand All @@ -106,7 +135,7 @@ case object BlockFlowSyncService extends StrictLogging {
blockFlowClient: BlockFlowClient,
cache: BlockCache,
groupSetting: GroupSetting
): Future[Unit] = {
): Future[Boolean] = {
getTimeStampRange(step, backStep)
.flatMap { ranges =>
Future.sequence {
Expand All @@ -116,12 +145,16 @@ case object BlockFlowSyncService extends StrictLogging {
s"Syncing from ${TimeUtil.toInstant(from)} to ${TimeUtil
.toInstant(to)} (${from.millis} - ${to.millis})"
)
syncTimeRange(from, to, uri)
syncTimeRange(from, to, uri).map { _ =>
(TimeStamp.now() -- to).map(_ < upToDateDelta).getOrElse(false)
}
}
}
}
}
.map(_ => ())
.map { upToDates =>
upToDates.flatten.contains(true)
}
}
// scalastyle:on magic.number

Expand Down Expand Up @@ -360,7 +393,7 @@ case object BlockFlowSyncService extends StrictLogging {
}
}

private def insertBlocks(blocksWithEvents: ArraySeq[BlockEntityWithEvents])(implicit
def insertBlocks(blocksWithEvents: ArraySeq[BlockEntityWithEvents])(implicit
ec: ExecutionContext,
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
Expand Down
Loading
Loading
0