]> vgcfreebox.myrthtech.pt Git - alentejosemlei.git/blob - redis_bridge.py
4aebb3ab5de0985393063194d9ba10c77da8dfa4
[alentejosemlei.git] / redis_bridge.py
1 import asyncio
2 import json
3 import aiohttp
4 import redis.asyncio as redis
5 import re
6 import os
7 import time
8 import chromadb
9
# --- CONFIGURATION ---
MAX_CONCURRENT_OLLAMA_REQUESTS = 3
ALLOW_TEXT_EMOTES = False

# Lore source files, in order of preference. The first one that exists on
# disk is vectorized the first time the lore collection is empty.
LORE_FILE_CANDIDATES = ("asl_lore.txt", "asl_lore.md")

# =====================================================================
# 1. INITIALIZE VECTOR DATABASES (Lore & Episodic Memory)
# =====================================================================
print("Initializing ChromaDB Vector Databases...")
chroma_client = chromadb.PersistentClient(path="./asl_vectordb")

# The Lore Database: static world knowledge, one document per paragraph.
lore_collection = chroma_client.get_or_create_collection(name="world_lore")

# The Episodic Memory Database: background summaries of past conversations,
# each tagged with the session_id ("<player>_<npc_tag>") it belongs to.
memory_collection = chroma_client.get_or_create_collection(name="episodic_memories")

# Summarization jobs consumed by memory_summarizer_worker.
# NOTE: this Queue (and the Semaphore below) is created at import time, before
# asyncio.run() starts the event loop. That is only safe on Python 3.10+,
# where asyncio primitives bind to a loop lazily on first use.
memory_queue = asyncio.Queue()

# Resolve the lore file once, so the existence check, the open() call and the
# log messages all refer to the same path.
lore_path = next((p for p in LORE_FILE_CANDIDATES if os.path.exists(p)), None)

if lore_path is None:
    print(f"[WARNING] Lore file not found (looked for: {', '.join(LORE_FILE_CANDIDATES)}).")
elif lore_collection.count() == 0:
    # The collection is persisted on disk, so this only runs on first start-up
    # (or after the vector DB directory has been wiped).
    print(f"[VECTOR DB] Reading {lore_path} and vectorizing chunks...")
    with open(lore_path, "r", encoding="utf-8") as f:
        raw_lore = f.read()

    # Paragraphs are separated by blank lines; a "blank" line that still holds
    # stray spaces/tabs must also count as a separator.
    lore_chunks = [chunk.strip() for chunk in re.split(r"\n\s*\n", raw_lore) if chunk.strip()]

    if lore_chunks:
        chunk_ids = [f"lore_{i}" for i in range(len(lore_chunks))]
        lore_collection.add(documents=lore_chunks, ids=chunk_ids)
        print(f"[VECTOR DB] Successfully stored {len(lore_chunks)} lore chunks!")

# Caps how many requests are in flight to Ollama at the same time.
semaphore = asyncio.Semaphore(MAX_CONCURRENT_OLLAMA_REQUESTS)

# Live chat history per session_id. Entry [0] is always the system prompt;
# the rest are user/assistant messages.
chat_memory = {}
44
# =====================================================================
# --- BACKGROUND MEMORY SUMMARIZER (The "Dream State") ---
# =====================================================================
async def memory_summarizer_worker(session):
    """Consume summarization jobs from ``memory_queue`` forever.

    Each job is a dict with the keys ``session_id``, ``player_name``,
    ``npc_tag`` and ``chat_log``. The chat log is condensed into a short
    past-tense summary by Ollama's ``/api/generate`` endpoint. The summary is
    stored in ``memory_collection`` with the job's ``session_id`` as metadata,
    so only that player/NPC pair recalls it later.

    Args:
        session: An open ``aiohttp.ClientSession`` used for the Ollama request.

    A failure on one job (malformed job, HTTP error, unexpected payload,
    ChromaDB error) is logged and skipped; the worker keeps running.
    """
    print("[BACKGROUND] Memory Summarizer Worker is active.")
    while True:
        job = await memory_queue.get()
        try:
            session_id = job['session_id']
            player_name = job['player_name']
            npc_tag = job['npc_tag']
            chat_log = job['chat_log']

            prompt = f"Summarize the key events, facts, and the emotional tone of this conversation snippet between {player_name} and {npc_tag}. Keep it to 2 brief sentences in the past tense.\nConversation Log:\n{chat_log}"

            print(f"[MEMORY DB] Generating background memory for {player_name} and {npc_tag}...")
            # Share the Ollama concurrency cap with live chat requests, so the
            # background work cannot push the GPU past MAX_CONCURRENT_OLLAMA_REQUESTS.
            async with semaphore:
                # We use /api/generate here because we just want a raw text summary, not a JSON macro
                async with session.post('http://localhost:11434/api/generate', json={
                    "model": "gemma4",
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": 0.1
                    }
                }) as response:
                    response.raise_for_status()
                    result = await response.json()

            summary = result['response'].strip()

            if summary:
                # Nanosecond timestamp: two summaries for the same session can
                # easily land in the same second, and ids must be unique.
                doc_id = f"{session_id}_{time.time_ns()}"
                # We store the session_id as metadata so NPCs only recall their OWN memories with this specific player
                memory_collection.add(
                    documents=[summary],
                    metadatas=[{"session_id": session_id}],
                    ids=[doc_id]
                )
                print(f"[MEMORY DB] Memory Saved: {summary}")

        except Exception as e:
            print(f"[MEMORY ERROR] Failed to summarize memory: {e}")
        finally:
            # Always acknowledge the job, even if it failed.
            memory_queue.task_done()
