[Async] Implement asynchronous event processing in python runtime #3461
# logger isn't available yet
print('Failed to connect to ' + socket_path)
(I know it's unrelated to your changes) I think that the logger is available, according to https://github.com/nuclio/nuclio/pull/3461/files#diff-725534bace834296150d3e0c0525fd5ef3ec7b57ddaa193e5cc98ed5d062e2eaR180-R182
# make a writeable file from processor
self._control_sock_wfile = self._control_sock.makefile('w')
I suppose this is not really needed (never was)
@TomerShor actually, I'm not sure here. Why do you think it's not needed?
It's not used anywhere.
Unlike the event socket wfile, which is used here for logging.
@@ -56,6 +57,7 @@ type Configuration struct {
TriggerKind string
WorkerTerminationTimeout time.Duration
ControlMessageBroker *controlcommunication.AbstractControlMessageBroker
Mode functionconfig.TriggerWorkMode
Why is this needed both here and in the trigger configuration?
@TomerShor the user configures it at the trigger level, and then we just propagate it to all workers of the given trigger.
Very well. I'd add more doc comments to the critical path to explain the story, mainly in _nuclio*_wrapper.py
🚀
// Use retryable dial for the first connection
if i == 0 {
	conn, err = retryableDial(ca.serverAddress, 30, 1*time.Second)
When does it try to start? After the runtime is started, or in parallel? It might take the runtime some time to start (e.g. loading a large model).
@liranbg that's a good point. We do want to be able to wait much longer. My thinking is that we can avoid having a timeout altogether and instead use the wrapperInitialized message from the control message socket. Not in this PR, though, as there are already too many changes. What do you think about this idea, @TomerShor?
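For readers following the retry discussion, here is a minimal Python sketch of the retry-then-give-up pattern being discussed. It is not the PR's Go `retryableDial`: the function name `retryable_connect`, the reading of the two numeric arguments as "attempt count" and "interval between attempts", and the use of a TCP address instead of a unix socket path are all assumptions made for illustration.

```python
import socket
import time


def retryable_connect(address, attempts, interval):
    """Try to connect up to `attempts` times, sleeping `interval` seconds between tries.

    Hypothetical helper; `address` is a (host, port) tuple in this sketch.
    """
    last_error = None
    for attempt in range(attempts):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(address)
            return sock
        except OSError as err:
            # The peer is not listening yet: release the socket and retry.
            sock.close()
            last_error = err
            if attempt < attempts - 1:
                time.sleep(interval)
    raise ConnectionError(
        f"could not connect to {address} after {attempts} attempts"
    ) from last_error
```

With 30 attempts and a 1 second interval, a peer that needs longer than roughly half a minute to start listening would still fail, which is the concern raised above about runtimes that load large models.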
except KeyboardInterrupt:
    self._logger.info("Shutting down server")
This is just for tests right?
Termination signal will not actually break the while loop.
@TomerShor yes
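As background to the point above: in Python only SIGINT is turned into `KeyboardInterrupt` by default, so a SIGTERM never reaches an `except KeyboardInterrupt` clause. The following is a rough sketch of one way a loop could also react to SIGTERM; the names `stop_requested`, `install_termination_handler`, and `serve` are hypothetical and not taken from the PR.

```python
import signal
import threading

# Set once a termination signal has been received.
stop_requested = threading.Event()


def _request_stop(signum, frame):
    # Runs in the main thread when the signal arrives; it only records the request.
    stop_requested.set()


def install_termination_handler():
    # signal.signal must be called from the main thread.
    signal.signal(signal.SIGTERM, _request_stop)


def serve(poll_interval=0.05):
    # Hypothetical serving loop: exits on Ctrl-C or once SIGTERM has been recorded.
    iterations = 0
    try:
        while not stop_requested.is_set():
            iterations += 1
            stop_requested.wait(poll_interval)
    except KeyboardInterrupt:
        pass
    return iterations
```

Without such a handler, SIGTERM simply terminates the process, which is consistent with the `except KeyboardInterrupt` branch being relevant only for tests.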
Great job 👏
Quick question here: what is a potential use case for an async handler? Why would one want to add one?
@dberardo-com async handlers are very useful when you have something to await that takes a long time, for instance when your handler requests an external API.
@dberardo-com this essentially enables your function to process multiple events asynchronously, instead of processing them one after the other.
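To illustrate the difference described in the two replies above, here is a small standalone asyncio sketch. It does not use the Nuclio API: `handler`, `process_sequentially`, `process_concurrently`, and `timed` are hypothetical names, and `asyncio.sleep` stands in for awaiting an external API.

```python
import asyncio
import time


async def handler(event: str) -> str:
    # Hypothetical async handler: the sleep stands in for awaiting an external API.
    await asyncio.sleep(0.1)
    return f"processed {event}"


async def process_sequentially(events):
    # One after the other: each event waits for the previous one to finish.
    return [await handler(event) for event in events]


async def process_concurrently(events):
    # All events are in flight at once; gather preserves the input order.
    return await asyncio.gather(*(handler(event) for event in events))


def timed(coro_fn, events):
    # Run one of the strategies above and return (results, elapsed seconds).
    start = time.perf_counter()
    results = asyncio.run(coro_fn(events))
    return list(results), time.perf_counter() - start
```

Calling `timed(process_sequentially, events)` and `timed(process_concurrently, events)` on the same list returns the same results, but the concurrent variant spends its waiting time on all events at once rather than once per event.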
Alright, is this also valid for MQTT-triggered functions? Can they be processed in parallel?
@dberardo-com currently async mode is only available for the HTTP trigger.
Within this PR:
sync (default) or async
Jira: