Replaced Queue w/ Pipe in SiL Module + Time Series in Broker #199
Yes, this is almost what I had in mind :) Of course one could extract that manually from a copy of the dict in the API implementation, but I imagine this is quite a useful feature to have built in. I have code for doing this with the built-in `bisect` module, if the time series is stored as a list of (datetime, value) tuples. The only thing is that insertion through bisect is not atomic (like dict insertion is), so one would also need to use locks to synchronize access to the time series. I can post some example code using bisect a bit later!
Ok, so here is a suggestion using `bisect`, adapted from the code I have already.

    # Keep in mind I did not check the validity of the type hints (I think they are correct though); take this as a suggestion that should be further adapted.
    # DatetimeLike is assumed to be the project's existing datetime type alias.
    import bisect
    import typing
    from typing import Optional

    T = typing.TypeVar("T")  # T could be replaced with Any in the type annotations. This just allows for more robust type checks.

    def insert_into_timeseries(series: list[tuple[DatetimeLike, T]], time: DatetimeLike, value: T) -> None:
        n = len(series)
        if n <= 0 or series[-1][0] <= time:  # We can just append at the end in this case.
            idx = n
        else:  # This else case (and thus this function) is not needed if values are *always* inserted with monotonically increasing time, because the if condition will always be true then.
            idx = bisect.bisect(series, (time,))
        series.insert(idx, (time, value))
    def get_range_from_timeseries(series: list[tuple[DatetimeLike, T]], start_time: Optional[DatetimeLike], end_time: Optional[DatetimeLike]) -> list[tuple[DatetimeLike, T]]:
        if start_time is None:
            start_idx = 0
        else:
            start_idx = bisect.bisect_left(series, (start_time,))
        if end_time is None:
            end_idx = None
        else:
            end_idx = bisect.bisect_right(series, (end_time,))
        return series[start_idx:end_idx]
    # And then I'd suggest extending get_microgrid_ts & Co. like so:
    def get_microgrid_ts(self, start_time: Optional[DatetimeLike], end_time: Optional[DatetimeLike]) -> list[tuple[DatetimeLike, Microgrid]]:
        return get_range_from_timeseries(self._microgrid_ts, start_time, end_time)

Of course, using bisect to find an index and then operating on said index is not atomic at all and needs synchronization.

    def insert_into_timeseries(series, time, value):
        # This function is so trivial it does not need to have its own name; I just wanted to illustrate the difference to before.
        series.append((time, value))

    def get_microgrid_ts(self, start_time: Optional[DatetimeLike], end_time: Optional[DatetimeLike]) -> list[tuple[DatetimeLike, Microgrid]]:
        return get_range_from_timeseries(self._microgrid_ts.copy(), start_time, end_time)

    # The important difference here is that get_range_from_timeseries operates on a copy, so there are no conflicts with the thread adding data. And the copy operation is atomic.

Though, I'd actually suggest using locks, because they are clearer, will work regardless of the Python implementation (not relying on implicit atomicity in current CPython versions), and show that thread synchronization is necessary.

    from threading import Lock
    class Broker:
        def __init__(self, data_pipe_out: Connection, events_pipe_in: Connection):
            # [...]
            self._ts_lock = Lock()

        def _recv_data(self) -> None:
            while True:
                time, data = self._data_pipe_out.recv()
                self._microgrid = data["microgrid"]
                self._actor_infos = data["actor_infos"]
                self._p_delta = data["p_delta"]
                with self._ts_lock:
                    insert_into_timeseries(self._microgrid_ts, time, self._microgrid)
                    insert_into_timeseries(self._actor_infos_ts, time, self._actor_infos)
                    insert_into_timeseries(self._p_delta_ts, time, self._p_delta)

        def get_microgrid_ts(self, start_time: Optional[DatetimeLike], end_time: Optional[DatetimeLike]) -> list[tuple[DatetimeLike, Microgrid]]:
            with self._ts_lock:
                return get_range_from_timeseries(self._microgrid_ts, start_time, end_time)

        # Or alternatively:
        def get_microgrid_ts(self, start_time: Optional[DatetimeLike], end_time: Optional[DatetimeLike]) -> list[tuple[DatetimeLike, Microgrid]]:
            with self._ts_lock:
                ts = self._microgrid_ts.copy()
            return get_range_from_timeseries(ts, start_time, end_time)
Could you merge the storage_simulator branch into this so that we also have the e_delta? Also there are probably merge errors incoming :D
Co-authored-by: Ovi T. <ovi.tatar@googlemail.com>
What use case do you have for manipulating the time series? It should simply grant access to past values, right?
Oh, no, I do not want to manipulate the time series,
Yes, exactly, that's what I would like. More specifically, access to a range of past values (past values at specific time instances are already easily accessible via the dict, but time ranges would be more important for me).
Aight, let's go w/ a simple append then :) A lock is still a good idea though, let me quickly implement that! Thanks for your input :)
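For readers following along, the "simple append plus lock" approach agreed on here could look roughly like the sketch below. It is only an illustration under stated assumptions, not the code that was merged: the class name `LockedTimeSeries` and its methods are hypothetical, and it assumes entries always arrive in increasing time order so that `append` keeps the list sorted.

```python
import bisect
from datetime import datetime, timedelta
from threading import Lock, Thread


class LockedTimeSeries:
    """Hypothetical helper (not from the PR): append-only series guarded by a lock."""

    def __init__(self):
        self._entries = []  # (time, value) tuples, assumed to arrive in time order
        self._lock = Lock()

    def append(self, time, value):
        with self._lock:
            self._entries.append((time, value))

    def get_range(self, start_time=None, end_time=None):
        # Copy under the lock, then search the private snapshot without holding it.
        with self._lock:
            snapshot = self._entries.copy()
        start_idx = 0 if start_time is None else bisect.bisect_left(snapshot, (start_time,))
        # With a 1-tuple probe, entries stamped exactly end_time are not included.
        end_idx = None if end_time is None else bisect.bisect_right(snapshot, (end_time,))
        return snapshot[start_idx:end_idx]


base = datetime(2023, 1, 1)
ts = LockedTimeSeries()


def writer():
    # Stands in for the receiving thread that appends incoming data.
    for i in range(100):
        ts.append(base + timedelta(seconds=i), i)


thread = Thread(target=writer)
thread.start()
partial = ts.get_range()  # safe to call while the writer is still running
thread.join()
window = ts.get_range(base + timedelta(seconds=10), base + timedelta(seconds=20))
```

Holding the lock only for the copy keeps the writer from being blocked during the binary search, at the cost of copying the whole list on every read.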
Otherwise I'm happy now :) Thanks for taking my feedback into account!
@Impelon Is this what you had in mind for your use case?