87 # =====================================================================
88
89
# Macro words the game engine accepts. These tuples are the single source of
# truth: the system prompt lists them, and model replies are validated against them.
_ALLOWED_ACTIONS = (
    "WANDER", "PATROL", "FOLLOW", "GUARD", "GO_TO", "INTERACT", "USE_OBJECT",
    "RETURN_TO_POST", "ATTACK", "REST", "STEALTH", "SEARCH", "UNSTEALTH",
    "PEACE", "COMMAND",
)
_ALLOWED_EMOTIONS = ("NEUTRAL", "LAUGHING", "ANGRY", "PLEADING", "BOW", "TAUNT", "CHEER")

# Reply sent when the model output is not a usable JSON object at all.
_FALLBACK_REPLY = {
    "thought": "I lost my train of thought.",
    "speech": "*grunts quietly*",
    "emotion": "NEUTRAL",
    "action": "WANDER",
    "action_target": "",
}

# Once a session's history (system prompt included) grows past this many
# entries, the older turns are summarized into episodic memory and dropped.
_HISTORY_MAX_LEN = 10
# Number of most recent messages kept after the window slides.
_HISTORY_KEEP = 5


def _as_text(value):
    """Return *value* as a string; ``None`` becomes an empty string."""
    return "" if value is None else str(value)


def _normalize_macro(value, allowed, default):
    """Upper-case a macro word and return it if allowed, else *default*."""
    word = _as_text(value).strip().upper().replace(" ", "_")
    return word if word in allowed else default


def _sanitize_agent_reply(raw_reply_text):
    """Turn the model's raw reply into a JSON string the game can always parse.

    The result always contains the keys ``thought``, ``speech``, ``emotion``,
    ``action`` and ``action_target``. Keys are lower-cased, values are coerced
    to strings, and ``emotion``/``action`` are restricted to the macro lists.
    Output that is not a JSON object is replaced with ``_FALLBACK_REPLY``.
    """
    try:
        agent_brain = json.loads(raw_reply_text)
    except (TypeError, ValueError):  # ValueError covers json.JSONDecodeError
        agent_brain = None

    if not isinstance(agent_brain, dict):
        print(f"[WARNING] AI Hallucinated! Overriding with safe defaults.")
        return json.dumps(_FALLBACK_REPLY)

    agent_brain = {str(k).lower(): v for k, v in agent_brain.items()}

    if agent_brain.get("thought") is None:
        agent_brain["thought"] = ""

    speech = _as_text(agent_brain.get("speech"))
    agent_brain["speech"] = speech if speech.strip() else "*grunts quietly*"

    agent_brain["emotion"] = _normalize_macro(agent_brain.get("emotion"), _ALLOWED_EMOTIONS, "NEUTRAL")
    agent_brain["action"] = _normalize_macro(agent_brain.get("action"), _ALLOWED_ACTIONS, "GUARD")

    target = _as_text(agent_brain.get("action_target"))
    agent_brain["action_target"] = target.replace("?", "").replace(".", "").strip()

    return json.dumps(agent_brain)


