-import redis
+import asyncio
import json
-import requests
+import aiohttp
+import redis.asyncio as redis # Note the new async redis import!
-print("Initializing Redis Bridge...")
-
-# 1. Force the correct Windows IP address and test the connection immediately
-try:
- r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)
- r.ping() # This will crash immediately if the connection is bad
- print("SUCCESS: Connected to the Docker Redis database!")
-except Exception as e:
- print(f"CRITICAL ERROR: Could not connect to Redis. {e}")
- exit()
+# --- CONFIGURATION ---
+# The "Bouncer": the maximum number of Ollama requests allowed in flight at the same time.
+# Example: if 10 players talk at once, 3 are sent to Ollama immediately and the other 7
+# wait on the semaphore until a slot frees up.
+# NOTE(review): the semaphore is created at import time, before asyncio.run() starts a loop.
+# On Python < 3.10 asyncio primitives bind to the loop that is current at construction,
+# which can fail with "attached to a different loop" -- confirm the target Python version.
MAX_CONCURRENT_OLLAMA_REQUESTS = 3
semaphore = asyncio.Semaphore(MAX_CONCURRENT_OLLAMA_REQUESTS)
NPC_SYSTEM_PROMPT = """
You are Elrendur Arna. You are currently residing in Alentejo Sem Lei.
Keep your responses under 3 sentences.
"""
-chat_history = [{"role": "system", "content": NPC_SYSTEM_PROMPT}]
-print("\nReady! Listening for game messages on 'nwn_to_llm'...")
+# Per-conversation chat histories, so each player/NPC pair keeps its own context.
+# Format: {"PlayerName_NpcTag": [{"role": "system", "content": "..."}, ...]}
+# NOTE(review): nothing visible in this patch removes or trims entries, so both the dict
+# and each history list grow for as long as the process runs.
+chat_memory = {}
-while True:
+async def process_message(r, session, message_data):
+    """Handle one queued game message: query Ollama and push the reply to Redis.
+
+    The function is scheduled as its own task for every message, so several
+    invocations can be in flight at once. The number of simultaneous Ollama
+    requests is capped by the module-level ``semaphore``.
+
+    Args:
+        r: Async Redis client; only ``rpush`` is awaited on it here.
+        session: Shared ``aiohttp.ClientSession`` used for the Ollama request.
+        message_data: JSON text with optional ``player``, ``npc_tag`` and
+            ``message`` keys (missing keys fall back to defaults).
+
+    Returns:
+        None. The reply is pushed to the ``llm_to_nwn`` Redis list as JSON
+        with ``player``, ``npc_tag`` and ``reply`` keys.
+
+    Any exception is caught and printed so that one bad message cannot
+    surface as an unhandled task exception.
+    """
     try:
-        # This line freezes the script until a message arrives
-        result = r.blpop('nwn_to_llm')
-
-        # --- IF IT HEARS THE GAME, IT WILL PRINT THIS ---
-        print(f"\n--- WAKE UP! NEW MESSAGE RECEIVED ---")
-
-        queue_name, message_data = result
-        print(f"Raw data from Redis: {message_data}")
-
-        # Try to parse the JSON
         data = json.loads(message_data)
         player_name = data.get('player', 'Unknown')
         npc_tag = data.get('npc_tag', 'UnknownNPC')
         message = data.get('message', '')
