]> vgcfreebox.myrthtech.pt Git - alentejosemlei.git/blob - redis_bridge.py
Locking the Redis DB to connections from the local machine only
[alentejosemlei.git] / redis_bridge.py
1 import asyncio
2 import json
3 import aiohttp
4 import redis.asyncio as redis # Note the new async redis import!
5
# --- CONFIGURATION ---
# Persona text stored as the first ("system") entry of every conversation.
NPC_SYSTEM_PROMPT = """
You are Elrendur Arna. You are currently residing in Alentejo Sem Lei.
Keep your responses under 3 sentences.
"""

# Cap on Ollama requests in flight at the same time, so the GPU is not
# oversubscribed. Example: if 10 players talk at once, 3 requests run
# immediately and the other 7 wait on the semaphore for a free slot.
MAX_CONCURRENT_OLLAMA_REQUESTS: int = 3
# NOTE(review): created at import time. On Python < 3.10 asyncio primitives
# bind to the loop that is current at construction, which may differ from the
# loop started later by asyncio.run() -- confirm the deployed Python version.
semaphore = asyncio.Semaphore(MAX_CONCURRENT_OLLAMA_REQUESTS)

# Per-conversation history so that each player/NPC pair keeps its own context.
# Shape: {"PlayerName_NpcTag": [{"role": "system", "content": "..."}, ...]}
chat_memory = {}
20
async def process_message(r, session, message_data):
    """Generate an NPC reply for one queued game message and push it to Redis.

    The player's line is appended to the per-conversation history in
    ``chat_memory``, the history is sent to the local Ollama chat endpoint
    (at most ``MAX_CONCURRENT_OLLAMA_REQUESTS`` requests at a time, enforced
    by ``semaphore``), and the reply is pushed onto the ``llm_to_nwn`` list.

    Args:
        r: Async Redis client used to ``rpush`` the reply payload.
        session: Shared ``aiohttp.ClientSession`` used for the Ollama request.
        message_data: JSON text of an object with optional ``player``,
            ``npc_tag`` and ``message`` keys.

    Returns:
        None. Failures are logged and swallowed rather than raised, so one
        bad message or a failed Ollama call cannot take down the caller.
    """
    # Number of non-system messages kept in the sliding window.
    window = 10

    try:
        data = json.loads(message_data)
        if not isinstance(data, dict):
            # Without this check a list/str payload dies on `.get` with an
            # AttributeError that says nothing about the real problem.
            raise ValueError(
                f"expected a JSON object, got {type(data).__name__}"
            )

        player_name = data.get('player', 'Unknown')
        npc_tag = data.get('npc_tag', 'UnknownNPC')
        message = data.get('message', '')

        # One conversation per (player, NPC) pair.
        session_id = f"{player_name}_{npc_tag}"

        # First contact: seed the history with the system prompt.
        if session_id not in chat_memory:
            chat_memory[session_id] = [
                {"role": "system", "content": NPC_SYSTEM_PROMPT}
            ]
        history = chat_memory[session_id]

        history.append(
            {"role": "user", "content": f"{player_name} says: {message}"}
        )

        # Sliding window: keep the system prompt at index 0 plus the most
        # recent `window` messages. Trimming in place keeps a single list
        # object per conversation.
        if len(history) > window + 1:
            del history[1:-window]

        # Wait for a free GPU slot before calling Ollama.
        async with semaphore:
            print(f"[THINKING] Processing reply for {player_name}...")

            # aiohttp deprecates bare numeric timeouts; ClientTimeout is the
            # supported way to express a 45 s total budget.
            request_timeout = aiohttp.ClientTimeout(total=45)
            payload = {
                "model": "llama3",
                "messages": history,
                "stream": False,
            }
            async with session.post(
                'http://localhost:11434/api/chat',
                json=payload,
                timeout=request_timeout,
            ) as response:
                response.raise_for_status()
                result = await response.json()
                reply_text = result['message']['content']

        # Record the reply so the next turn sees it, then hand it to the game.
        print(f"[REPLY] Elrendur to {player_name}: {reply_text}")
        history.append({"role": "assistant", "content": reply_text})

        reply_payload = {
            "npc_tag": npc_tag,
            "reply": reply_text,
        }
        await r.rpush('llm_to_nwn', json.dumps(reply_payload))

    except Exception as e:
        # Include the exception type: some errors (notably
        # asyncio.TimeoutError) have an empty str(), which used to produce a
        # log line with no information at all.
        print(f"[ERROR] Failed to process message: {type(e).__name__}: {e}")
73
74
async def main():
    """Connect to Redis and dispatch each queued game message to a worker task.

    Blocks on the ``nwn_to_llm`` list and, for every message popped, starts
    ``process_message`` as a background task so the loop can go straight back
    to listening. Returns early if the initial Redis ping fails; otherwise
    runs until the task is cancelled.
    """
    print("Initializing Async Redis Bridge...")

    # Async client from redis.asyncio (imported as `redis`).
    r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)

    # The event loop only keeps weak references to tasks, so a task whose
    # result is discarded can be garbage-collected before it finishes.
    # Holding the tasks here keeps them alive until they complete.
    background_tasks = set()

    try:
        try:
            await r.ping()
            print("SUCCESS: Connected to the Docker Redis database!")
        except Exception as e:
            print(f"CRITICAL ERROR: Could not connect to Redis. {e}")
            return

        print(f"Ready! Listening for game messages. Max GPU concurrency: {MAX_CONCURRENT_OLLAMA_REQUESTS}")

        # One persistent HTTP session shared by all workers.
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    # Yields to the event loop until a message is available.
                    result = await r.blpop('nwn_to_llm')

                    if result:
                        _, message_data = result

                        # Hand the message to a background worker and go
                        # straight back to listening on the queue.
                        task = asyncio.create_task(
                            process_message(r, session, message_data)
                        )
                        background_tasks.add(task)
                        task.add_done_callback(background_tasks.discard)

                except Exception as e:
                    print(f"[LOOP ERROR] {e}")
                    await asyncio.sleep(1)  # Prevent infinite crash loops
    finally:
        # Release the Redis connection(s) on every exit path. `aclose` is the
        # current name; older redis-py releases only provide `close`.
        close_client = getattr(r, "aclose", None) or r.close
        await close_client()
107
if __name__ == "__main__":
    # asyncio.run() starts the event loop and runs main() until it finishes.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # main() loops forever, so Ctrl-C is the normal way to stop the
        # bridge; exit quietly instead of dumping a traceback.
        print("Bridge stopped.")