def _recall_context(search_query, session_id, player_name):
    """Return ``(retrieved_lore, past_memories)`` text for the system prompt.

    Lore is the single best-matching lore chunk. Memories are the two
    best-matching summaries stored for *session_id* only. A lookup failure
    is logged and leaves the default text in place, so the NPC still answers.
    NOTE: these ChromaDB calls are synchronous and block the event loop while
    the query is embedded and searched.
    """
    retrieved_lore = "No specific local lore currently relevant."
    past_memories = ""

    try:
        if lore_collection.count() > 0:
            results = lore_collection.query(query_texts=[search_query], n_results=1)
            if results['documents'] and results['documents'][0]:
                retrieved_lore = f"- {results['documents'][0][0]}"
    except Exception as e:
        print(f"[VECTOR DB WARNING] Lore lookup failed: {e}")

    # Fetch Episodic Memories (ONLY memories between this specific NPC and this specific Player)
    try:
        if memory_collection.count() > 0:
            mem_results = memory_collection.query(
                query_texts=[search_query],
                n_results=2,
                where={"session_id": session_id}
            )
            if mem_results['documents'] and mem_results['documents'][0]:
                formatted_mems = "\n- ".join(mem_results['documents'][0])
                past_memories = f"\nPAST MEMORIES OF {player_name}:\n- {formatted_mems}"
    except Exception as e:
        print(f"[VECTOR DB WARNING] Memory lookup failed: {e}")

    return retrieved_lore, past_memories


async def _archive_old_turns(session_id, player_name, npc_tag):
    """Slide the chat window and queue every dropped message for summarization.

    Keeps the system prompt plus the last ``_HISTORY_KEEP`` messages. All
    messages between those two are sent to the background summarizer, so no
    turn is discarded without first being turned into episodic memory.
    """
    history = chat_memory[session_id]
    if len(history) <= _HISTORY_MAX_LEN:
        return

    # Everything that falls out of the window (skipping the system prompt at [0]).
    dropped = history[1:-_HISTORY_KEEP]
    chat_log_str = "\n".join(m['content'] for m in dropped)

    # Fire and forget: push it to the background queue!
    await memory_queue.put({
        'session_id': session_id,
        'player_name': player_name,
        'npc_tag': npc_tag,
        'chat_log': chat_log_str
    })

    # Slide the window to keep live generation fast
    chat_memory[session_id] = [history[0]] + history[-_HISTORY_KEEP:]


