feat(scheduling): add telemetry for scheduler tasks by basert · Pull Request #9721 · appwrite/appwrite · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(scheduling): add telemetry for scheduler tasks #9721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions app/cli.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
use Utopia\Queue\Publisher;
use Utopia\Registry\Registry;
use Utopia\System\System;
use Utopia\Telemetry\Adapter\None as NoTelemetry;

use function Swoole\Coroutine\run;

// Overwriting runtimes to be architecture agnostic for CLI
Config::setParam('runtimes', (new Runtimes('v4'))->getAll(supported: false));
Expand Down Expand Up @@ -200,7 +203,7 @@
};
}, ['pools', 'cache']);

CLI::setResource('queueForStatsUsage', function (Connection $publisher) {
CLI::setResource('queueForStatsUsage', function (Publisher $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);
CLI::setResource('queueForStatsResources', function (Publisher $publisher) {
Expand Down Expand Up @@ -264,6 +267,8 @@

CLI::setResource('executor', fn () => new Executor(fn (string $projectId, string $deploymentId) => System::getEnv('_APP_EXECUTOR_HOST')));

CLI::setResource('telemetry', fn () => new NoTelemetry());

$platform = new Appwrite();
$args = $platform->getEnv('argv');

Expand Down Expand Up @@ -293,4 +298,4 @@

$cli->shutdown()->action(fn () => Timer::clearAll());

$cli->run();
run($cli->run(...));
6 changes: 1 addition & 5 deletions src/Appwrite/Platform/Tasks/Migrate.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public function __construct()
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('register')
->callback(function ($version, $dbForPlatform, $getProjectDB, Registry $register) {
\Co\run(function () use ($version, $dbForPlatform, $getProjectDB, $register) {
$this->action($version, $dbForPlatform, $getProjectDB, $register);
});
});
->callback($this->action(...));
}

private function clearProjectsCache(Document $project)
Expand Down
185 changes: 91 additions & 94 deletions src/Appwrite/Platform/Tasks/ScheduleBase.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Appwrite\Platform\Tasks;

use Swoole\Runtime;
use Swoole\Timer;
use Utopia\CLI\Console;
use Utopia\Database\Database;
Expand All @@ -13,8 +14,9 @@
use Utopia\Platform\Action;
use Utopia\Pools\Group;
use Utopia\System\System;

use function Swoole\Coroutine\run;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Gauge;
use Utopia\Telemetry\Histogram;

abstract class ScheduleBase extends Action
{
Expand All @@ -23,6 +25,11 @@ abstract class ScheduleBase extends Action

protected array $schedules = [];

private ?Histogram $collectSchedulesTelemetryDuration = null;
private ?Gauge $collectSchedulesTelemetryCount = null;
private ?Gauge $scheduleTelemetryCount = null;
private ?Histogram $enqueueDelayTelemetry = null;

abstract public static function getName(): string;
abstract public static function getSupportedResource(): string;
abstract public static function getCollectionId(): string;
Expand All @@ -37,7 +44,8 @@ public function __construct()
->inject('pools')
->inject('dbForPlatform')
->inject('getProjectDB')
->callback(fn (Group $pools, Database $dbForPlatform, callable $getProjectDB) => $this->action($pools, $dbForPlatform, $getProjectDB));
->inject('telemetry')
->callback($this->action(...));
}

protected function updateProjectAccess(Document $project, Database $dbForPlatform): void
Expand All @@ -56,26 +64,61 @@ protected function updateProjectAccess(Document $project, Database $dbForPlatfor
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
public function action(Group $pools, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
{
Runtime::enableCoroutine();

Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');

$this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count');
$this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's');
$this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count');
$this->enqueueDelayTelemetry = $telemetry->createHistogram('task.schedule.enqueue_delay', 's');

// start with "0" to load all active documents.
$lastSyncUpdate = "0";
$this->collectSchedules($pools, $dbForPlatform, $getProjectDB, $lastSyncUpdate);

Console::success("Starting timers at " . DateTime::now());
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($pools, $dbForPlatform, $getProjectDB, &$lastSyncUpdate) {
$time = DateTime::now();
Console::log("Sync tick: Running at $time");
$this->collectSchedules($pools, $dbForPlatform, $getProjectDB, $lastSyncUpdate);
});

while (true) {
$this->enqueueResources($pools, $dbForPlatform, $getProjectDB);
$this->scheduleTelemetryCount->record(count($this->schedules), ['resourceType' => static::getSupportedResource()]);
sleep(static::ENQUEUE_TIMER);
}
}

private function collectSchedules(Group $pools, Database $dbForPlatform, callable $getProjectDB, ?string &$lastSyncUpdate): void
{
// If we haven't synced yet, load all active schedules
$initialLoad = $lastSyncUpdate === "0";

/**
* Extract only necessary attributes to lower memory used.
*
* @return array
* @throws Exception
* @var Document $schedule
*/
$getSchedule = function (Document $schedule) use ($dbForPlatform, $getProjectDB): array {
$getSchedule = function (Document $schedule) use ($pools, $dbForPlatform, $getProjectDB): array {
$project = $dbForPlatform->getDocument('projects', $schedule->getAttribute('projectId'));

$resource = $getProjectDB($project)->getDocument(
static::getCollectionId(),
$schedule->getAttribute('resourceId')
);

$pools->reclaim();

return [
'$internalId' => $schedule->getInternalId(),
'$id' => $schedule->getId(),
Expand All @@ -88,12 +131,12 @@ public function action(Group $pools, Database $dbForPlatform, callable $getProje
];
};

$lastSyncUpdate = DateTime::now();
$loadStart = microtime(true);
$time = DateTime::now();

$limit = 10_000;
$sum = $limit;
$total = 0;
$loadStart = \microtime(true);
$latestDocument = null;

while ($sum === $limit) {
Expand All @@ -110,105 +153,59 @@ public function action(Group $pools, Database $dbForPlatform, callable $getProje
$regions[] = 'default';
}

$results = $dbForPlatform->find('schedules', \array_merge($paginationQueries, [
$paginationQueries = [
...$paginationQueries,
Query::equal('region', $regions),
Query::equal('resourceType', [static::getSupportedResource()]),
Query::equal('active', [true]),
]));

$sum = \count($results);
$total = $total + $sum;
];

foreach ($results as $document) {
try {
$this->schedules[$document->getInternalId()] = $getSchedule($document);
} catch (\Throwable $th) {
$collectionId = static::getCollectionId();
Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}");
Console::error($th->getMessage());
}
if ($initialLoad) {
$paginationQueries[] = Query::equal('active', [true]);
} else {
$paginationQueries[] = Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate);
}

$latestDocument = \end($results);
}
$results = $dbForPlatform->find('schedules', $paginationQueries);

$pools->reclaim();

Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");

Console::success("Starting timers at " . DateTime::now());

run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);

$limit = 1000;
$sum = $limit;
$total = 0;
$latestDocument = null;

Console::log("Sync tick: Running at $time");

while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];

if ($latestDocument) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$sum = count($results);
$total = $total + $sum;

// Temporarily accepting both 'fra' and 'default'
// When all migrated, only use _APP_REGION with 'default' as default value
$regions = [System::getEnv('_APP_REGION', 'default')];
if (!in_array('default', $regions)) {
$regions[] = 'default';
foreach ($results as $document) {
$localDocument = $this->schedules[$document->getInternalId()] ?? null;

if ($localDocument !== null) {
if (!$document['active']) {
Console::info("Removing: {$document['resourceType']}::{$document['resourceId']}");
unset($this->schedules[$document->getInternalId()]);
} elseif (strtotime($localDocument['resourceUpdatedAt']) !== strtotime($document['resourceUpdatedAt'])) {
Console::info("Updating: {$document['resourceType']}::{$document['resourceId']}");
$this->schedules[$document->getInternalId()] = $getSchedule($document);
}

$results = $dbForPlatform->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', $regions),
Query::equal('resourceType', [static::getSupportedResource()]),
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
]));

$sum = count($results);
$total = $total + $sum;

foreach ($results as $document) {
$localDocument = $this->schedules[$document->getInternalId()] ?? null;

// Check if resource has been updated since last sync
$org = $localDocument !== null ? \strtotime($localDocument['resourceUpdatedAt']) : null;
$new = \strtotime($document['resourceUpdatedAt']);

if (!$document['active']) {
Console::info("Removing: {$document['resourceType']}::{$document['resourceId']}");
unset($this->schedules[$document->getInternalId()]);
} elseif ($new !== $org) {
Console::info("Updating: {$document['resourceType']}::{$document['resourceId']}");
$this->schedules[$document->getInternalId()] = $getSchedule($document);
}
} else {
try {
$this->schedules[$document->getInternalId()] = $getSchedule($document);
} catch (\Throwable $th) {
$collectionId = static::getCollectionId();
Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}");
Console::error($th->getMessage());
}

$latestDocument = \end($results);
}
}

$lastSyncUpdate = $time;
$timerEnd = \microtime(true);

$pools->reclaim();

Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
$latestDocument = \end($results);
}

Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB)
);
$lastSyncUpdate = $time;
$duration = microtime(true) - $loadStart;
$this->collectSchedulesTelemetryDuration->record($duration, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
$this->collectSchedulesTelemetryCount->record($total, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
Console::success("{$total} resources were loaded in " . $duration . " seconds");
}

$this->enqueueResources($pools, $dbForPlatform, $getProjectDB);
});
/**
 * Record how late a schedule was enqueued relative to its expected execution time.
 *
 * The recorded value is the current Unix time minus the parsed timestamp of
 * $expectedExecutionSchedule, in whole seconds, tagged with the supported resource type.
 *
 * @param string $expectedExecutionSchedule Date/time string that strtotime() can parse.
 * @return void
 */
protected function recordEnqueueDelay(string $expectedExecutionSchedule): void
{
    $scheduledAt = \strtotime($expectedExecutionSchedule);

    // strtotime() returns false for input it cannot parse. In the subtraction below
    // false would be coerced to 0 and the whole Unix timestamp would be recorded as the
    // "delay", skewing the histogram, so such samples are dropped instead.
    if ($scheduledAt === false) {
        return;
    }

    // The histogram property is declared nullable and only assigned in action(),
    // hence the nullsafe call.
    $this->enqueueDelayTelemetry?->record(
        \time() - $scheduledAt,
        ['resourceType' => static::getSupportedResource()]
    );
}
}
2 changes: 2 additions & 0 deletions src/Appwrite/Platform/Tasks/ScheduleExecutions.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ protected function enqueueResources(Group $pools, Database $dbForPlatform, calla
->setProject($schedule['project'])
->setUserId($data['userId'] ?? '')
->trigger();

$this->recordEnqueueDelay($schedule['schedule']);
});

$dbForPlatform->deleteDocument(
Expand Down
2 changes: 2 additions & 0 deletions src/Appwrite/Platform/Tasks/ScheduleFunctions.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ protected function enqueueResources(Group $pools, Database $dbForPlatform, calla
->setPath('/')
->setProject($schedule['project'])
->trigger();

$this->recordEnqueueDelay($schedule['schedule']);
}

$queue->reclaim();
Expand Down
2 changes: 1 addition & 1 deletion src/Appwrite/Platform/Tasks/ScheduleMessages.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected function enqueueResources(Group $pools, Database $dbForPlatform, calla
);

$queue->reclaim();

$this->recordEnqueueDelay($schedule['schedule']);
unset($this->schedules[$schedule['$internalId']]);
});
}
Expand Down