django如何启用websocket,并使用动态路由参数


一、前言

小落同学原先没有做多用户,想改版一下可以支持多用户,所以需要改造原来的websocket API接口
目标示例的请求如下:
ws://x.rg4.net/ws/b7e14e7c1a1d42019d866166c81119d1?llm=chatgpt&character_id=2&chatmode=text&token=

OK,那让我们来试试。

二、让Django支持websocket

在 Django 中启用 WebSocket 通常需要借助第三方库,因为 Django 自身的框架并不直接支持WebSocket。
一个流行的选择是使用 Django Channels,这是一个为 Django 提供异步能力的扩展,包括 WebSocket 支持。

以下是如何在 Django 项目中启用 WebSocket 的基本步骤:

1. 安装 Django Channels

pip install channels

2. 更新 Django 设置

在你的 Django 项目的 settings.py 文件中,你需要添加 Channels 到你的 INSTALLED_APPS 列表中,并设置

ASGI_APPLICATION。

INSTALLED_APPS = [
    ...
    'channels',
    ...
]

ASGI_APPLICATION = 'xiaoluo.asgi.application'

注意:’xiaoluo.asgi.application’ 应该替换为你的 Django 项目的实际名称和 ASGI 应用程序的路径。

3. 创建 ASGI 应用程序:

在你的项目根目录下(与 settings.py 同一目录),你应该有一个名为 asgi.py 的文件(如果没有,你需要创建一个)。这个文件将配置你的 ASGI 应用程序。

import os  
from django.core.asgi import get_asgi_application  
from channels.routing import ProtocolTypeRouter, URLRouter  
from channels.auth import AuthMiddlewareStack  

from omserver.mainservice.messages.routing import websocket_urlpatterns

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xiaoluo.settings')

# Initialize Django ASGI application early to ensure the AppRegistry
# is populated before importing code that may import ORM models.
django_asgi_app = get_asgi_application()

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AllowedHostsOriginValidator(
            AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
        ),
})

注意:websocket_urlpatterns以及xiaoluo.settings 应该指向你定义的 WebSocket URL 路由。

4. 定义 WebSocket 路由

在你的应用中(通常是 routing.py 文件,如果没有,你需要创建一个),你需要定义 WebSocket 的 URL 路由。

# omserver.mainservice.messages/routing.py
from django.urls import re_path, path

from . import consumers

websocket_urlpatterns = [
    # TODO 检验
    re_path(r"ws/$", consumers.ChatConsumer.as_asgi()),
    path('websocket/<str:session>', consumers.ChatConsumer.as_asgi())
]

5. 创建 WebSocket Consumer

WebSocket Consumer 是一个处理 WebSocket 连接的类。你需要在你的应用中创建一个 consumers.py 文件(如果没有),并定义一个 Consumer 类,如:ChatConsumer。

import json
import logging
from urllib.parse import parse_qs
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync

from ...mainservice import singleton_main_service

chat_channel = "chat_channel"
logger = logging.getLogger(__name__)

class ChatConsumer(WebsocketConsumer):
    session_id: str = None
    llm: str = "ernie"
    character_id: int = 1
    chatmode: str = "text"
    language: str = "CN"
    platform: str = "web"
    user_id: int = 0
    kb_id: int = 0

    def connect(self):
        # 解析传过来的各参数
        self.session_id = self.scope["url_route"]["kwargs"]["session"]
        # 获取原始查询字符串  
        query_string = self.scope["query_string"].decode("utf-8")  
        # 解析查询字符串为字典  
        query_params = parse_qs(query_string)  
        self.llm = query_params.get("llm", [None])[0]  
        self.character_id = query_params.get("character_id", [None])[0]  
        self.chatmode = query_params.get("chatmode", [None])[0]
        self.language = query_params.get("language", None)
        self.platform = query_params.get("platform", None)

        logger.info(f'=> ws connect group : {chat_channel}, session={self.session_id}, llm={self.llm}, character_id={self.character_id}, chatmode={self.chatmode}, language={self.language}, platform={self.platform}')

        self.accept()

        # 从老的前端过来的连接
        # if self.session_id == "":
        if 1:
            # 将连接的客户端添加到特定频道
            async_to_sync(self.channel_layer.group_add)(
                chat_channel,  # 设置频道名称
                self.channel_name
            )
            logger.info(f'=> ws connect group : {chat_channel}, session={self.session_id}')

    def disconnect(self, close_code):
        # 在客户端断开连接时从频道中移除
        async_to_sync(self.channel_layer.group_discard)(
            chat_channel,  # 设置频道名称
            self.channel_name
        )

    def receive(self, text_data):
        # Receive message from room group
        if self.session_id == "":
            pass
        else:
            logger.info(f"=> run receive:{text_data}, session={self.session_id}, llm={self.llm}, language={self.language}, character={self.character_id}, chatmode={self.chatmode}")
            # 发送响应给客户端
            # response = 'Hello, you have connected with session ID: ' + self.session_id + '[end]'
            # self.send(response)

            singleton_main_service.chat(role_id=self.character_id, your_name='', query=text_data, llmtype=self.llm, kb_id=self.kb_id, user_id=self.user_id)

    def chat_message(self, event):
        # Receive message from room group
        message = event["message"]
        logger.info(f"=> run chat_message :{message}")
        text_data = json.dumps({"message": message})
        self.send(text_data=text_data)

在小落同学里测试通过。

Leave a comment

Your email address will not be published. Required fields are marked *