]> vgcfreebox.myrthtech.pt Git - alentejosemlei.git/blobdiff - redis_bridge.py
locking redis db to conections from machine only
[alentejosemlei.git] / redis_bridge.py
index bc40dfab6d461f4d7b05102d14a7a70a93b7348e..6c7be23673e29713b533a370ab281db03f739a4f 100644 (file)
-import redis
+import asyncio
 import json
-import requests
+import aiohttp
+import redis.asyncio as redis # Note the new async redis import!
 
-print("Initializing Redis Bridge...")
-
-# 1. Force the correct Windows IP address and test the connection immediately
-try:
-    r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)
-    r.ping() # This will crash immediately if the connection is bad
-    print("SUCCESS: Connected to the Docker Redis database!")
-except Exception as e:
-    print(f"CRITICAL ERROR: Could not connect to Redis. {e}")
-    exit()
+# --- CONFIGURATION ---
+# The "Bouncer": How many simultaneous thoughts the 5060 Ti is allowed to process at once.
+# If 10 players talk at once, 3 will process immediately, 7 will wait in the async line.
+MAX_CONCURRENT_OLLAMA_REQUESTS = 3 
+semaphore = asyncio.Semaphore(MAX_CONCURRENT_OLLAMA_REQUESTS)
 
 NPC_SYSTEM_PROMPT = """
 You are Elrendur Arna. You are currently residing in Alentejo Sem Lei.
 Keep your responses under 3 sentences.
 """
-chat_history = [{"role": "system", "content": NPC_SYSTEM_PROMPT}]
 
-print("\nReady! Listening for game messages on 'nwn_to_llm'...")
+# Dictionary to keep individual player conversations separate
+# Format: {"PlayerName_NpcTag": [{"role": "system", "content": "..."}, ...]}
+chat_memory = {}
 
-while True:
+async def process_message(r, session, message_data):
+    """This function runs in parallel for every single message received."""
     try:
-        # This line freezes the script until a message arrives
-        result = r.blpop('nwn_to_llm')
-        
-        # --- IF IT HEARS THE GAME, IT WILL PRINT THIS ---
-        print(f"\n--- WAKE UP! NEW MESSAGE RECEIVED ---")
-        
-        queue_name, message_data = result
-        print(f"Raw data from Redis: {message_data}")
-        
-        # Try to parse the JSON
         data = json.loads(message_data)
         player_name = data.get('player', 'Unknown')
         npc_tag = data.get('npc_tag', 'UnknownNPC')
         message = data.get('message', '')
 
-        print(f"Parsed: {player_name} said to {npc_tag}: '{message}'")
-
-        chat_history.append({"role": "user", "content": f"{player_name} says: {message}"})
+        # 1. Create a unique session ID for this specific player and NPC interaction
+        session_id = f"{player_name}_{npc_tag}"
         
-        # Talk to Ollama
-        print("Thinking... (Sending to local Ollama)")
-        response = requests.post('http://localhost:11434/api/chat', json={
-            "model": "llama3",
-            "messages": chat_history,
-            "stream": False
-        }, timeout=45)
-        
-        response.raise_for_status() # Triggers an error if Ollama is broken
-        reply_text = response.json()['message']['content']
-        
-        print(f"Ollama Replied: {reply_text}")
-        chat_history.append({"role": "assistant", "content": reply_text})
+        # 2. If they haven't spoken before, initialize their memory with the system prompt
+        if session_id not in chat_memory:
+            chat_memory[session_id] = [{"role": "system", "content": NPC_SYSTEM_PROMPT}]
+
+        # 3. Add the player's new message
+        chat_memory[session_id].append({"role": "user", "content": f"{player_name} says: {message}"})
+
+        # --- THE SLIDING WINDOW FIX ---
+        # If the memory gets longer than 11 messages (1 system prompt + 10 chat messages)
+        if len(chat_memory[session_id]) > 11:
+            # Keep the system prompt at index [0], and grab the 10 most recent messages
+            chat_memory[session_id] = [chat_memory[session_id][0]] + chat_memory[session_id][-10:]
+
+        # 4. The Semaphore Bouncer: Wait in line if the GPU is currently full
+        async with semaphore:
+            print(f"[THINKING] Processing reply for {player_name}...")
+            
+            # Use aiohttp to make a non-blocking network request to Ollama
+            async with session.post('http://localhost:11434/api/chat', json={
+                "model": "llama3",
+                "messages": chat_memory[session_id],
+                "stream": False
+            }, timeout=45) as response:
+                
+                response.raise_for_status()
+                result = await response.json()
+                reply_text = result['message']['content']
 
-        # Package and send back to the game
+        # 5. Save the AI's reply to the memory and print it
+        print(f"[REPLY] Elrendur to {player_name}: {reply_text}")
+        chat_memory[session_id].append({"role": "assistant", "content": reply_text})
+
+        # 6. Package and send back to the game (Awaiting the async Redis push)
         reply_payload = {
-            "player": player_name,
             "npc_tag": npc_tag,
             "reply": reply_text
         }
-        
-        r.rpush('llm_to_nwn', json.dumps(reply_payload))
-        print("SUCCESS: Sent reply to the 'llm_to_nwn' queue for the game to pick up!")
+        await r.rpush('llm_to_nwn', json.dumps(reply_payload))
 
-    except json.JSONDecodeError as e:
-        print(f"ERROR: The game sent bad JSON data: {e}")
-    except requests.exceptions.RequestException as e:
-        print(f"ERROR: Failed to talk to Ollama. Is it running? {e}")
     except Exception as e:
-        print(f"UNEXPECTED ERROR: {e}")
\ No newline at end of file
+        print(f"[ERROR] Failed to process message: {e}")
+
+
+async def main():
+    print("Initializing Async Redis Bridge...")
+    
+    # Notice we use redis.asyncio.Redis now
+    r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)
+
+    try:
+        await r.ping()
+        print("SUCCESS: Connected to the Docker Redis database!")
+    except Exception as e:
+        print(f"CRITICAL ERROR: Could not connect to Redis. {e}")
+        return
+
+    print(f"Ready! Listening for game messages. Max GPU concurrency: {MAX_CONCURRENT_OLLAMA_REQUESTS}")
+
+    # Open a single persistent HTTP session for efficiency
+    async with aiohttp.ClientSession() as session:
+        while True:
+            try:
+                # This line yields control back to the event loop while waiting for Redis
+                result = await r.blpop('nwn_to_llm')
+                
+                if result:
+                    queue_name, message_data = result
+                    
+                    # THE MAGIC: Spawn a background worker to handle this message 
+                    # and instantly go back to listening to the Redis queue!
+                    asyncio.create_task(process_message(r, session, message_data))
+                    
+            except Exception as e:
+                print(f"[LOOP ERROR] {e}")
+                await asyncio.sleep(1) # Prevent infinite crash loops
+
+if __name__ == "__main__":
+    # This starts the Asyncio Event Loop
+    asyncio.run(main())
\ No newline at end of file