"""Async bridge between a game's Redis queues and a local Ollama server.

JSON chat messages are popped from the Redis list ``nwn_to_llm``; for each
one the NPC's reply is requested from Ollama's ``/api/chat`` endpoint and
pushed as JSON onto the Redis list ``llm_to_nwn``.

Provenance: post-image of ``redis_bridge.py`` from commit
edddb07cf6c42360b20c036bc9a30f0f5a48cb78 ("parallel llm access with python
asyncio", vitler, Fri, 10 Apr 2026).
"""

import asyncio
import json

import aiohttp
import redis.asyncio as redis  # Note the new async redis import!

# --- CONFIGURATION ---
# The "Bouncer": How many simultaneous thoughts the 5060 Ti is allowed to process at once.
# If 10 players talk at once, 3 will process immediately, 7 will wait in the async line.
MAX_CONCURRENT_OLLAMA_REQUESTS = 3
semaphore = asyncio.Semaphore(MAX_CONCURRENT_OLLAMA_REQUESTS)

OLLAMA_CHAT_URL = 'http://localhost:11434/api/chat'
# aiohttp expects a ClientTimeout object; passing a bare number is deprecated.
OLLAMA_TIMEOUT = aiohttp.ClientTimeout(total=45)

NPC_SYSTEM_PROMPT = """
You are Elrendur Arna. You are currently residing in Alentejo Sem Lei.
Keep your responses under 3 sentences.
"""

# Dictionary to keep individual player conversations separate
# Format: {"PlayerName_NpcTag": [{"role": "system", "content": "..."}, ...]}
chat_memory = {}


async def process_message(r: redis.Redis, session: aiohttp.ClientSession,
                          message_data: str) -> None:
    """Handle one game message: query Ollama and push the reply to Redis.

    One instance of this coroutine runs as a background task per message.

    Args:
        r: Async Redis client used to push the reply onto ``llm_to_nwn``.
        session: Shared aiohttp session used for the Ollama request.
        message_data: Raw JSON text popped from ``nwn_to_llm``; the keys
            ``player``, ``npc_tag`` and ``message`` are read, each with a
            fallback default when missing.

    Every exception is caught and logged here, because nothing awaits the
    background task and an escaping error would otherwise go unreported.
    """
    try:
        data = json.loads(message_data)
        player_name = data.get('player', 'Unknown')
        npc_tag = data.get('npc_tag', 'UnknownNPC')
        message = data.get('message', '')

        # 1. Unique session ID for this specific player and NPC interaction
        session_id = f"{player_name}_{npc_tag}"

        # 2. First contact: start the memory with the system prompt
        history = chat_memory.setdefault(
            session_id, [{"role": "system", "content": NPC_SYSTEM_PROMPT}]
        )

        # 3. Build the new user turn, but do NOT store it yet: it is only
        #    committed together with the reply, so a failed or timed-out
        #    request never leaves a dangling user turn in the history.
        print(f"\n[RECEIVED] {player_name} -> {npc_tag}: '{message}'")
        user_turn = {"role": "user", "content": f"{player_name} says: {message}"}

        # 4. The Semaphore Bouncer: wait in line if the GPU is currently full
        async with semaphore:
            print(f"[THINKING] Processing reply for {player_name}...")

            # Snapshot after acquiring the semaphore so turns committed
            # while this message was queued are included in the request.
            request_messages = [*history, user_turn]

            # Non-blocking network request to Ollama
            async with session.post(OLLAMA_CHAT_URL, json={
                "model": "llama3",
                "messages": request_messages,
                "stream": False
            }, timeout=OLLAMA_TIMEOUT) as response:
                response.raise_for_status()
                result = await response.json()

            reply_text = result['message']['content']

        # 5. Commit both turns at once (no await in between), keeping the
        #    history well-formed even when messages of one session overlap.
        print(f"[REPLY] Elrendur to {player_name}: {reply_text}")
        history.extend((user_turn, {"role": "assistant", "content": reply_text}))

        # 6. Package and send back to the game
        # NOTE(review): the payload carries no player name, so replies for
        # different players talking to the same NPC differ only by npc_tag;
        # confirm the game side can route them.
        reply_payload = {
            "npc_tag": npc_tag,
            "reply": reply_text
        }
        await r.rpush('llm_to_nwn', json.dumps(reply_payload))

    except Exception as e:
        # repr() because some errors (e.g. asyncio.TimeoutError) have an
        # empty str() and would otherwise log nothing useful.
        print(f"[ERROR] Failed to process message: {e!r}")


async def main() -> None:
    """Connect to Redis, then dispatch each queued message to a worker task.

    Returns early (after logging) when the initial Redis ping fails;
    otherwise loops forever on a blocking pop from ``nwn_to_llm``.
    """
    print("Initializing Async Redis Bridge...")

    # Notice we use redis.asyncio.Redis now
    r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)

    try:
        await r.ping()
        print("SUCCESS: Connected to the Docker Redis database!")
    except Exception as e:
        print(f"CRITICAL ERROR: Could not connect to Redis. {e}")
        return

    print(f"Ready! Listening for game messages. Max GPU concurrency: {MAX_CONCURRENT_OLLAMA_REQUESTS}")

    # The event loop keeps only weak references to tasks; hold strong ones
    # here so a worker cannot be garbage-collected before it finishes.
    background_tasks = set()

    # Open a single persistent HTTP session for efficiency
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                # Yields control back to the event loop while waiting for Redis
                result = await r.blpop('nwn_to_llm')

                if result:
                    _, message_data = result

                    # Spawn a background worker for this message and go
                    # straight back to listening on the Redis queue.
                    task = asyncio.create_task(
                        process_message(r, session, message_data)
                    )
                    background_tasks.add(task)
                    task.add_done_callback(background_tasks.discard)

            except Exception as e:
                print(f"[LOOP ERROR] {e}")
                await asyncio.sleep(1)  # Prevent infinite crash loops


if __name__ == "__main__":
    # This starts the Asyncio Event Loop
    asyncio.run(main())