-        print(f"Parsed: {player_name} said to {npc_tag}: '{message}'")
-
-        chat_history.append({"role": "user", "content": f"{player_name} says: {message}"})
+        # 1. Build the key that identifies this specific player/NPC conversation
+        session_id = f"{player_name}_{npc_tag}"
-        # Talk to Ollama
-        print("Thinking... (Sending to local Ollama)")
-        response = requests.post('http://localhost:11434/api/chat', json={
-            "model": "llama3",
-            "messages": chat_history,
-            "stream": False
-        }, timeout=45)
-
-        response.raise_for_status() # Triggers an error if Ollama is broken
-        reply_text = response.json()['message']['content']
-
-        print(f"Ollama Replied: {reply_text}")
-        chat_history.append({"role": "assistant", "content": reply_text})
+        # 2. First contact: seed the conversation history with the system prompt
+        if session_id not in chat_memory:
+            chat_memory[session_id] = [{"role": "system", "content": NPC_SYSTEM_PROMPT}]
+
+        # 3. Record the player's new message in that conversation's history
+        print(f"\n[RECEIVED] {player_name} -> {npc_tag}: '{message}'")
+        chat_memory[session_id].append({"role": "user", "content": f"{player_name} says: {message}"})
+
+        # 4. The semaphore "bouncer": wait here while the maximum number of
+        #    Ollama requests is already in flight
+        async with semaphore:
+            print(f"[THINKING] Processing reply for {player_name}...")
+
+            # Non-blocking request to Ollama. aiohttp expects a ClientTimeout
+            # object; passing a bare number is deprecated.
+            async with session.post('http://localhost:11434/api/chat', json={
+                "model": "llama3",
+                "messages": chat_memory[session_id],
+                "stream": False
+            }, timeout=aiohttp.ClientTimeout(total=45)) as response:
+
+                response.raise_for_status()
+                result = await response.json()
+                reply_text = result['message']['content']
+
+        # 5. Store the assistant's reply in the history and log it
+        print(f"[REPLY] Elrendur to {player_name}: {reply_text}")
+        chat_memory[session_id].append({"role": "assistant", "content": reply_text})
-        # Package and send back to the game
+        # 6. Package and send back to the game (awaiting the async Redis push).
+        #    The player name is part of the payload so the consumer can tell
+        #    which player a reply belongs to when replies finish out of order.
         reply_payload = {
             "player": player_name,
             "npc_tag": npc_tag,
             "reply": reply_text
         }
-
-        r.rpush('llm_to_nwn', json.dumps(reply_payload))
-        print("SUCCESS: Sent reply to the 'llm_to_nwn' queue for the game to pick up!")
+        await r.rpush('llm_to_nwn', json.dumps(reply_payload))
+
+    except Exception as e:
+        # Include the exception type: some errors (e.g. timeouts) have an empty message
+        print(f"[ERROR] Failed to process message: {type(e).__name__}: {e}")
+
+
+async def main():
+    """Connect to Redis, then dispatch every queued game message to a worker task.
+
+    Pings Redis once and returns early if the connection fails. Otherwise it
+    loops forever: it blocks (without blocking the event loop) on the
+    ``nwn_to_llm`` list and spawns ``process_message`` as a background task
+    for each message, so the loop goes straight back to listening.
+    """
+    print("Initializing Async Redis Bridge...")
+
+    # Notice we use redis.asyncio.Redis now
+    r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)
-    except json.JSONDecodeError as e:
-        print(f"ERROR: The game sent bad JSON data: {e}")
-    except requests.exceptions.RequestException as e:
-        print(f"ERROR: Failed to talk to Ollama. Is it running? {e}")
+    try:
+        await r.ping()
+        print("SUCCESS: Connected to the Docker Redis database!")
     except Exception as e:
-        print(f"UNEXPECTED ERROR: {e}")
\ No newline at end of file
+        print(f"CRITICAL ERROR: Could not connect to Redis. {e}")
+        return
+
+    print(f"Ready! Listening for game messages. Max GPU concurrency: {MAX_CONCURRENT_OLLAMA_REQUESTS}")
+
+    # The event loop keeps only weak references to tasks, so a task nobody
+    # else references can be garbage-collected before it finishes. Hold each
+    # worker here until it is done.
+    background_tasks = set()
+
+    # Open a single persistent HTTP session for efficiency
+    async with aiohttp.ClientSession() as session:
+        while True:
+            try:
+                # Yields control back to the event loop while waiting for Redis
+                result = await r.blpop('nwn_to_llm')
+
+                if result:
+                    # blpop returns (list_name, value); only the value is needed
+                    _, message_data = result
+
+                    # Spawn a background worker for this message and go
+                    # straight back to listening on the Redis queue
+                    task = asyncio.create_task(process_message(r, session, message_data))
+                    background_tasks.add(task)
+                    task.add_done_callback(background_tasks.discard)
+
+            except Exception as e:
+                print(f"[LOOP ERROR] {e}")
+                await asyncio.sleep(1) # Back off so a persistent failure does not spin the loop
+
+if __name__ == "__main__":
+ # asyncio.run() creates a new event loop, runs main() until it returns, then closes the loop
+ asyncio.run(main())
\ No newline at end of file