开始使用
让我们开始使用吧。
使用fastapi-channels不需要你做什么过多的操作,只需通过简单的几步就可以快速开始使用,并且方便的定制你的频道
from fastapi import FastAPI, WebSocket
from fastapi_channels import add_channel
from fastapi_channels.channels import BaseChannel
app = FastAPI()
add_channel(
app,
add_exception_handlers=True,
url="memory://",
limiter_url="redis://localhost:6379",
)
c = BaseChannel()
@app.websocket("/")
async def ws_endpoint(websocket: WebSocket):
return await c.connect(websocket, "default_channel")
就像上面这段代码这么简单,你就实现了一个简易的频道的搭建
接下来让我们仔细讲一讲这个代码吧😀
全局注册fastapi-channels
from fastapi import FastAPI, WebSocket
from fastapi_channels import add_channel
from fastapi_channels.channels import BaseChannel
app = FastAPI()
add_channel(
app,
add_exception_handlers=True,
url="memory://",
limiter_url="redis://localhost:6379",
)
c = BaseChannel()
@app.websocket("/")
async def ws_endpoint(websocket: WebSocket):
return await c.connect(websocket, "default_channel")
- *add_exception_handlers 设置为True将为我封装的
WebSocketException
添加异常捕获的处理机制,如果你不想使用这个可以设置为False再进行自定义,详细的异常机制你可以在处理错误进行了解。 - *url 为fastapi-channels添加对应的broadcaster后端地址,也就是你用来进行消息订阅的后端应用程序的地址。
- *limiter_url 为fastapi-channels添加对应的fastapi-limiter地址,这个对应你用来做限流缓存的redis数据库。可能这里之后会进行扩展,但是现在请按照我们给你提供的方式来设置
这些只是简单的使用,实际上你还可以定义一些其他的内容来实现全局化设置:限流器、权限验证。
fastapi_channels/api.py
default_identifier,
http_default_callback,
ws_default_callback,
)
ParentT = TypeVar("ParentT", APIRouter, FastAPI)
DEFAULT_QUERY_TOKEN_KEY = "token"
DEFAULT_COOKIE_TOKEN_KEY = "token"
DEFAULT_PERMISSION_CLASSES = (AllowAny,)
class FastAPIChannel:
"""
为fastapi-channels全局注册类变量,在使用Channel的时候部分变量没有指定将会使用这个
"""
broadcast: Optional[Broadcast] = None
# throttle: Optional[Throttle] = None # 有没有必要被反复注册? or not 有没有必要全局使用
limiter_url: Optional[str] = None
_new_broadcast: bool = False
_new_limiter: bool = False
# authentication
query_token_key: Optional[str] = None
cookie_token_key: Optional[str] = None
自定义的范畴
你还可以自定义Broadcaster 和FastAPI-limiter, 就跟平常使用这些一样,把定义好的对象传入,或者直接对对应的库进行自主设置
main.py
from contextlib import asynccontextmanager
from typing import AsyncIterator
from broadcaster import Broadcast
from fastapi import FastAPI, WebSocket
from fastapi_limiter import FastAPILimiter
from redis.asyncio import Redis
from fastapi_channels import add_channel
from fastapi_channels.channels import BaseChannel
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator:
await FastAPILimiter.init(redis=await Redis.from_url("redis://localhost:6379"))
yield
await FastAPILimiter.close()
app = FastAPI(lifespan=lifespan)
add_channel(
app,
add_exception_handlers=True,
broadcast=Broadcast(url="memory://"),
limiter_url="redis://localhost:6379",
)
c = BaseChannel()
@app.websocket("/")
async def ws_endpoint(websocket: WebSocket):
return await c.connect(websocket, "default_channel")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app)
Note
FastAPILimiter的init方法是个类方法,所以我这里没有考虑传入。
而且不止这一点,如果你想自己在lifespan中来初始化FastAPILimiter,像是app(lifspan=lifspan)
是不可行的。
因为通过add_channel合并了fastapi-channel的生命周期函数和fastapi的生命周期函数,合并的结果是我的初始化步骤在fastapi的start_up
之前
我只是把他聚合到一个方法里,在你不想单独的一个个指定的时候,帮你完成全局注册和关闭程序的后续处理。
但是还是期望你能够使用add_channel来帮助你完成fastapi-channels的注册,同时这样与你的http接口中使用FastAPI-limiter的并不冲突
from fastapi import APIRouter, Depends
from fastapi_limiter.depends import RateLimiter
app = APIRouter()
@app.get("/index", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def index():
return {"msg": "Hello World"}