]> vgcfreebox.myrthtech.pt Git - alentejosemlei.git/blob - redis_bridge.py
97f27e9b93e564982bdc16e6d563790fafa425fa
[alentejosemlei.git] / redis_bridge.py
1 import asyncio
2 import json
3 import aiohttp
4 import redis.asyncio as redis # Note the new async redis import!
5
6 # --- CONFIGURATION ---
7 # The "Bouncer": How many simultaneous thoughts the 5060 Ti is allowed to process at once.
8 # If 10 players talk at once, 3 will process immediately, 7 will wait in the async line.
9 MAX_CONCURRENT_OLLAMA_REQUESTS = 3
10 semaphore = asyncio.Semaphore(MAX_CONCURRENT_OLLAMA_REQUESTS)
11
12 NPC_SYSTEM_PROMPT = """
13 You are Elrendur Arna. You are currently residing in Alentejo Sem Lei.
14 Keep your responses under 3 sentences.
15 """
16
17 # Dictionary to keep individual player conversations separate
18 # Format: {"PlayerName_NpcTag": [{"role": "system", "content": "..."}, ...]}
19 chat_memory = {}
20
21 async def process_message(r, session, message_data):
22 """This function runs in parallel for every single message received."""
23 try:
24 data = json.loads(message_data)
25 player_name = data.get('player', 'Unknown')
26 npc_tag = data.get('npc_tag', 'UnknownNPC')
27 message = data.get('message', '')
28
29 # 1. Create a unique session ID for this specific player and NPC interaction
30 session_id = f"{player_name}_{npc_tag}"
31
32 # 2. If they haven't spoken before, initialize their memory with the system prompt
33 if session_id not in chat_memory:
34 chat_memory[session_id] = [{"role": "system", "content": NPC_SYSTEM_PROMPT}]
35
36 # 3. Add the player's new message to their specific history
37 print(f"\n[RECEIVED] {player_name} -> {npc_tag}: '{message}'")
38 chat_memory[session_id].append({"role": "user", "content": f"{player_name} says: {message}"})
39
40 # 4. The Semaphore Bouncer: Wait in line if the GPU is currently full
41 async with semaphore:
42 print(f"[THINKING] Processing reply for {player_name}...")
43
44 # Use aiohttp to make a non-blocking network request to Ollama
45 async with session.post('http://localhost:11434/api/chat', json={
46 "model": "llama3",
47 "messages": chat_memory[session_id],
48 "stream": False
49 }, timeout=45) as response:
50
51 response.raise_for_status()
52 result = await response.json()
53 reply_text = result['message']['content']
54
55 # 5. Save the AI's reply to the memory and print it
56 print(f"[REPLY] Elrendur to {player_name}: {reply_text}")
57 chat_memory[session_id].append({"role": "assistant", "content": reply_text})
58
59 # 6. Package and send back to the game (Awaiting the async Redis push)
60 reply_payload = {
61 "npc_tag": npc_tag,
62 "reply": reply_text
63 }
64 await r.rpush('llm_to_nwn', json.dumps(reply_payload))
65
66 except Exception as e:
67 print(f"[ERROR] Failed to process message: {e}")
68
69
70 async def main():
71 print("Initializing Async Redis Bridge...")
72
73 # Notice we use redis.asyncio.Redis now
74 r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)
75
76 try:
77 await r.ping()
78 print("SUCCESS: Connected to the Docker Redis database!")
79 except Exception as e:
80 print(f"CRITICAL ERROR: Could not connect to Redis. {e}")
81 return
82
83 print(f"Ready! Listening for game messages. Max GPU concurrency: {MAX_CONCURRENT_OLLAMA_REQUESTS}")
84
85 # Open a single persistent HTTP session for efficiency
86 async with aiohttp.ClientSession() as session:
87 while True:
88 try:
89 # This line yields control back to the event loop while waiting for Redis
90 result = await r.blpop('nwn_to_llm')
91
92 if result:
93 queue_name, message_data = result
94
95 # THE MAGIC: Spawn a background worker to handle this message
96 # and instantly go back to listening to the Redis queue!
97 asyncio.create_task(process_message(r, session, message_data))
98
99 except Exception as e:
100 print(f"[LOOP ERROR] {e}")
101 await asyncio.sleep(1) # Prevent infinite crash loops
102
103 if __name__ == "__main__":
104 # This starts the Asyncio Event Loop
105 asyncio.run(main())