Replaced Queue w/ Pipe in SiL Module + Time Series in Broker by marvin-steinke · Pull Request #199 · dos-group/vessim · GitHub

Replaced Queue w/ Pipe in SiL Module + Time Series in Broker #199


Merged
merged 12 commits into from
Apr 18, 2024


marvin-steinke (Contributor) commented Apr 9, 2024
  • Replaced Queue w/ Pipe
  • Broker now has a thread running that listens for incoming objects via recv(). Objects are removed after reading, which solved the problem of the queue overflowing if the API is not used frequently.
  • Broker now saves incoming data as time series and offers new getters for it.
  • Removed actor from the pickled representation of Microgrid. Users can decide for themselves which values they want to be able to access via Actor.state(), which is sent to the broker instead. This should solve the problem of the pickled Microgrid being too large.
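A rough illustration of why sending `Actor.state()` shrinks the payload: pickling an actor object also serializes every attribute it holds, while pickling a small state dict only serializes what the user chooses to expose. `HypotheticalActor` and its fields are made up for this sketch and are not vessim's `Actor` class.

```python
import pickle
from typing import Any


class HypotheticalActor:
    """Illustrative stand-in for an actor that holds a large internal dataset."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._signal = [float(i) for i in range(10_000)]  # large internal data
        self._now = 0  # index of the current simulation step

    def state(self) -> dict[str, Any]:
        # Only the values the user decides to expose end up in the message.
        return {"name": self.name, "current_value": self._signal[self._now]}


actor = HypotheticalActor("solar")
full_size = len(pickle.dumps(actor))           # whole object, including _signal
state_size = len(pickle.dumps(actor.state()))  # just the exposed state
print(f"actor: {full_size} bytes, state(): {state_size} bytes")
```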

@Impelon Is this what you had in mind for your use case?
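The listener-thread idea from the description can be sketched as follows, assuming a one-way `multiprocessing.Pipe` that carries `(time, data)` tuples with a `"p_delta"` key (the shape used in the `_recv_data` snippet later in this thread). `MiniBroker` and `get_p_delta_ts` are hypothetical names, not vessim's actual API.

```python
import threading
from datetime import datetime
from multiprocessing import Pipe
from multiprocessing.connection import Connection


class MiniBroker:
    """Hypothetical, simplified stand-in for a broker fed through a Pipe."""

    def __init__(self, data_pipe_out: Connection) -> None:
        self._data_pipe_out = data_pipe_out
        self._lock = threading.Lock()
        self._p_delta_ts: dict[datetime, float] = {}
        # Background thread that drains the pipe, so nothing piles up unread.
        self._thread = threading.Thread(target=self._recv_data, daemon=True)
        self._thread.start()

    def _recv_data(self) -> None:
        while True:
            try:
                time, data = self._data_pipe_out.recv()  # blocks until an object arrives
            except EOFError:
                return  # the sending end was closed and nothing is left to read
            with self._lock:
                self._p_delta_ts[time] = data["p_delta"]

    def get_p_delta_ts(self) -> dict[datetime, float]:
        # Return a copy so callers never iterate while the thread is writing.
        with self._lock:
            return dict(self._p_delta_ts)


# Usage: Pipe(duplex=False) returns (receive-only end, send-only end).
recv_end, send_end = Pipe(duplex=False)
broker = MiniBroker(recv_end)
t0, t1 = datetime(2024, 4, 9, 12, 0), datetime(2024, 4, 9, 12, 5)
send_end.send((t0, {"p_delta": 1.5}))
send_end.send((t1, {"p_delta": -0.5}))
send_end.close()
broker._thread.join(timeout=5)
```

Unlike a bare `while True` loop, this sketch also exits cleanly once the sending end is closed, because `Connection.recv()` raises `EOFError` when there is nothing left to receive.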

Impelon (Contributor) commented Apr 9, 2024

Yes, this is almost what I had in mind :)
The only thing I miss is the ability to get the time series for a given start and end datetime.

Of course one could extract that manually from a copy of the dict in the API implementation, but I'm imagining this is quite a useful feature to have built-in.

I have code for doing this with the built-in bisect module, if the time series is stored as a list of (datetime, value) tuples.
Alternatively, one could use an external library for time series or red-black trees or something... though I'd say using bisect is not that complicated.

The only catch is that insertion through bisect is not atomic (unlike dict insertion), so one would also need Locks to synchronize access to the time series.

I can post some example code using bisect a bit later!

Impelon (Contributor) commented Apr 10, 2024
Ok, so here is a suggestion using `bisect` adapted from the code I have already.
# Keep in mind I did not check the validity of the type hints (I think they are correct, though); take this as a suggestion that should be adapted further.
from __future__ import annotations

import bisect
import typing
from datetime import datetime
from typing import Optional

DatetimeLike = datetime  # Assumption for this snippet: timestamps are datetime objects (vessim's own DatetimeLike may be broader).
T = typing.TypeVar("T")  # T could be replaced with Any in the type annotations. This just allows for more robust type checks.

def insert_into_timeseries(series: list[tuple[DatetimeLike, T]], time: DatetimeLike, value: T) -> None:
    n = len(series)
    if n <= 0 or series[-1][0] <= time:  # We can just append at the end in this case.
        idx = n
    else:  # This else case (and thus this function) is not needed if values are *always* inserted with monotonically increasing time, because the if condition will then always be true.
        idx = bisect.bisect(series, (time,))
    series.insert(idx, (time, value))

def get_range_from_timeseries(series: list[tuple[DatetimeLike, T]], start_time: Optional[DatetimeLike] = None, end_time: Optional[DatetimeLike] = None) -> list[tuple[DatetimeLike, T]]:
    # Probing with 1-tuples compares timestamps only, since (t,) sorts before any (t, value).
    # As a consequence the range is half-open: entries stamped exactly end_time are excluded.
    if start_time is None:
        start_idx = 0
    else:
        start_idx = bisect.bisect_left(series, (start_time,))
    if end_time is None:
        end_idx = None
    else:
        end_idx = bisect.bisect_right(series, (end_time,))
    return series[start_idx:end_idx]

# And then I'd suggest extending get_microgrid_ts & Co. like so (Microgrid is vessim's class):
def get_microgrid_ts(self, start_time: Optional[DatetimeLike] = None, end_time: Optional[DatetimeLike] = None) -> list[tuple[DatetimeLike, Microgrid]]:
    return get_range_from_timeseries(self._microgrid_ts, start_time, end_time)

Of course, using bisect to find an index and then operating on said index is not atomic at all and needs synchronization.
The easiest way would be to rely on the implicit atomicity of list.append and list.copy, and drop the more complicated insert_into_timeseries. Meaning:

def insert_into_timeseries(series, time, value):
    # This function is so trivial it does not need its own name; I just wanted to illustrate the difference to before.
    series.append((time, value))

def get_microgrid_ts(self, start_time: Optional[DatetimeLike] = None, end_time: Optional[DatetimeLike] = None) -> list[tuple[DatetimeLike, Microgrid]]:
    # The important difference here is that get_range_from_timeseries operates on a copy, so there are no conflicts with the thread adding data. And in CPython the copy operation is atomic.
    return get_range_from_timeseries(self._microgrid_ts.copy(), start_time, end_time)

Though, I'd actually suggest using locks, because they are clearer, will work regardless of the Python implementation (not relying on implicit atomicity in current CPython versions), and make it explicit that thread synchronization is necessary.

from __future__ import annotations

from multiprocessing.connection import Connection
from threading import Lock
from typing import Optional

# insert_into_timeseries, get_range_from_timeseries and DatetimeLike as defined above.

class Broker:
    def __init__(self, data_pipe_out: Connection, events_pipe_in: Connection):
        # [...]
        self._ts_lock = Lock()  # guards all *_ts lists

    def _recv_data(self) -> None:
        while True:
            time, data = self._data_pipe_out.recv()
            self._microgrid = data["microgrid"]
            self._actor_infos = data["actor_infos"]
            self._p_delta = data["p_delta"]
            with self._ts_lock:
                insert_into_timeseries(self._microgrid_ts, time, self._microgrid)
                insert_into_timeseries(self._actor_infos_ts, time, self._actor_infos)
                insert_into_timeseries(self._p_delta_ts, time, self._p_delta)

    def get_microgrid_ts(self, start_time: Optional[DatetimeLike] = None, end_time: Optional[DatetimeLike] = None) -> list[tuple[DatetimeLike, Microgrid]]:
        # Holds the lock for the whole lookup (bisect + slice).
        with self._ts_lock:
            return get_range_from_timeseries(self._microgrid_ts, start_time, end_time)

    # Or alternatively (this definition would replace the one above):
    def get_microgrid_ts(self, start_time: Optional[DatetimeLike] = None, end_time: Optional[DatetimeLike] = None) -> list[tuple[DatetimeLike, Microgrid]]:
        # Holds the lock only for the copy; the bisect runs on the private snapshot.
        with self._ts_lock:
            ts = self._microgrid_ts.copy()
        return get_range_from_timeseries(ts, start_time, end_time)

kilianp14 (Contributor) left a comment

Could you merge the storage_simulator branch into this so that we also have the e_delta? Also, there are probably merge conflicts incoming :D

marvin-steinke (Contributor Author)


What use case do you have for manipulating the time series? It should simply grant access to past values, right?

Impelon (Contributor) commented Apr 15, 2024

@marvin-steinke

> What use case do you have for manipulating the time series?

Oh, no, I do not want to manipulate the time series; insert_into_timeseries was meant for the Broker's internal use, since the Broker itself needs to insert values into the time series. Of course, in that case an append will be enough, because any entries to be added will always have a newer timestamp; I just wanted to show how the insert would work in general, if that assumption did not hold. But you can certainly replace insert_into_timeseries with append in the example code I provided above!
I should have made that clearer, I apologize :)

> It should simply grant access to past values, right?

Yes, exactly, that's what I would like. More specifically, access to a range of past values: past values at specific time instants are already easily accessible via the dict, but time ranges are more important for me.

marvin-steinke (Contributor Author)


Aight, let's go w/ a simple append then :) A lock is still a good idea though, let me quickly implement that! Thanks for your input :)
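A minimal sketch of the design settled on here: appends only, guarded by a lock, plus a time-range getter. `AppendOnlyTimeSeries` and `between` are hypothetical names, not vessim's actual implementation. Storing timestamps and values in two parallel lists is a choice made for this sketch so that `bisect` works on plain datetimes (inclusive on both ends) without needing Python 3.10's `key=`; the lock then also keeps the two lists consistent with each other.

```python
import bisect
import threading
from datetime import datetime, timedelta
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class AppendOnlyTimeSeries(Generic[T]):
    """Hypothetical helper: append-only time series guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._times: list[datetime] = []
        self._values: list[T] = []

    def append(self, time: datetime, value: T) -> None:
        with self._lock:
            # Plain appends are only valid if timestamps never go backwards.
            if self._times and time < self._times[-1]:
                raise ValueError("timestamps must be non-decreasing")
            self._times.append(time)
            self._values.append(value)

    def between(self, start_time: Optional[datetime] = None,
                end_time: Optional[datetime] = None) -> list[tuple[datetime, T]]:
        """Return (time, value) pairs with start_time <= time <= end_time."""
        with self._lock:
            lo = 0 if start_time is None else bisect.bisect_left(self._times, start_time)
            hi = len(self._times) if end_time is None else bisect.bisect_right(self._times, end_time)
            return list(zip(self._times[lo:hi], self._values[lo:hi]))


def write_minutes(ts: AppendOnlyTimeSeries[int], start: datetime, n: int) -> None:
    # Simulates the broker's receiving thread appending one value per minute.
    for i in range(n):
        ts.append(start + timedelta(minutes=i), i)


series: AppendOnlyTimeSeries[int] = AppendOnlyTimeSeries()
t0 = datetime(2024, 4, 15, 12, 0)
writer = threading.Thread(target=write_minutes, args=(series, t0, 60))
writer.start()
writer.join()
last_five = series.between(t0 + timedelta(minutes=55), t0 + timedelta(minutes=59))
```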

marvin-steinke requested a review from Impelon April 15, 2024 14:28
marvin-steinke and others added 2 commits April 15, 2024 18:23
Co-authored-by: Ovi T. <ovi.tatar@googlemail.com>
Co-authored-by: Ovi T. <ovi.tatar@googlemail.com>
marvin-steinke force-pushed the sil_pipe branch 2 times, most recently from b96ffe6 to e8d36d4 April 15, 2024 16:57

Impelon (Contributor) left a comment

Otherwise I'm happy now :) Thanks for taking my feedback into account!

Co-authored-by: Ovi T. <ovi.tatar@googlemail.com>
marvin-steinke marked this pull request as ready for review April 16, 2024 07:24
birnbaum changed the base branch from main to storage_simulator April 18, 2024 12:11
Base automatically changed from storage_simulator to main April 18, 2024 13:06
marvin-steinke merged commit c5bfaff into main Apr 18, 2024
marvin-steinke deleted the sil_pipe branch April 18, 2024 13:44
4 participants