This document describes a WebSocket server for the trip planner pipeline, built with SuperagentX. The server accepts
incoming WebSocket requests, authenticates clients via a token in the query parameters, and passes their messages to
the pipeline for real-time processing.
The code sets up a WebSocket server using the WSPipe class from SuperagentX. It integrates a custom authentication
mechanism to validate client requests based on a token passed in the query parameters.
The query_param_auth function authenticates users based on a token included in the WebSocket request’s query parameters.
It checks that the token is present and that it matches the predefined AUTH_TOKEN. If the token is missing or invalid,
the server responds with an HTTP 401 Unauthorized status and the connection is never opened. If the token is valid, the
function returns None, which lets the WebSocket handshake proceed.
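import http
import urllib.parse


async def query_param_auth(connection, request):
    """Authenticate user from token in query parameter."""
    query = urllib.parse.urlparse(request.path).query
    params = urllib.parse.parse_qs(query)
    values = params.get('token', [])
    # Default to None so a request without a token is rejected cleanly
    # instead of raising NameError on an unassigned variable.
    token = values[0] if values else None
    if token is None:
        return connection.respond(http.HTTPStatus.UNAUTHORIZED, "Missing token\n")
    if token != AUTH_TOKEN:
        return connection.respond(http.HTTPStatus.UNAUTHORIZED, "Invalid token\n")
    # Falling through returns None, which lets the handshake continue.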
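The token lookup described above can be exercised on its own, without a running server. The following is a minimal sketch, not part of SuperagentX: the helper names extract_token and is_authorized are hypothetical, and the use of hmac.compare_digest for a constant-time comparison is a suggested hardening rather than something the original code does.

```python
import hmac
import urllib.parse
from typing import Optional


def extract_token(path: str) -> Optional[str]:
    """Return the first `token` query value from a request path, or None."""
    query = urllib.parse.urlparse(path).query
    # parse_qs drops blank values by default, so "?token=" also yields None.
    values = urllib.parse.parse_qs(query).get("token", [])
    return values[0] if values else None


def is_authorized(path: str, expected: str) -> bool:
    """Check the token carried in `path` against `expected`."""
    token = extract_token(path)
    if token is None:
        return False
    # compare_digest avoids leaking how many leading characters matched
    # (hypothetical hardening; the original code uses a plain != check).
    return hmac.compare_digest(token.encode(), expected.encode())
```

Keeping the parsing in a pure function like this makes the missing-token and wrong-token branches easy to unit test before wiring the check into process_request.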
The main function initializes the trip planner pipeline using get_trip_planner_pipe. It then creates a WSPipe object,
which serves as the WebSocket server for processing requests. The server is given query_param_auth as its
process_request hook, so every handshake is authenticated before any data reaches the pipeline.
async def main():
    """
    Launches the trip_planner pipeline websocket server for processing requests and handling data.
    """
    pipe = await get_trip_planner_pipe()
    ws_pipe = WSPipe(
        search_name='SuperAgentX trip_planner Websocket Server',
        agentx_pipe=pipe,
        process_request=query_param_auth
    )
    await ws_pipe.start()


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, asyncio.CancelledError):
        rprint("\nUser canceled the [bold yellow][i]pipe[/i]!")
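From the client side, connecting to this server means putting the token in the URL's query string. The sketch below is illustrative only. It assumes the websockets package (version 13 or later, which provides websockets.asyncio.client), the localhost:8765 address shown in the server's startup output, and a simple exchange in which the client sends one plain-text query and reads one reply. The message format the pipeline actually expects is not specified here, and build_ws_url, ask, and the sample query are hypothetical names and values.

```python
import asyncio
import urllib.parse
from typing import Union


def build_ws_url(host: str, port: int, token: str) -> str:
    """Build a ws:// URL that carries the token as a query parameter."""
    # urlencode escapes characters that are not safe in a query string.
    query = urllib.parse.urlencode({"token": token})
    return f"ws://{host}:{port}/?{query}"


async def ask(url: str, message: str) -> Union[str, bytes]:
    """Send one message and return the first reply (assumed protocol)."""
    # Imported lazily so build_ws_url works without the websockets package.
    from websockets.asyncio.client import connect

    async with connect(url) as websocket:
        await websocket.send(message)
        return await websocket.recv()


async def main() -> None:
    # "my-secret-token" is a placeholder for the server's AUTH_TOKEN value.
    url = build_ws_url("localhost", 8765, "my-secret-token")
    print(await ask(url, "Plan a 3-day trip to Rome"))
```

With the server running, asyncio.run(main()) would perform the exchange. If the token is missing or wrong, the handshake is rejected with HTTP 401 and the connection is never established.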
This WebSocket server provides a real-time interface to the trip planner pipeline. Because every handshake must carry
a token that matches AUTH_TOKEN, only clients that know the token can access the service.
(venv) ➜ trip_planner git:(dev) ✗ python3 wspipe.py
Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.
──────────────────── SuperAgentX trip_planner Websocket Server ────────────────────
🚀 Starting SuperagentX websocket server
😃 Host: localhost
😃 Port: 8765
INFO:websockets.server:server listening on 127.0.0.1:8765