This documentation provides an overview of the implementation of a WebSocket server for the trip planner pipeline using
SuperagentX. The server processes incoming WebSocket requests, authenticates users via query parameters, and handles
real-time data processing through the pipeline.
Overview
The code sets up a WebSocket server using the WSPipe class from SuperagentX. It integrates a custom authentication
mechanism to validate client requests based on a token passed in the query parameters.
Setup Environment
$ pip install superagentx
Implementation
Import the dependencies.
import asyncio
import http
import urllib.parse
from rich import print as rprint
from superagentx.pipeimpl.wspipe import WSPipe # https://websockets.readthedocs.io/en/stable/
from trip_planner.config import AUTH_TOKEN
from trip_planner.pipe import get_trip_planner_pipe
The query_param_auth function authenticates users based on a token included in the WebSocket request’s query parameters.
It verifies the token’s presence and validity against the predefined AUTH_TOKEN. If the token is missing or invalid,
the server responds with an HTTP 401 Unauthorized status.
async def query_param_auth(connection, request):
    """Authenticate user from token in query parameter."""
    query = urllib.parse.urlparse(request.path).query
    params = urllib.parse.parse_qs(query)
    values = params.get('token', [])
    # Reject the handshake when no token was supplied at all.
    if not values:
        return connection.respond(http.HTTPStatus.UNAUTHORIZED, "Missing token\n")
    # Reject the handshake when the supplied token does not match.
    if values[0] != AUTH_TOKEN:
        return connection.respond(http.HTTPStatus.UNAUTHORIZED, "Invalid token\n")
    # Returning None lets the WebSocket handshake continue.
    return None
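The token check described above can be exercised without a running server by moving the decision into a plain function. The following is a minimal sketch, not part of SuperagentX: check_token and EXPECTED_TOKEN are hypothetical names, and hmac.compare_digest is swapped in for the != comparison to make the check constant-time, which the original code does not do.

```python
import hmac
import http
import urllib.parse

# Hypothetical stand-in for trip_planner.config.AUTH_TOKEN.
EXPECTED_TOKEN = "s3cret"


def check_token(path, expected=EXPECTED_TOKEN):
    """Return None when the token is valid, else a (status, message) pair."""
    query = urllib.parse.urlparse(path).query
    # parse_qs drops blank values by default, so "?token=" counts as missing.
    values = urllib.parse.parse_qs(query).get("token", [])
    if not values:
        return http.HTTPStatus.UNAUTHORIZED, "Missing token\n"
    # compare_digest takes the same time wherever the two values differ.
    if not hmac.compare_digest(values[0].encode(), expected.encode()):
        return http.HTTPStatus.UNAUTHORIZED, "Invalid token\n"
    return None
```

A request path such as "/?token=s3cret" yields None, while "/" or "/?token=wrong" yields a 401 pair that a process_request hook could pass to connection.respond.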
The main function initializes the trip planner pipeline using get_trip_planner_pipe. It then creates a WSPipe object,
which serves as the WebSocket server for processing requests. The server receives query_param_auth as its
process_request hook, so each connection is authenticated before it can send data to the pipeline.
async def main():
    """
    Launches the trip_planner pipeline websocket server for processing requests and handling data.
    """
    pipe = await get_trip_planner_pipe()
    ws_pipe = WSPipe(
        search_name='SuperAgentX trip_planner Websocket Server',
        agentx_pipe=pipe,
        process_request=query_param_auth
    )
    await ws_pipe.start()


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, asyncio.CancelledError):
        rprint("\nUser canceled the [bold yellow][i]pipe[/i]!")
This WebSocket server provides a real-time interface to the trip planner pipeline. Token-based authentication during
the connection handshake restricts access to clients that present the configured AUTH_TOKEN.
Run the script
Result
(venv) ➜ trip_planner git:(dev) ✗ python3 wspipe.py
Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.
───────────────────────────────────────────────────────────────────── SuperAgentX trip_planner Websocket Server ─────────────────────────────────────────────────────────────────────
🚀 Starting SuperagentX websocket server
😃 Host: localhost
😃 Port: 8765
INFO:websockets.server:server listening on 127.0.0.1:8765
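With the server listening on localhost:8765 as in the output above, a client could connect by placing the token in the query string. The sketch below assumes the websockets package is installed; build_ws_url and ask are hypothetical helpers. This document does not specify the message format the pipeline expects, so sending a plain-text query and reading a single reply is an assumption.

```python
import urllib.parse


def build_ws_url(token, host="localhost", port=8765):
    """Build the WebSocket URL with the token as a URL-encoded query parameter."""
    query = urllib.parse.urlencode({"token": token})
    return f"ws://{host}:{port}/?{query}"


async def ask(token, message):
    """Send one message and return the first reply (hypothetical helper)."""
    # Imported here so build_ws_url stays usable without the package installed.
    import websockets

    async with websockets.connect(build_ws_url(token)) as ws:
        # Assumption: the server accepts a plain-text query.
        await ws.send(message)
        return await ws.recv()


# Example, with the server running and a valid token:
#   import asyncio
#   print(asyncio.run(ask("my-token", "Plan a 3-day trip to Rome")))
```

Encoding the token with urlencode keeps characters such as spaces or & from corrupting the query string that the server later parses with parse_qs.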