feat: Add AsyncAPI HTTP Support by tmulligan98 · Pull Request #2142 · ag2ai/faststream · GitHub

feat: Add AsyncAPI HTTP Support #2142


Merged: 37 commits, May 22, 2025
Commits
c84dd9a
Pass through param
tmulligan98 Mar 12, 2025
fc640b9
Remove comments
tmulligan98 Mar 18, 2025
582252f
Add basic route schema
tmulligan98 Mar 19, 2025
e102c9d
Insert routes
tmulligan98 Mar 19, 2025
cefb09c
Add schema test
tmulligan98 Mar 22, 2025
6efdfab
Lint
tmulligan98 Mar 22, 2025
51590d4
Tidy up typing
tmulligan98 Mar 23, 2025
57b535f
Remove comment
tmulligan98 Mar 23, 2025
1894c5b
Clean types
tmulligan98 Mar 27, 2025
4286719
Update docs
tmulligan98 Mar 27, 2025
b888da9
docs: generate API References
tmulligan98 Mar 31, 2025
b8f00e8
update overloads
tmulligan98 Apr 30, 2025
41c1573
docs: generate API References
tmulligan98 Apr 30, 2025
14e2e6f
clean up typing
tmulligan98 Apr 30, 2025
f7210e4
restore: release.md
tmulligan98 Apr 30, 2025
d75edd4
docs: generate API References
tmulligan98 Apr 30, 2025
8ef6867
Merge branch 'main' into asyncapi-http
Lancetnik May 1, 2025
8f75dcc
docs: generate API References
Lancetnik May 1, 2025
f480cba
refactor asgi handler
tmulligan98 May 3, 2025
25dfdfe
remove attr check
tmulligan98 May 3, 2025
d733ab9
add description class attr
tmulligan98 May 3, 2025
068807c
resolved test comment
tmulligan98 May 3, 2025
b8f3a77
Correct typing
tmulligan98 May 3, 2025
4c0fdff
docs: generate API References
tmulligan98 May 3, 2025
780ec50
Update handlers.py
Lancetnik May 5, 2025
d114fef
docs: generate API References
Lancetnik May 5, 2025
d9cd84d
Merge branch 'main' into asyncapi-http
Lancetnik May 20, 2025
8a4c520
refactor: polish impl
Lancetnik May 20, 2025
1deed4a
Merge branch 'asyncapi-http' of github.com:tmulligan98/faststream in…
Lancetnik May 20, 2025
3c31165
tests: mark test by aiokafka
Lancetnik May 20, 2025
5d761ac
docs: generate API References
Lancetnik May 20, 2025
036c617
Merge branch 'main' into asyncapi-http
Lancetnik May 20, 2025
51e4d86
Merge branch 'main' into asyncapi-http
Lancetnik May 22, 2025
18a4275
Delete tests/asyncapi/test_asgi.py
Lancetnik May 22, 2025
013ba39
chore: revert AsyncAPI changes
Lancetnik May 22, 2025
ce0f1ae
chore: revert docstring changes
Lancetnik May 22, 2025
3e414ca
docs: generate API References
Lancetnik May 22, 2025
3 changes: 3 additions & 0 deletions docs/docs/SUMMARY.md
@@ -242,6 +242,8 @@ search:
- [make_asyncapi_asgi](api/faststream/asgi/factories/make_asyncapi_asgi.md)
- [make_ping_asgi](api/faststream/asgi/factories/make_ping_asgi.md)
- handlers
- [GetHandler](api/faststream/asgi/handlers/GetHandler.md)
- [HttpHandler](api/faststream/asgi/handlers/HttpHandler.md)
- [get](api/faststream/asgi/handlers/get.md)
- response
- [AsgiResponse](api/faststream/asgi/response/AsgiResponse.md)
@@ -254,6 +256,7 @@ search:
- [AsyncAPIOperation](api/faststream/asyncapi/abc/AsyncAPIOperation.md)
- generate
- [get_app_schema](api/faststream/asyncapi/generate/get_app_schema.md)
- [get_asgi_routes](api/faststream/asyncapi/generate/get_asgi_routes.md)
- [get_broker_channels](api/faststream/asyncapi/generate/get_broker_channels.md)
- [get_broker_server](api/faststream/asyncapi/generate/get_broker_server.md)
- message
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/asgi/handlers/GetHandler.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 0.5
+---
+
+::: faststream.asgi.handlers.GetHandler
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/asgi/handlers/HttpHandler.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 0.5
+---
+
+::: faststream.asgi.handlers.HttpHandler
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/asyncapi/generate/get_asgi_routes.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 0.5
+---
+
+::: faststream.asyncapi.generate.get_asgi_routes
27 changes: 26 additions & 1 deletion docs/docs/en/getting-started/asgi.md
@@ -103,6 +103,31 @@ app = AsgiFastStream(
 You do not need to setup all routes using the `asgi_routes=[]` parameter.<br/>
 You can use the `#!python app.mount("/health", asgi_endpoint)` method also.

+### ASGI Documentation
+By default, any ASGI routes will be added to your AsyncAPI documentation. If you wish to exclude these routes, just do the following:
+
+```
+app = AsgiFastStream(
+    broker,
+    asgi_routes=[
+        ("/health", make_ping_asgi(broker, timeout=5.0, include_in_schema=False)),
+    ]
+)
+```
+
+Or, for custom ASGI routes:
+
+```
+@get(include_in_schema=False)
+async def liveness_ping(scope):
+    return AsgiResponse(b"", status_code=200)
+
+app = AsgiFastStream(
+    broker,
+    asgi_routes=[("/health", liveness_ping)]
+)
+```
+
 ### AsyncAPI Documentation

 You can also host your **AsyncAPI** documentation in the same process, by running [`#!shell faststream docs serve ...`](./asyncapi/hosting.md){.internal-link}, in the same container and runtime.
@@ -158,7 +183,7 @@ app = FastStream(broker).as_asgi(

 ## Other ASGI Compatibility

-Moreover, our wrappers can be used as ready-to-use endpoins for other **ASGI** frameworks. This can be very helpful When you are running **FastStream** in the same runtime as any other **ASGI** frameworks.
+Moreover, our wrappers can be used as ready-to-use endpoints for other **ASGI** frameworks. This can be very helpful When you are running **FastStream** in the same runtime as any other **ASGI** frameworks.

 Just follow the following example in such cases:

Expand Down
9 changes: 5 additions & 4 deletions faststream/asgi/factories.py
@@ -23,16 +23,16 @@ def make_ping_asgi(
     broker: "BrokerUsecase[Any, Any]",
     /,
     timeout: Optional[float] = None,
+    include_in_schema: bool = True,
 ) -> "ASGIApp":
     healthy_response = AsgiResponse(b"", 204)
     unhealthy_response = AsgiResponse(b"", 500)

-    @get
+    @get(include_in_schema=include_in_schema)
     async def ping(scope: "Scope") -> AsgiResponse:
         if await broker.ping(timeout):
             return healthy_response
-        else:
-            return unhealthy_response
+        return unhealthy_response

     return ping

@@ -50,10 +50,11 @@ def make_asyncapi_asgi(
     title: str = "FastStream",
     asyncapi_js_url: str = ASYNCAPI_JS_DEFAULT_URL,
     asyncapi_css_url: str = ASYNCAPI_CSS_DEFAULT_URL,
+    include_in_schema: bool = True,
 ) -> "ASGIApp":
     cached_docs = None

-    @get
+    @get(include_in_schema=include_in_schema)
     async def docs(scope: "Scope") -> AsgiResponse:
         nonlocal cached_docs
         if not cached_docs:
Expand Down
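The factory change above forwards a keyword argument into a parameterised decorator. A minimal, self-contained sketch of that pattern follows. It does not import FastStream: `FakeBroker`, `make_ping`, and the simplified `get` (which only records the flag instead of wrapping the function in a handler) are hypothetical stand-ins, and the endpoint returns a bare status code rather than an `AsgiResponse`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Scope = Dict[str, Any]
Endpoint = Callable[[Scope], Awaitable[int]]


def get(*, include_in_schema: bool = True) -> Callable[[Endpoint], Endpoint]:
    """Stand-in decorator (assumption): it only records the flag on the endpoint."""

    def decorator(func: Endpoint) -> Endpoint:
        func.include_in_schema = include_in_schema  # type: ignore[attr-defined]
        return func

    return decorator


class FakeBroker:
    """Hypothetical broker whose ping() result is fixed at construction."""

    def __init__(self, healthy: bool) -> None:
        self.healthy = healthy

    async def ping(self, timeout: Optional[float]) -> bool:
        return self.healthy


def make_ping(
    broker: FakeBroker,
    timeout: Optional[float] = None,
    include_in_schema: bool = True,
) -> Endpoint:
    # The factory's keyword is passed straight through to the decorator.
    @get(include_in_schema=include_in_schema)
    async def ping(scope: Scope) -> int:
        # 204 when the broker answers, 500 otherwise (status codes as in the diff).
        if await broker.ping(timeout):
            return 204
        return 500

    return ping


healthy_status = asyncio.run(make_ping(FakeBroker(True))({}))
unhealthy_status = asyncio.run(make_ping(FakeBroker(False))({}))
hidden_ping = make_ping(FakeBroker(True), include_in_schema=False)
```

Keeping the default at `True` means existing callers of the factory see no behaviour change; only callers that pass `include_in_schema=False` opt out.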
90 changes: 69 additions & 21 deletions faststream/asgi/handlers.py
@@ -1,40 +1,88 @@
-from functools import wraps
-from typing import (
-    TYPE_CHECKING,
-    Sequence,
-)
+from typing import TYPE_CHECKING, Callable, Optional, Sequence, Union, overload

 from faststream.asgi.response import AsgiResponse

 if TYPE_CHECKING:
     from faststream.asgi.types import ASGIApp, Receive, Scope, Send, UserApp


-def get(func: "UserApp") -> "ASGIApp":
-    methods = ("GET", "HEAD")
+class HttpHandler:
+    def __init__(
+        self,
+        func: "UserApp",
+        *,
+        include_in_schema: bool = True,
+        description: Optional[str] = None,
+        methods: Optional[Sequence[str]] = None,
+    ):
+        self.func = func
+        self.methods = methods or ()
+        self.include_in_schema = include_in_schema
+        self.description = description or func.__doc__

-    method_now_allowed_response = _get_method_not_allowed_response(methods)
-    error_response = AsgiResponse(body=b"Internal Server Error", status_code=500)
-
-    @wraps(func)
-    async def asgi_wrapper(
-        scope: "Scope",
-        receive: "Receive",
-        send: "Send",
-    ) -> None:
-        if scope["method"] not in methods:
-            response: ASGIApp = method_now_allowed_response
+    async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
+        if scope["method"] not in self.methods:
+            response: ASGIApp = _get_method_not_allowed_response(self.methods)

         else:
             try:
-                response = await func(scope)
+                response = await self.func(scope)
             except Exception:
-                response = error_response
+                response = AsgiResponse(body=b"Internal Server Error", status_code=500)

         await response(scope, receive, send)
         return

-    return asgi_wrapper
+
+class GetHandler(HttpHandler):
+    def __init__(
+        self,
+        func: "UserApp",
+        *,
+        include_in_schema: bool = True,
+        description: Optional[str] = None,
+    ):
+        super().__init__(
+            func,
+            include_in_schema=include_in_schema,
+            description=description,
+            methods=("GET", "HEAD"),
+        )
+
+
+@overload
+def get(
+    func: "UserApp",
+    *,
+    include_in_schema: bool = True,
+    description: Optional[str] = None,
+) -> "ASGIApp": ...
+
+
+@overload
+def get(
+    func: None = None,
+    *,
+    include_in_schema: bool = True,
+    description: Optional[str] = None,
+) -> Callable[["UserApp"], "ASGIApp"]: ...
+
+
+def get(
+    func: Optional["UserApp"] = None,
+    *,
+    include_in_schema: bool = True,
+    description: Optional[str] = None,
+) -> Union[Callable[["UserApp"], "ASGIApp"], "ASGIApp"]:
+    def decorator(inner_func: "UserApp") -> "ASGIApp":
+        return GetHandler(
+            inner_func, include_in_schema=include_in_schema, description=description
+        )
+
+    if func is None:
+        return decorator
+
+    return decorator(func)


 def _get_method_not_allowed_response(methods: Sequence[str]) -> AsgiResponse:
Expand Down
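The refactor above turns `get` into a decorator that works both bare (`@get`) and with keyword arguments (`@get(include_in_schema=False)`), returning a callable handler object that carries schema metadata. The sketch below reproduces that shape without importing FastStream so it can run on its own. `Response` and `call` are hypothetical stand-ins, and the 405 reply is an assumption, since the body of `_get_method_not_allowed_response` is not shown in the diff.

```python
import asyncio
from typing import Any, Dict, List, Optional, Sequence

Scope = Dict[str, Any]


class Response:
    """Tiny stand-in for AsgiResponse: an ASGI callable that sends itself."""

    def __init__(self, body: bytes, status_code: int) -> None:
        self.body = body
        self.status_code = status_code

    async def __call__(self, scope, receive, send) -> None:
        await send({"type": "http.response.start", "status": self.status_code, "headers": []})
        await send({"type": "http.response.body", "body": self.body})


class HttpHandler:
    """Callable handler that keeps the metadata a schema generator could read."""

    def __init__(self, func, *, include_in_schema: bool = True,
                 description: Optional[str] = None,
                 methods: Optional[Sequence[str]] = None) -> None:
        self.func = func
        self.methods = tuple(methods or ())
        self.include_in_schema = include_in_schema
        self.description = description or func.__doc__

    async def __call__(self, scope, receive, send) -> None:
        if scope["method"] not in self.methods:
            # Assumption: the real helper replies with 405 Method Not Allowed.
            response = Response(b"Method Not Allowed", 405)
        else:
            try:
                response = await self.func(scope)
            except Exception:
                response = Response(b"Internal Server Error", 500)
        await response(scope, receive, send)


def get(func=None, *, include_in_schema: bool = True, description: Optional[str] = None):
    def decorator(inner):
        return HttpHandler(inner, include_in_schema=include_in_schema,
                           description=description, methods=("GET", "HEAD"))

    # Bare usage passes the function directly; parameterised usage passes None.
    return decorator if func is None else decorator(func)


@get
async def ping(scope: Scope) -> Response:
    """Liveness probe."""
    return Response(b"", 204)


@get(include_in_schema=False)
async def hidden(scope: Scope) -> Response:
    return Response(b"", 200)


def call(app, method: str) -> int:
    """Drive an ASGI app once and return the status code it sent."""
    sent: List[Dict[str, Any]] = []

    async def send(message):
        sent.append(message)

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    asyncio.run(app({"type": "http", "method": method}, receive, send))
    return sent[0]["status"]
```

One visible trade-off in the diff: the error and method-not-allowed responses are now built per request instead of once at decoration time, which keeps the handler class simpler at the cost of a small allocation on those paths.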
27 changes: 27 additions & 0 deletions faststream/asyncapi/generate.py
@@ -28,6 +28,9 @@ def get_app_schema(app: "AsyncAPIApplication") -> Schema:
     servers = get_broker_server(broker)
     channels = get_broker_channels(broker)

+    # TODO: generate HTTP channels
+    # asgi_routes = get_asgi_routes(app)
+
     messages: Dict[str, Message] = {}
     payloads: Dict[str, Dict[str, Any]] = {}
     for channel_name, ch in channels.items():
@@ -54,6 +57,7 @@ def get_app_schema(app: "AsyncAPIApplication") -> Schema:
payloads,
messages,
)

schema = Schema(
info=Info(
title=app.title,
@@ -136,6 +140,29 @@ def get_broker_channels(
     return channels


+def get_asgi_routes(
+    app: "AsyncAPIApplication",
+) -> Any:
+    """Get the ASGI routes for an application."""
+    # We should import this here due
+    # ASGI > Application > asynciapi.proto
+    # so it looks like a circular import
+    from faststream.asgi import AsgiFastStream
+    from faststream.asgi.handlers import HttpHandler
+
+    if not isinstance(app, AsgiFastStream):
+        return None
+
+    for route in app.routes:
+        path, asgi_app = route
+
+        if isinstance(asgi_app, HttpHandler) and asgi_app.include_in_schema:
+            # TODO: generate HTTP channel for handler
+            pass
+
+    return
+
+
 def _resolve_msg_payloads(
     m: Message,
     channel_name: str,
Expand Down
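As merged, `get_asgi_routes` only walks the routes and leaves channel generation as a TODO. The sketch below shows one way the filtering step could collect the metadata that `HttpHandler` stores. It is purely illustrative: `RouteInfo` and `collect_documented_routes` are hypothetical names, the `HttpHandler` here is a stripped-down stand-in, and nothing in the PR defines how routes will eventually be mapped to AsyncAPI channels.

```python
from typing import Any, List, NamedTuple, Optional, Sequence, Tuple


class HttpHandler:
    """Stand-in exposing only the attributes the PR's HttpHandler stores."""

    def __init__(self, *, methods: Sequence[str], include_in_schema: bool = True,
                 description: Optional[str] = None) -> None:
        self.methods = tuple(methods)
        self.include_in_schema = include_in_schema
        self.description = description


class RouteInfo(NamedTuple):
    """Hypothetical record of what a schema generator might need per route."""

    path: str
    methods: Tuple[str, ...]
    description: Optional[str]


def collect_documented_routes(routes: Sequence[Tuple[str, Any]]) -> List[RouteInfo]:
    documented: List[RouteInfo] = []
    for path, asgi_app in routes:
        # Same guard as the diff: skip plain ASGI apps and opted-out handlers.
        if isinstance(asgi_app, HttpHandler) and asgi_app.include_in_schema:
            documented.append(RouteInfo(path, asgi_app.methods, asgi_app.description))
    return documented


async def plain_app(scope, receive, send) -> None:
    """A raw ASGI callable; it carries no schema metadata."""


routes = [
    ("/health", HttpHandler(methods=("GET", "HEAD"), description="Liveness probe")),
    ("/internal", HttpHandler(methods=("GET",), include_in_schema=False)),
    ("/raw", plain_app),
]
documented = collect_documented_routes(routes)
```

Only `/health` survives the filter here: `/internal` opted out via `include_in_schema=False`, and `/raw` is not an `HttpHandler`, so there is nothing to document for it.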
1 change: 0 additions & 1 deletion faststream/asyncapi/schema/main.py
@@ -83,7 +83,6 @@ class Schema(BaseModel):
to_jsonable() -> Any: Convert the schema to a JSON-serializable object.
to_json() -> str: Convert the schema to a JSON string.
to_yaml() -> str: Convert the schema to a YAML string.

"""

asyncapi: str = ASYNC_API_VERSION
Expand Down
1 change: 1 addition & 0 deletions tests/asyncapi/base/fastapi.py
@@ -34,6 +34,7 @@ async def test_fastapi_full_information(self):
contact={"name": "support", "url": "https://support.com"},
license_info={"name": "some", "url": "https://some.com"},
)

app.include_router(broker)

async with self.broker_wrapper(broker.broker):
Expand Down