OPIK-1765: Implement closing trace threads mechanism #2475


Merged

Contributor
@thiagohora commented Jun 13, 2025

Details

Add a job that runs on a schedule (every 5 seconds by default) and retrieves all projects that have "Trace Threads" pending closure. The projects found are sent to a Redis queue so the work can be distributed. Subscribers then take projects from the queue and close the pending threads.
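
The flow described above can be sketched in plain Java. This is a minimal, in-memory illustration and not the PR's implementation: a `BlockingQueue` stands in for the Redis queue, a map stands in for the thread storage, and every name here (`ThreadClosingSketch`, `enqueuePendingProjects`, `processNext`) is hypothetical.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for the job -> queue -> subscriber flow.
final class ThreadClosingSketch {

    record TraceThread(UUID projectId, Instant lastUpdatedAt, boolean open) {
        // A thread is pending closure when it is open and idle since before the cutoff.
        boolean pendingClosure(Instant lastUpdatedUntil) {
            return open && lastUpdatedAt.isBefore(lastUpdatedUntil);
        }
    }

    private final Map<String, TraceThread> threads = new ConcurrentHashMap<>();
    private final BlockingQueue<UUID> queue = new LinkedBlockingQueue<>(); // replaces Redis here

    void addThread(String threadId, UUID projectId, Instant lastUpdatedAt) {
        threads.put(threadId, new TraceThread(projectId, lastUpdatedAt, true));
    }

    // Job step: find projects with threads pending closure and enqueue each project once.
    int enqueuePendingProjects(Instant lastUpdatedUntil, int limit) {
        List<UUID> projects = threads.values().stream()
                .filter(t -> t.pendingClosure(lastUpdatedUntil))
                .map(TraceThread::projectId)
                .distinct()
                .limit(limit)
                .toList();
        queue.addAll(projects);
        return projects.size();
    }

    // Subscriber step: take one project from the queue and close its pending threads.
    long processNext(Instant lastUpdatedUntil) {
        UUID projectId = queue.poll();
        if (projectId == null) {
            return 0; // nothing queued
        }
        List<String> toClose = threads.entrySet().stream()
                .filter(e -> e.getValue().projectId().equals(projectId))
                .filter(e -> e.getValue().pendingClosure(lastUpdatedUntil))
                .map(Map.Entry::getKey)
                .toList();
        toClose.forEach(id -> threads.computeIfPresent(id,
                (key, t) -> new TraceThread(t.projectId(), t.lastUpdatedAt(), false)));
        return toClose.size();
    }

    boolean isOpen(String threadId) {
        return threads.get(threadId).open();
    }
}
```

In the real system the job would run on a scheduler and the subscribers would run on other instances; the sketch only shows how the two steps hand work to each other through the queue.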

Testing

  • We added tests simulating the closure of threads.

Contributor
github-actions bot commented Jun 13, 2025

Backend Tests Results

  174 files  +2    174 suites  +2   15m 46s ⏱️ - 1m 53s
3 825 tests +3  3 821 ✅ +2  3 💤 ±0  0 ❌ ±0  1 🔥 +1 
3 823 runs  +1  3 820 ✅ +1  3 💤 ±0  0 ❌ ±0 

For more details on these errors, see this check.

Results for commit 9ff73e4. ± Comparison against base commit 3e919f8.

♻️ This comment has been updated with latest results.

Contributor
github-actions bot commented Jun 13, 2025

SDK E2E Tests Results

0 tests   0 ✅  0s ⏱️
0 suites  0 💤
0 files    0 ❌

Results for commit 9bc640f.

♻️ This comment has been updated with latest results.

@thiagohora thiagohora force-pushed the thiagohora/OPIK-1765_implement_closing_thread_mechanism branch 2 times, most recently from fc8b90f to f18e7a4 Compare June 14, 2025 12:25
@thiagohora thiagohora force-pushed the thiagohora/OPIK-1765_implement_closing_thread_mechanism branch from f18e7a4 to 30bca98 Compare June 14, 2025 12:34
@thiagohora thiagohora force-pushed the thiagohora/OPIK-1765_implement_closing_thread_mechanism branch from cbf1d5c to 7c60193 Compare June 15, 2025 15:21
… into thiagohora/OPIK-1765_implement_closing_thread_mechanism
@thiagohora thiagohora force-pushed the thiagohora/OPIK-1765_implement_closing_thread_mechanism branch from 4b2a674 to 139797b Compare June 16, 2025 14:29
Base automatically changed from thiagohora/OPIK-1765_make_threads_first_class_citizens to main June 18, 2025 10:13
@thiagohora thiagohora force-pushed the thiagohora/OPIK-1765_implement_closing_thread_mechanism branch from 96d3f09 to 787b01c Compare June 18, 2025 10:25
@thiagohora thiagohora requested a review from Copilot June 18, 2025 10:26
@thiagohora thiagohora marked this pull request as ready for review June 18, 2025 10:26

meter = GlobalOpenTelemetry.getMeter(metricNamespace);

this.messageProcessingTime = meter
Contributor

Do we have the ability to add a customer dimension to the metrics?

Contributor Author

Yes, it's possible to add markers. However, there has been no need for them so far.

.build();

this.messageQueueDelay = meter
.histogramBuilder("%s_%s_queue_delay".formatted(metricNamespace, metricsBaseName))
Contributor

Do we have the ability to add a customer dimension to the metrics?

Contributor Author

Same as above

@Override
public void stop() {
log.info("Shutting down '{}' and closing stream", getSubscriberName());
if (stream != null) {
Contributor

Not a big issue, but for readability I would invert the condition to if (stream == null) return; to avoid nested ifs and keep the code cleaner.

Contributor Author

In this case, there is no nested if; this is a fail-fast condition to avoid trying to initialize the streams if they are already started. The usual path is to initiate it if this didn't happen.

var lock = new Lock("job", TraceThreadsClosingJob.class.getSimpleName());
var timeoutToMarkThreadAsInactive = traceThreadConfig
.getTimeoutToMarkThreadAsInactive().toJavaDuration(); // This is the timeout to mark threads as inactive
int limit = 1000; // Limit to a process in each job execution
Contributor

Can we make it configurable?

Contributor Author

We can, but it's not critical for a first iteration since all our batch operations so far have this max limit of 1000 items

log.info("Could not acquire lock for TraceThreadsClosingJob, skipping execution");
return null;
}),
Duration.ofSeconds(4), // Timeout to release the lock
Contributor

can we make it configurable?

Contributor Author

As with the one above, we can make all timeouts configurable, but I'm not sure it's critical right now.

Contributor

I just want to avoid having to release a new version only to change this value.

Contributor Author

Got it. The time is already generous, and the query should run a lot faster than this, but I can make it configurable in a follow-up PR.

return null;
}),
Duration.ofSeconds(4), // Timeout to release the lock
Duration.ofMillis(300)); // Timeout to acquiring the lock
Contributor

can we make it configurable?
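
One way to address the configurability requests in this thread is sketched below. It is only an illustration under stated assumptions: the `ClosingJobSettings` record, its key names (`BATCH_LIMIT`, `LOCK_LEASE`, `LOCK_WAIT`) and the `Map`-based source are all hypothetical, and the project's real configuration classes may look quite different. The defaults mirror the hard-coded values quoted in the diff (1000 items, 4 seconds, 300 milliseconds).

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical settings holder; names and the Map-based source are assumptions.
record ClosingJobSettings(int batchLimit, Duration lockLease, Duration lockWait) {

    // Defaults taken from the hard-coded values shown in the review thread.
    static final int DEFAULT_BATCH_LIMIT = 1000;
    static final Duration DEFAULT_LOCK_LEASE = Duration.ofSeconds(4);
    static final Duration DEFAULT_LOCK_WAIT = Duration.ofMillis(300);

    // Reject values that would make the job misbehave.
    ClosingJobSettings {
        if (batchLimit <= 0) {
            throw new IllegalArgumentException("batchLimit must be positive");
        }
        if (lockLease.isZero() || lockLease.isNegative()) {
            throw new IllegalArgumentException("lockLease must be positive");
        }
        if (lockWait.isNegative()) {
            throw new IllegalArgumentException("lockWait must not be negative");
        }
    }

    // Reads overrides from a key/value source (for example environment variables),
    // falling back to the defaults. Durations use ISO-8601 text such as "PT10S".
    static ClosingJobSettings from(Map<String, String> source) {
        int limit = Integer.parseInt(
                source.getOrDefault("BATCH_LIMIT", String.valueOf(DEFAULT_BATCH_LIMIT)));
        Duration lease = Duration.parse(
                source.getOrDefault("LOCK_LEASE", DEFAULT_LOCK_LEASE.toString()));
        Duration wait = Duration.parse(
                source.getOrDefault("LOCK_WAIT", DEFAULT_LOCK_WAIT.toString()));
        return new ClosingJobSettings(limit, lease, wait);
    }
}
```

With something like this, `ClosingJobSettings.from(System.getenv())` would let an operator change a value without a new release, which is the concern raised above.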

@NonNull Instant lastUpdatedUntil, int limit) {
return asyncTemplate.stream(connection -> {
var statement = connection.createStatement(FIND_PENDING_CLOSURE_THREADS_SQL)
.bind("last_updated_at", lastUpdatedUntil.toString())
Contributor

const

return asyncTemplate.stream(connection -> {
var statement = connection.createStatement(FIND_PENDING_CLOSURE_THREADS_SQL)
.bind("last_updated_at", lastUpdatedUntil.toString())
.bind("limit", limit);
Contributor

const

Comment on lines +228 to +232
return asyncTemplate.nonTransaction(connection -> {
var statement = connection.createStatement(CLOSURE_THREADS_SQL)
.bind("project_id", projectId)
.bind("last_updated_at", lastUpdatedUntil.toString());

Contributor

const
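
The three "const" comments presumably ask for the bind parameter names to be extracted into constants. A minimal sketch of that idea follows; the constant names and the `BindNamesSketch` class are hypothetical, and a plain `Map` stands in for the statement's bind calls so the example runs without a database driver.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration: parameter names live in one place instead of
// being repeated as string literals at every bind call.
final class BindNamesSketch {

    // Constant names are assumptions; the values match the literals in the diff.
    static final String PROJECT_ID = "project_id";
    static final String LAST_UPDATED_AT = "last_updated_at";
    static final String LIMIT = "limit";

    private BindNamesSketch() {
    }

    // Stand-in for the binds of the "find pending closure threads" statement.
    static Map<String, Object> pendingParams(Instant lastUpdatedUntil, int limit) {
        Map<String, Object> params = new LinkedHashMap<>();
        params.put(LAST_UPDATED_AT, lastUpdatedUntil.toString());
        params.put(LIMIT, limit);
        return params;
    }

    // Stand-in for the binds of the "close threads" statement.
    static Map<String, Object> closureParams(UUID projectId, Instant lastUpdatedUntil) {
        Map<String, Object> params = new LinkedHashMap<>();
        params.put(PROJECT_ID, projectId);
        params.put(LAST_UPDATED_AT, lastUpdatedUntil.toString());
        return params;
    }
}
```

Sharing `LAST_UPDATED_AT` between both statements means a rename in the SQL only needs one change on the Java side.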

LOCK_DURATION).then();
}

private Mono<Long> closeThreadWith(UUID projectId, Instant lastUpdatedUntil, ContextView contextView) {
Contributor

do we want to add any metrics?

Contributor Author

Same as above

@@ -69,6 +74,34 @@ private void setupDailyJob() {
}
}

// This method sets up a job that periodically checks for trace threads that need to be closed.
Contributor

do we want to add metrics?

Contributor Author

Similar to the other, we can add this, which would provide a more granular view of the data, but it's not critical since we already have the metrics in the subscriber. We can always continue improving our observability, but I would be careful not to overdo it in the first iterations.

.flatMap(locked -> expire(actionTimeout, locked, semaphore))
.flatMap(lockInstance -> runAction(lock, action, lockInstance))))
.onErrorResume(RedisException.class,
e -> handleError(lock, failToAcquireLockAction, e).then(Mono.empty()))
Contributor

do we want to add metrics?

Contributor Author

We can add them later, but I would not recommend adding them in this PR, as it would increase its scope even further.

.onErrorResume(RedisException.class,
e -> handleError(lock, failToAcquireLockAction, e).then(Mono.empty()))
.onErrorResume(IllegalStateException.class,
e -> handleError(lock, failToAcquireLockAction, e).then(Mono.empty()));
Contributor

same

Contributor Author

Same

Contributor
@YarivHashaiComet left a comment

added some comments

@thiagohora thiagohora force-pushed the thiagohora/OPIK-1765_implement_closing_thread_mechanism branch from 205cb6f to 9ff73e4 Compare June 19, 2025 13:53
@thiagohora
Contributor Author

Hi @YarivHashaiComet

Sorry, I just pushed a commit addressing some of your comments

@thiagohora thiagohora merged commit 2082b3e into main Jun 19, 2025
12 checks passed
@thiagohora thiagohora deleted the thiagohora/OPIK-1765_implement_closing_thread_mechanism branch June 19, 2025 14:31
Collaborator
@andrescrz left a comment

@thiagohora @YarivHashaiComet Let's track the copied and pasted code as tech debt with TODOs and in our backlog tickets. We should try to avoid too much copy and paste in the future.

Collaborator

Hi @YarivHashaiComet @thiagohora ,

The implementation of this BaseRedisSubscriber is practically a copy and paste of the OnlineScoringBaseScorer. Many methods are entirely copied.

Any plans to centralise this into a single implementation? If not, at least a tech debt item should be created in our backlog.

Contributor Author

Yes, I do. I couldn't reuse it entirely because the OnlineScoringBaseScorer has some scoring-specific methods, but I believe we can refactor it to reuse the BaseRedisSubscriber. However, doing it here would increase the scope of the PR.

Collaborator

Makes sense to acquire some tech debt if needed, but let's track it if possible both in the code and backlog. Thanks!

Contributor Author

Added ticket OPIK-1925

import io.dropwizard.util.Duration;
import org.redisson.client.codec.Codec;

public interface StreamConfiguration {
Collaborator

There's an already existing StreamConfiguration class in OnlineScoringConfig. It should have been extracted to a common class instead of copying and pasting.

Contributor Author

Same here: it's not a copy. The OnlineScoringConfig has things specific to scoring, so we couldn't reuse it. The idea of introducing the interface is to establish the foundation for a new implementation; later, we can refactor the OnlineScoringBaseScorer to use it.
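
The direction described here, a shared interface that both configurations implement so a generic subscriber depends only on the common part, might look roughly like the sketch below. All names and accessors (`StreamSettings`, `streamName`, `consumerBatchSize`, and so on) are hypothetical and chosen for illustration; they are not the PR's actual `StreamConfiguration` members.

```java
import java.time.Duration;

// Hypothetical common contract; accessor names are illustrative only.
interface StreamSettings {
    String streamName();
    String consumerGroupName();
    int consumerBatchSize();
    Duration pollingInterval();
}

// Settings for the thread-closing stream: only the common part.
record TraceThreadStreamSettings(String streamName, String consumerGroupName,
        int consumerBatchSize, Duration pollingInterval) implements StreamSettings {
}

// Settings for scoring: the common part plus a scoring-specific extra (made up here).
record ScoringStreamSettings(String streamName, String consumerGroupName,
        int consumerBatchSize, Duration pollingInterval,
        String scoringSpecificOption) implements StreamSettings {
}

// A generic subscriber sees only the shared interface, so it works with either record.
final class GenericSubscriberSketch {
    private final StreamSettings settings;

    GenericSubscriberSketch(StreamSettings settings) {
        this.settings = settings;
    }

    String describe() {
        return "%s/%s batch=%d every %dms".formatted(
                settings.streamName(),
                settings.consumerGroupName(),
                settings.consumerBatchSize(),
                settings.pollingInterval().toMillis());
    }
}
```

Under this shape, the scoring-specific fields stay in the scoring settings, and the duplicated subscriber logic could later be folded into the generic class.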

import java.util.concurrent.TimeUnit;

@Data
public class TraceThreadConfig implements StreamConfiguration {
Collaborator

Overlaps a lot with OnlineScoringConfig. It should have been extracted to a common POJO and referenced, to prevent or reduce the amount of copy and paste.

Contributor Author

Same as above

3 participants