OPIK-1765: Implement closing trace threads mechanism #2475
…hiagohora/OPIK-1765_make_threads_first_class_citizens
… into thiagohora/OPIK-1765_implement_closing_thread_mechanism
Backend Tests Results: 174 files +2, 174 suites +2, 15m 46s ⏱️ - 1m 53s. For more details on these errors, see this check. Results for commit 9ff73e4. ± Comparison against base commit 3e919f8. ♻️ This comment has been updated with latest results.
SDK E2E Tests Results: 0 tests, 0 ✅, 0s ⏱️. Results for commit 9bc640f. ♻️ This comment has been updated with latest results.
fc8b90f
to
f18e7a4
Compare
f18e7a4
to
30bca98
Compare
cbf1d5c
to
7c60193
Compare
… into thiagohora/OPIK-1765_implement_closing_thread_mechanism
4b2a674
to
139797b
Compare
96d3f09
to
787b01c
Compare
apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/BaseRedisSubscriber.java
meter = GlobalOpenTelemetry.getMeter(metricNamespace);

this.messageProcessingTime = meter
do we have the ability to add customer dimension to the metrics?
Yeah, it's possible to add markers. However, there's no need so far.
.build();

this.messageQueueDelay = meter
        .histogramBuilder("%s_%s_queue_delay".formatted(metricNamespace, metricsBaseName))
do we have the ability to add customer dimension to the metrics?
Same as above
@Override
public void stop() {
    log.info("Shutting down '{}' and closing stream", getSubscriberName());
    if (stream != null) {
Not a big issue, but for readability I would invert the condition to if (stream == null) return; to avoid nested ifs and keep the code cleaner.
In this case, there is no nested if; this is a fail-fast condition to avoid trying to initialize the streams if they are already started. The usual path is to initiate it if this didn't happen.
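For readers following this thread, here is a minimal sketch of the guard-clause style the reviewer is suggesting. The class GuardClauseSketch and its Runnable closeAction are hypothetical stand-ins, not the actual BaseRedisSubscriber code, and the sketch takes no side in the discussion above.

```java
// Hypothetical stand-in for a subscriber with an optional stream; names are illustrative only.
final class GuardClauseSketch {
    private Runnable closeAction; // null when no stream was ever opened
    private boolean closed;

    GuardClauseSketch(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    // Guard-clause form: return early when there is nothing to close,
    // so the main path is not wrapped in an if block.
    void stop() {
        if (closeAction == null) {
            return;
        }
        closeAction.run();
        closeAction = null; // a second stop() call becomes a no-op
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

Both forms behave the same; the difference is only in how deeply the main path is indented.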
...ckend/src/main/java/com/comet/opik/api/resources/v1/events/ClosingTraceThreadSubscriber.java
var lock = new Lock("job", TraceThreadsClosingJob.class.getSimpleName());
var timeoutToMarkThreadAsInactive = traceThreadConfig
        .getTimeoutToMarkThreadAsInactive().toJavaDuration(); // This is the timeout to mark threads as inactive
int limit = 1000; // Limit to a process in each job execution
Can we make it configurable?
We can, but it's not critical for a first iteration since all our batch operations so far have this max limit of 1000 items
    log.info("Could not acquire lock for TraceThreadsClosingJob, skipping execution");
    return null;
}),
Duration.ofSeconds(4), // Timeout to release the lock
can we make it configurable?
Similarly to the above one, we can make all timeouts configurable, but I'm not sure it's critical right now.
I just want to avoid having to release a new version only to change this value.
Got it. The timeout is already generous and the query should run a lot faster than this, but I can make it configurable in a following PR.
    return null;
}),
Duration.ofSeconds(4), // Timeout to release the lock
Duration.ofMillis(300)); // Timeout to acquiring the lock
can we make it configurable?
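As an illustration of what "configurable" could look like for the three hard-coded values discussed above (batch limit of 1000, 4 s lock release timeout, 300 ms lock acquisition timeout), here is a minimal sketch. The record ClosingJobSettings and the key names are assumptions made up for this example; the PR does not define them, and the real project would more likely extend its existing configuration classes.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical settings holder: the type, field names, and keys are illustrative,
// not part of the PR. Defaults mirror the hard-coded values in the diff.
record ClosingJobSettings(int batchLimit, Duration lockReleaseTimeout, Duration lockAcquireTimeout) {

    static final ClosingJobSettings DEFAULTS =
            new ClosingJobSettings(1000, Duration.ofSeconds(4), Duration.ofMillis(300));

    // Reject values that would make the job a no-op or the lock unusable.
    ClosingJobSettings {
        if (batchLimit <= 0) {
            throw new IllegalArgumentException("batchLimit must be positive");
        }
        if (isNotPositive(lockReleaseTimeout) || isNotPositive(lockAcquireTimeout)) {
            throw new IllegalArgumentException("timeouts must be positive");
        }
    }

    private static boolean isNotPositive(Duration d) {
        return d.isZero() || d.isNegative();
    }

    // Reads overrides from a key/value source (for example System.getenv());
    // any key that is absent falls back to the default.
    static ClosingJobSettings from(Map<String, String> source) {
        String rawLimit = source.get("CLOSING_JOB_BATCH_LIMIT");
        int limit = rawLimit == null ? DEFAULTS.batchLimit() : Integer.parseInt(rawLimit.trim());
        return new ClosingJobSettings(
                limit,
                millis(source, "CLOSING_JOB_LOCK_RELEASE_TIMEOUT_MS", DEFAULTS.lockReleaseTimeout()),
                millis(source, "CLOSING_JOB_LOCK_ACQUIRE_TIMEOUT_MS", DEFAULTS.lockAcquireTimeout()));
    }

    private static Duration millis(Map<String, String> source, String key, Duration fallback) {
        String raw = source.get(key);
        return raw == null ? fallback : Duration.ofMillis(Long.parseLong(raw.trim()));
    }
}
```

With something along these lines, ClosingJobSettings.from(System.getenv()) would let an operator change a timeout without a new release, which is the concern raised in this thread.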
        @NonNull Instant lastUpdatedUntil, int limit) {
    return asyncTemplate.stream(connection -> {
        var statement = connection.createStatement(FIND_PENDING_CLOSURE_THREADS_SQL)
                .bind("last_updated_at", lastUpdatedUntil.toString())
const
return asyncTemplate.stream(connection -> {
    var statement = connection.createStatement(FIND_PENDING_CLOSURE_THREADS_SQL)
            .bind("last_updated_at", lastUpdatedUntil.toString())
            .bind("limit", limit);
const
return asyncTemplate.nonTransaction(connection -> {
    var statement = connection.createStatement(CLOSURE_THREADS_SQL)
            .bind("project_id", projectId)
            .bind("last_updated_at", lastUpdatedUntil.toString());
const
            LOCK_DURATION).then();
}

private Mono<Long> closeThreadWith(UUID projectId, Instant lastUpdatedUntil, ContextView contextView) {
do we want to add any metrics?
Same as above
@@ -69,6 +74,34 @@ private void setupDailyJob() {
    }
}

// This method sets up a job that periodically checks for trace threads that need to be closed.
do we want to add metrics?
Similar to the other, we can add this, which would provide a more granular view of the data, but it's not critical since we already have the metrics in the subscriber. We can always continue improving our observability, but I would be careful not to overdo it in the first iterations.
.flatMap(locked -> expire(actionTimeout, locked, semaphore))
.flatMap(lockInstance -> runAction(lock, action, lockInstance))))
.onErrorResume(RedisException.class,
        e -> handleError(lock, failToAcquireLockAction, e).then(Mono.empty()))
do we want to add metrics?
We can add them later, but I would not recommend adding them in this PR, as it would increase its scope even further.
.onErrorResume(RedisException.class,
        e -> handleError(lock, failToAcquireLockAction, e).then(Mono.empty()))
.onErrorResume(IllegalStateException.class,
        e -> handleError(lock, failToAcquireLockAction, e).then(Mono.empty()));
same
Same
added some comments
205cb6f
to
9ff73e4
Compare
Sorry, I just pushed a commit addressing some of your comments.
@thiagohora @YarivHashaiComet Let's track the copied and pasted code as tech debt with TODOs and in our backlog tickets. We should try to avoid too much copy and paste in the future.
Hi @YarivHashaiComet @thiagohora,
The implementation of this BaseRedisSubscriber is practically a copy and paste of the OnlineScoringBaseScorer. Many methods are entirely copied.
Any plans to centralise this into a single implementation? If not, at least a tech debt item should be created in our backlog.
Yes, I do. I couldn't reuse it entirely because the OnlineScoringBaseScorer has some specific methods, but I believe we can refactor it to reuse the BaseRedisSubscriber. However, doing it here would increase the scope of the PR.
Makes sense to acquire some tech debt if needed, but let's track it if possible both in the code and backlog. Thanks!
Added ticket OPIK-1925
import io.dropwizard.util.Duration;
import org.redisson.client.codec.Codec;

public interface StreamConfiguration {
There's an already existing StreamConfiguration class in OnlineScoringConfig. It should have been extracted to a common class instead of copying and pasting.
Same, not copying, but the OnlineScoringConfig has things specific to scoring, so that we couldn't reuse it. The idea of introducing the interface is to establish the foundation for a new implementation, and later, we can refactor the OnlineScoringBaseScorer to utilize it.
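To make the idea in this reply concrete, here is a rough sketch of a shared configuration contract with feature-specific implementations. Everything in it is hypothetical: the methods of the real StreamConfiguration are not visible in this diff, so the interface StreamSettings, its accessors, and both records are invented for illustration only.

```java
import java.time.Duration;

// Hypothetical shared contract; the real StreamConfiguration's methods are not shown in the diff.
interface StreamSettings {
    String streamName();
    String consumerGroupName();
    int consumerBatchSize();
    Duration pollingInterval();
}

// Feature-specific config that carries an extra, scoring-only field (illustrative).
record ScoringStreamSettings(String streamName, String consumerGroupName, int consumerBatchSize,
                             Duration pollingInterval, String scorerName) implements StreamSettings {
}

// Config for the thread-closing stream with no extra fields (illustrative).
record ClosingStreamSettings(String streamName, String consumerGroupName, int consumerBatchSize,
                             Duration pollingInterval) implements StreamSettings {
}

// Generic code depends only on the shared contract, never on a concrete config.
final class SubscriberDescriptor {
    private SubscriberDescriptor() {
    }

    static String describe(StreamSettings s) {
        return "%s/%s batch=%d poll=%dms".formatted(
                s.streamName(), s.consumerGroupName(), s.consumerBatchSize(),
                s.pollingInterval().toMillis());
    }
}
```

Under this assumption, a base subscriber written against the interface could serve both features, which is the refactoring direction mentioned above.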
import java.util.concurrent.TimeUnit;

@Data
public class TraceThreadConfig implements StreamConfiguration {
Overlaps a lot with OnlineScoringConfig. It should have been extracted to a common POJO and referenced, to prevent or reduce the amount of copy and paste.
Same as above
Details
Add a job that runs on a schedule (by default, every 5 seconds) to retrieve all projects with pending "Trace Threads" to be closed. Once found, the projects are sent to a Redis queue to distribute the processing. Subscribers then retrieve projects from the queue and close the pending threads.
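The flow described above can be sketched in memory as follows. This is only an illustration under stated assumptions: an ArrayDeque stands in for the Redis queue, a list stands in for the thread table, and the class ThreadClosingSketch with its records and methods is hypothetical rather than taken from the PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// In-memory illustration of job -> queue -> subscriber; all names are hypothetical.
final class ThreadClosingSketch {
    record OpenThread(String projectId, String threadId, Instant lastUpdatedAt) {}
    record Message(String projectId, Instant lastUpdatedUntil) {}

    private final List<OpenThread> openThreads = new ArrayList<>(); // stand-in for the thread table
    private final Deque<Message> queue = new ArrayDeque<>();        // stand-in for the Redis queue
    private final List<String> closedThreadIds = new ArrayList<>();

    void addThread(String projectId, String threadId, Instant lastUpdatedAt) {
        openThreads.add(new OpenThread(projectId, threadId, lastUpdatedAt));
    }

    // Job step: enqueue each project with at least one thread idle past the timeout.
    // Returns the number of projects enqueued (capped by limit).
    int runJob(Instant now, Duration inactivityTimeout, int limit) {
        Instant cutoff = now.minus(inactivityTimeout);
        List<String> projects = openThreads.stream()
                .filter(t -> !t.lastUpdatedAt().isAfter(cutoff))
                .map(OpenThread::projectId)
                .distinct()
                .limit(limit)
                .toList();
        projects.forEach(p -> queue.add(new Message(p, cutoff)));
        return projects.size();
    }

    // Subscriber step: take projects off the queue and close their idle threads.
    // Returns the number of threads closed.
    int drainQueue() {
        int count = 0;
        Message message;
        while ((message = queue.poll()) != null) {
            Iterator<OpenThread> it = openThreads.iterator();
            while (it.hasNext()) {
                OpenThread t = it.next();
                boolean sameProject = t.projectId().equals(message.projectId());
                boolean idle = !t.lastUpdatedAt().isAfter(message.lastUpdatedUntil());
                if (sameProject && idle) {
                    it.remove();
                    closedThreadIds.add(t.threadId());
                    count++;
                }
            }
        }
        return count;
    }

    List<String> closedThreadIds() {
        return List.copyOf(closedThreadIds);
    }
}
```

Splitting the work this way means the scheduled job only does a cheap lookup, while the per-project closing can be spread across however many subscribers are consuming the queue.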
Testing