跳转至

开始使用

警告

當前頁面任然沒有此語言的翻譯。 但是你可以幫忙翻譯一下: 貢獻.

让我们开始使用吧。

使用fastapi-channels不需要你做什么过多的操作,只需通过简单的几步就可以快速开始使用,并且方便的定制你的频道

from fastapi import FastAPI, WebSocket

from fastapi_channels import add_channel
from fastapi_channels.channels import BaseChannel

app = FastAPI()
add_channel(
    app,
    add_exception_handlers=True,
    url="memory://",
    limiter_url="redis://localhost:6379",
)
c = BaseChannel()


@app.websocket("/")
async def ws_endpoint(websocket: WebSocket):
    return await c.connect(websocket, "default_channel")

就像上面这段代码这么简单,你就实现了一个简易的频道的搭建

接下来让我们仔细讲一讲这个代码吧😀

全局注册fastapi-channels

from fastapi import FastAPI, WebSocket

from fastapi_channels import add_channel
from fastapi_channels.channels import BaseChannel

app = FastAPI()
add_channel(
    app,
    add_exception_handlers=True,
    url="memory://",
    limiter_url="redis://localhost:6379",
)
c = BaseChannel()


@app.websocket("/")
async def ws_endpoint(websocket: WebSocket):
    return await c.connect(websocket, "default_channel")
  • *add_exception_handlers 设置为True将为我封装的WebSocketException添加异常捕获的处理机制,如果你不想使用这个可以设置为False再进行自定义,详细的异常机制你可以在处理错误进行了解。
  • *url 为fastapi-channels添加对应的broadcaster后端地址,也就是你用来进行消息订阅的后端应用程序的地址。
  • *limiter_url 为fastapi-channels添加对应的fastapi-limiter地址,这个对应你用来做限流缓存的redis数据库。可能这里之后会进行扩展,但是现在请按照我们给你提供的方式来设置

这些只是简单的使用,实际上你还可以定义一些其他的内容来实现全局化设置:限流器、权限验证。

fastapi_channels/api.py
    default_identifier,
    http_default_callback,
    ws_default_callback,
)

ParentT = TypeVar("ParentT", APIRouter, FastAPI)

DEFAULT_QUERY_TOKEN_KEY = "token"
DEFAULT_COOKIE_TOKEN_KEY = "token"
DEFAULT_PERMISSION_CLASSES = (AllowAny,)


class FastAPIChannel:
    """
    为fastapi-channels全局注册类变量,在使用Channel的时候部分变量没有指定将会使用这个
    """

    broadcast: Optional[Broadcast] = None
    # throttle: Optional[Throttle] = None # 有没有必要被反复注册? or not 有没有必要全局使用
    limiter_url: Optional[str] = None
    _new_broadcast: bool = False
    _new_limiter: bool = False
    # authentication
    query_token_key: Optional[str] = None
    cookie_token_key: Optional[str] = None

自定义的范畴

你还可以自定义BroadcasterFastAPI-limiter, 就跟平常使用这些一样,把定义好的对象传入,或者直接对对应的库进行自主设置

main.py
from contextlib import asynccontextmanager
from typing import AsyncIterator

from broadcaster import Broadcast
from fastapi import FastAPI, WebSocket
from fastapi_limiter import FastAPILimiter
from redis.asyncio import Redis

from fastapi_channels import add_channel
from fastapi_channels.channels import BaseChannel


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator:
    await FastAPILimiter.init(redis=await Redis.from_url("redis://localhost:6379"))
    yield
    await FastAPILimiter.close()


app = FastAPI(lifespan=lifespan)

add_channel(
    app,
    add_exception_handlers=True,
    broadcast=Broadcast(url="memory://"),
    limiter_url="redis://localhost:6379",
)
c = BaseChannel()


@app.websocket("/")
async def ws_endpoint(websocket: WebSocket):
    return await c.connect(websocket, "default_channel")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)

Note

FastAPILimiter的init方法是个类方法,所以我这里没有考虑传入。

而且不止这一点,如果你想自己在lifespan中来初始化FastAPILimiter,像是app(lifspan=lifspan)是不可行的。

因为通过add_channel合并了fastapi-channel的生命周期函数和fastapi的生命周期函数,合并的结果是我的初始化步骤在fastapi的start_up之前

我只是把他聚合到一个方法里,在你不想单独的一个个指定的时候,帮你完成全局注册和关闭程序的后续处理。

但是还是期望你能够使用add_channel来帮助你完成fastapi-channels的注册,同时这样与你的http接口中使用FastAPI-limiter的并不冲突

from fastapi import APIRouter, Depends
from fastapi_limiter.depends import RateLimiter

app = APIRouter()


@app.get("/index", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def index():
    return {"msg": "Hello World"}

评论