8000 [pull] main from comet-ml:main by pull[bot] · Pull Request #100 · AKJUS/opik · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[pull] main from comet-ml:main #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agre 8000 e to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/sdk-e2e-library-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ on:
- litellm
schedule:
- cron: "0 0 */2 * *"
pull_request:
pull_request_target:
paths:
- 'sdks/python/**'
push:
Expand All @@ -33,7 +33,6 @@ jobs:
run_e2e_lib_integration:
name: E2E Lib Integration Python ${{matrix.python_version}}
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
Expand All @@ -47,7 +46,9 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4.1.1

with:
ref: ${{ github.event.pull_request.base.ref }}

- name: Install uv and set the Python version ${{ matrix.python_version }}
uses: astral-sh/setup-uv@v5
with:
Expand Down
152 changes: 118 additions & 34 deletions sdks/python/src/opik/context_storage.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,161 @@
import contextvars
import contextlib

from typing import List, Optional, Generator
from typing import List, Optional, Generator, Tuple
from opik.api_objects import span, trace


class OpikContextStorage:
"""
Class responsible for keeping and providing access to the context of
spans and traces.
Manages span and trace context using Python's contextvars.

Read if you are going to change this class.
## IMPORTANT: Working with ContextVars

If you want to add or pop the span from the stack
you must NOT do it like that: spans_stack_context.get().append()
This class uses ContextVars to maintain isolated stacks across different
execution contexts (like threads or async tasks). To ensure proper isolation and safety,
this implementation uses immutable tuples for stack storage.

If you modify the default object in ContextVar (which is a MUTABLE list)
these changes will be reflected in any new context which breaks the whole idea
of ContextVar.
### DO use immutable data structures and create-new-set pattern:

You should append to it like that:
span_data_stack = span_data_stack_context.get().copy()
span_data_stack_context.set(span_data_stack + [new_element])
For adding elements:
```python
# Get current tuple and create a new one with added element
stack = spans_stack_context.get()
spans_stack_context.set(stack + (new_element,))
```

And pop like that:
span_data_stack = spans_stack_context.get().copy()
spans_stack_context.set(span_data_stack[:-1])
For removing elements:
```python
# Get current tuple and create a new one without the last element
stack = spans_stack_context.get()
spans_stack_context.set(stack[:-1])
```

The following functions provide an API to work with ContextVars this way
The methods in this class follow these patterns and provide a safe API
for manipulating the context stacks.
"""

def __init__(self) -> None:
self._current_trace_data_context: contextvars.ContextVar[
Optional[trace.TraceData]
] = contextvars.ContextVar("current_trace_data", default=None)
self._spans_data_stack_context: contextvars.ContextVar[List[span.SpanData]] = (
contextvars.ContextVar("spans_data_stack", default=[])
)
default_span_stack: Tuple[span.SpanData, ...] = tuple()
self._spans_data_stack_context: contextvars.ContextVar[
Tuple[span.SpanData, ...]
] = contextvars.ContextVar("spans_data_stack", default=default_span_stack)

def _has_span_id(self, span_id: str) -> bool:
return any(span.id == span_id for span in self._spans_data_stack_context.get())

def trim_span_data_stack_to_certain_span(self, span_id: str) -> None:
"""
If span with the given id exists in the stack, eliminates the spans from the stack
until the span with the given id is at the top.

Intended to be used in the modules that perform unsafe manipulations with the
span data stack (when there is a risk of missing the pop operation, e.g. in callback-based integrations).

When the id of the span that SHOULD be on top is known, we can trim
the stack to remove hanged spans if there are any.

Args:
span_id: The id of the span to trim the stack to.
Returns:
None
"""
if not self._has_span_id(span_id):
return

stack = self._spans_data_stack_context.get()
new_stack_list: List[span.SpanData] = []
for span_data in stack:
new_stack_list.append(span_data)
if span_data.id == span_id:
break

self._spans_data_stack_context.set(tuple(new_stack_list))

def top_span_data(self) -> Optional[span.SpanData]:
if self.span_data_stack_empty():
return None
stack = self._get_data_stack()
stack = self._spans_data_stack_context.get()
return stack[-1]

def pop_span_data(self) -> Optional[span.SpanData]:
def pop_span_data(
self,
ensure_id: Optional[str] = None,
) -> Optional[span.SpanData]:
"""
Pops the span from the stack.
Args:
ensure_id: If provided, it will pop the span only if it has the given id.
Intended to be used in the modules that perform unsafe manipulations with the
span data stack (when there is a risk of missing the add or pop operation,
e.g. in callback-based integrations), to make sure the correct span is popped.
Returns:
The span that was popped from the stack or None.
"""
if self.span_data_stack_empty():
return None
stack = self._get_data_stack()
self._spans_data_stack_context.set(stack[:-1])
return stack[-1]

if ensure_id is None:
stack = self._spans_data_stack_context.get()
self._spans_data_stack_context.set(stack[:-1])
return stack[-1]

if self.top_span_data().id == ensure_id: # type: ignore
return self.pop_span_data()

STACK_IS_EMPTY_OR_THE_ID_DOES_NOT_MATCH = None
return STACK_IS_EMPTY_OR_THE_ID_DOES_NOT_MATCH

def add_span_data(self, span: span.SpanData) -> None:
stack = self._get_data_stack()
self._spans_data_stack_context.set(stack + [span])
stack = self._spans_data_stack_context.get()
self._spans_data_stack_context.set(stack + (span,))

def span_data_stack_empty(self) -> bool:
return len(self._spans_data_stack_context.get()) == 0

def _get_data_stack(self) -> List[span.SpanData]:
return self._spans_data_stack_context.get().copy()
def span_data_stack_size(self) -> int:
return len(self._spans_data_stack_context.get())

def get_trace_data(self) -> Optional[trace.TraceData]:
trace = self._current_trace_data_context.get()
return trace
trace_data = self._current_trace_data_context.get()
return trace_data

def pop_trace_data(
self, ensure_id: Optional[str] = None
) -> Optional[trace.TraceData]:
"""
Pops the trace from the context.
Args:
ensure_id: If provided, it will pop the trace only if it has the given id.
Intended to be used in the modules that perform unsafe manipulations with the
trace data (when there is a risk of missing the set operation,
e.g. in callback-based integrations), to make sure the correct trace is popped.
Returns:
The trace that was popped from the context or None.
"""
trace_data = self._current_trace_data_context.get()

if trace_data is None:
return None

if ensure_id is not None and trace_data.id != ensure_id:
return None

def pop_trace_data(self) -> Optional[trace.TraceData]:
trace = self._current_trace_data_context.get()
self.set_trace_data(None)
return trace
return trace_data

def set_trace_data(self, trace: Optional[trace.TraceData]) -> None:
self._current_trace_data_context.set(trace)

def clear_spans(self) -> None:
self._spans_data_stack_context.set(tuple())

def clear_all(self) -> None:
self._current_trace_data_context.set(None)
self._spans_data_stack_context.set([])
self.clear_spans()


_context_storage = OpikContextStorage()
Expand All @@ -88,6 +168,10 @@ def clear_all(self) -> None:
pop_trace_data = _context_storage.pop_trace_data
set_trace_data = _context_storage.set_trace_data
clear_all = _context_storage.clear_all
span_data_stack_size = _context_storage.span_data_stack_size
trim_span_data_stack_to_certain_span = (
_context_storage.trim_span_data_stack_to_certain_span
)


def get_current_context_instance() -> OpikContextStorage:
Expand Down
Loading
Loading
0