This document describes a WebSocket server for the trip planner pipeline, built with SuperagentX. The server accepts incoming WebSocket connections, authenticates clients with a token passed as a query parameter, and hands requests to the pipeline for real-time processing.

Overview

The code sets up a WebSocket server using the WSPipe class from SuperagentX. It integrates a custom authentication mechanism to validate client requests based on a token passed in the query parameters.

Setup Environment

$ pip install superagentx

Implementation

Import the dependencies.


import asyncio
import http
import urllib.parse

from rich import print as rprint
from superagentx.pipeimpl.wspipe import WSPipe  # https://websockets.readthedocs.io/en/stable/

from trip_planner.config import AUTH_TOKEN
from trip_planner.pipe import get_trip_planner_pipe
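The imports above expect trip_planner/config.py to expose AUTH_TOKEN, but the page does not show that module. As a minimal sketch, and purely as an assumption, the token could be read from an environment variable so that it stays out of source control. The variable name TRIP_PLANNER_AUTH_TOKEN and the helper load_auth_token are hypothetical and not part of the project or of SuperagentX.

```python
import os
from typing import Mapping


def load_auth_token(env: Mapping[str, str] = os.environ) -> str:
    """Read the shared token from the environment (hypothetical variable name)."""
    token = env.get("TRIP_PLANNER_AUTH_TOKEN", "")
    if not token:
        # Fail fast: an empty token would make every check meaningless.
        raise RuntimeError("TRIP_PLANNER_AUTH_TOKEN is not set")
    return token


# In a hypothetical config.py one might then write:
# AUTH_TOKEN = load_auth_token()
```

Failing at import time when the variable is unset avoids starting a server that would compare every request against an empty token.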

The query_param_auth function authenticates users based on a token included in the WebSocket request’s query parameters. It verifies the token’s presence and validity against the predefined AUTH_TOKEN. If the token is missing or invalid, the server responds with an HTTP 401 Unauthorized status.


async def query_param_auth(connection, request):
    """Authenticate user from token in query parameter."""
    query = urllib.parse.urlparse(request.path).query
    params = urllib.parse.parse_qs(query)
    values = params.get('token', [])
    # Reject the handshake when no token was supplied at all.
    if not values:
        return connection.respond(http.HTTPStatus.UNAUTHORIZED, "Missing token\n")
    # Reject the handshake when the supplied token does not match.
    if values[0] != AUTH_TOKEN:
        return connection.respond(http.HTTPStatus.UNAUTHORIZED, "Invalid token\n")
    # Falling through returns None, which lets the handshake continue.
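The token lookup can be exercised without a running server. The sketch below isolates it into two helpers, extract_token and is_authorized; both names are illustrative and not part of SuperagentX. It also replaces the != comparison with hmac.compare_digest, a standard-library constant-time comparison, as one possible hardening and not something the original code does.

```python
import hmac
import urllib.parse
from typing import Optional


def extract_token(path: str) -> Optional[str]:
    """Return the first `token` query parameter in a request path, or None."""
    query = urllib.parse.urlparse(path).query
    # parse_qs drops blank values by default, so "?token=" counts as missing.
    values = urllib.parse.parse_qs(query).get("token", [])
    return values[0] if values else None


def is_authorized(path: str, expected_token: str) -> bool:
    """Check the token in `path` against `expected_token`."""
    token = extract_token(path)
    if token is None:
        return False
    # compare_digest takes time independent of where the inputs first differ.
    return hmac.compare_digest(token.encode(), expected_token.encode())


print(is_authorized("/?token=secret", "secret"))
print(is_authorized("/?token=wrong", "secret"))
print(is_authorized("/", "secret"))
```

Keeping the parsing in a pure function like this makes the missing-token and wrong-token branches easy to unit test separately from the WebSocket handshake.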

The main function initializes the trip planner pipeline using get_trip_planner_pipe. It then creates a WSPipe object, which serves as the WebSocket server for processing requests. The server passes query_param_auth as the process_request hook, so the token check runs during the opening handshake, before any data reaches the pipeline.


async def main():
    """
    Launches the trip_planner pipeline websocket server for processing requests and handling data.
    """
    pipe = await get_trip_planner_pipe()
    ws_pipe = WSPipe(
        search_name='SuperAgentX trip_planner Websocket Server',
        agentx_pipe=pipe,
        process_request=query_param_auth
    )
    await ws_pipe.start()

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, asyncio.CancelledError):
        rprint("\nUser canceled the [bold yellow][i]pipe[/i]!")

This WebSocket server provides a real-time interface to the trip planner pipeline. Token-based authentication restricts access to clients that present the expected token in the connection URL.

Run the script

$ python3 wspipe.py

Result

(venv) ➜  trip_planner git:(dev) ✗ python3 wspipe.py
Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.
───────────────────────────────────────────────────────────────────── SuperAgentX trip_planner Websocket Server ─────────────────────────────────────────────────────────────────────
🚀 Starting SuperagentX websocket server
😃 Host: localhost
😃 Port: 8765
INFO:websockets.server:server listening on 127.0.0.1:8765
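With the server listening on localhost:8765, a client has to include the token in the connection URL for the handshake to be accepted. The sketch below shows one way to build such a URL with the standard library. build_ws_url is an illustrative helper and "my-secret-token" is a placeholder, neither of which comes from the project.

```python
import urllib.parse


def build_ws_url(host: str, port: int, token: str) -> str:
    """Build a ws:// URL that carries the token as a query parameter."""
    # urlencode escapes characters that are unsafe in a query string.
    query = urllib.parse.urlencode({"token": token})
    return f"ws://{host}:{port}/?{query}"


# Placeholder token; use the value configured as AUTH_TOKEN on the server.
url = build_ws_url("localhost", 8765, "my-secret-token")
print(url)
```

The resulting URL can be passed to any WebSocket client, for example the connect function of the websockets library. The message format the pipeline expects is not covered on this page, so it is left out here.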