async def process_message(r, session, message_data):
    """Turn one game message into an NPC reply and push it back to Redis.

    The JSON payload popped from ``nwn_to_llm`` is decoded and enriched with
    lore and episodic memories from ChromaDB. It is then sent to Ollama's
    ``/api/chat`` endpoint together with the per-session chat history. The
    sanitized reply is pushed to ``llm_to_nwn`` as
    ``{"npc_tag", "target_player", "reply"}``.

    Args:
        r: The ``redis.asyncio.Redis`` connection.
        session: An open ``aiohttp.ClientSession``.
        message_data: Raw JSON payload from the game.

    Any error is logged and swallowed, so one bad message cannot crash the
    listener loop.
    """
    try:
        data = json.loads(message_data)
        player_name = data.get('player', data.get('target_player', 'Unknown'))
        npc_tag = data.get('npc_tag', 'UnknownNPC')
        message = data.get('message', '')

        if not ALLOW_TEXT_EMOTES:
            # Strip *emote* segments typed by the player.
            message = re.sub(r'\*.*?\*', '', message).strip()

        player_race = data.get('player_race', 'Unknown')
        player_alignment = data.get('player_alignment', 'Unknown')
        nearby_players = data.get('nearby_players', '')

        npc_persona = data.get('persona', 'You are a generic citizen.')
        npc_profession = data.get('profession', 'Commoner')
        npc_mood = data.get('mood', 'Neutral')
        npc_secret = data.get('secret', '')

        npc_alignment = data.get('npc_alignment', 'True Neutral')
        npc_gender = data.get('npc_gender', 'Unknown')
        npc_race = data.get('npc_race', 'Creature')
        npc_routine = data.get('npc_routine', '')

        player_state = data.get('player_state', 'Relaxed and unarmed.')
        world_state = data.get('world_state', 'Nothing of note is happening.')
        npc_health = data.get('npc_health', 'Healthy and uninjured.')
        relationship = data.get('relationship', 'Neutral or Friendly.')
        location_context = data.get('location_context', 'You are in a generic area.')

        group_context = f"Be aware that these other players are listening nearby: {nearby_players}." if nearby_players else ""
        secret_context = f"YOUR SECRET (Reveal only if players are persuasive): {npc_secret}" if npc_secret else ""
        routine_context = f"YOUR REQUIRED ROUTINE: {npc_routine}" if npc_routine else ""

        session_id = f"{player_name}_{npc_tag}"

        # 2. THE DUAL RAG QUERY (Lore + Memories)
        search_query = f"{location_context} {message}"
        retrieved_lore, past_memories = _recall_context(search_query, session_id, player_name)

        actions_list = "[" + ", ".join(_ALLOWED_ACTIONS) + "]"
        emotions_list = "[" + ", ".join(_ALLOWED_EMOTIONS) + "]"

        dynamic_system_prompt = f"""

{npc_persona}

CURRENT STATUS & TRAITS:
- Race & Gender: {npc_gender} {npc_race}
- Profession: {npc_profession}
- Alignment: {npc_alignment}
- Conversational Charisma: Low/Gruff unless otherwise specified.
- Current Mood: {npc_mood}
- Current Physical State: {npc_health}
{secret_context}
{routine_context}

CURRENT LOCATION: {location_context}

RELEVANT WORLD KNOWLEDGE:
{retrieved_lore}
{past_memories}

CURRENT WORLD RUMORS/EVENTS:
{world_state}

CURRENT TARGET: You are speaking to {player_name}, who is a {player_alignment} {player_race}.
Their physical state: {player_state}
Relationship to you: {relationship}
{group_context}
React appropriately based on your personality, alignment, and mood.

CRITICAL ENGINE RULES:
Respond ONLY in valid JSON. You MUST use exactly these FIVE keys: "thought", "speech", "emotion", "action", and "action_target".

ACTION RULE:
Your "action" key MUST be exactly one of the following words:
{actions_list}

EMOTION RULE:
Your "emotion" key MUST be exactly one of the following words:
{emotions_list}. Do not invent new emotions. Do not perform writen emotions in text with **.

YOUR RESPONSE MUST BE A SINGLE, VALID JSON OBJECT. YOU MUST USE THIS EXACT TEMPLATE:
{{
"thought": "Your internal reasoning here.",
"speech": "You MUST say something out loud. If you don't want to talk, output something your character would do.",
"emotion": "MACRO WORD",
"action": "MACRO WORD",
"action_target": "Target name"
}}
"""

        # The system prompt is rebuilt every turn (mood, world state, memories
        # change) while the rest of the conversation is kept.
        system_message = {"role": "system", "content": dynamic_system_prompt}
        if session_id in chat_memory:
            chat_memory[session_id][0] = system_message
        else:
            chat_memory[session_id] = [system_message]

        chat_memory[session_id].append({"role": "user", "content": f"{player_name} says: {message}"})

        # --- THE MEMORY EXTRACTION TRIGGER ---
        await _archive_old_turns(session_id, player_name, npc_tag)

        async with semaphore:
            print(f"[THINKING] Processing reply for {player_name}...")
            async with session.post('http://localhost:11434/api/chat', json={
                "model": "llama3",
                "messages": chat_memory[session_id],
                "format": "json",
                "stream": False,
                "options": {
                    "temperature": 0.2
                }
            }, timeout=aiohttp.ClientTimeout(total=45)) as response:
                response.raise_for_status()
                result = await response.json()
                raw_reply_text = result['message']['content']

        clean_reply_text = _sanitize_agent_reply(raw_reply_text)

        print(f"[REPLY] from {npc_tag} to {player_name}: {clean_reply_text}")
        chat_memory[session_id].append({"role": "assistant", "content": clean_reply_text})

        reply_payload = {
            "npc_tag": npc_tag,
            "target_player": player_name,
            "reply": clean_reply_text
        }
        await r.rpush('llm_to_nwn', json.dumps(reply_payload))

    except Exception as e:
        print(f"[ERROR] Failed to process message: {e}")
283
async def main():
    """Run the bridge until the process is stopped.

    Connects to Redis on 127.0.0.1:6380 and gives up (returns) if the
    connection check fails. Otherwise it starts the background memory
    summarizer and then blocks on the ``nwn_to_llm`` list, handling each
    popped message in its own task. Errors in the listener loop are logged,
    followed by a one-second back-off.
    """
    print("Initializing Async Redis Bridge...")
    r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)

    try:
        await r.ping()
        print("SUCCESS: Connected to the Docker Redis database!")
    except Exception as e:
        print(f"CRITICAL ERROR: Could not connect to Redis. {e}")
        return

    print(f"Ready! Listening for game messages. Max GPU concurrency: {MAX_CONCURRENT_OLLAMA_REQUESTS}")

    # The event loop keeps only weak references to tasks, so hold strong
    # references here until each task finishes. Otherwise a pending task can
    # be garbage-collected mid-execution.
    background_tasks = set()

    def spawn(coro):
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        return task

    async with aiohttp.ClientSession() as session:
        # Start the background memory worker
        spawn(memory_summarizer_worker(session))

        while True:
            try:
                result = await r.blpop('nwn_to_llm')
                if result:
                    _queue_name, message_data = result
                    spawn(process_message(r, session, message_data))
            except Exception as e:
                print(f"[LOOP ERROR] {e}")
                await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())