"""Async bridge between a game server's Redis queues and a local Ollama LLM.

Messages are popped from the ``nwn_to_llm`` Redis list, turned into a
strategy-specific system prompt (enriched with lore and episodic memories
retrieved from ChromaDB), sent to Ollama, sanitised into a fixed JSON shape
and pushed back on the ``llm_to_nwn`` Redis list.
"""

import asyncio
import json
import os
import re
import time

import aiohttp
import chromadb
import redis.asyncio as redis

# --- CONFIGURATION ---
MAX_CONCURRENT_OLLAMA_REQUESTS = 3
ALLOW_TEXT_EMOTES = False

OLLAMA_GENERATE_URL = 'http://localhost:11434/api/generate'
OLLAMA_CHAT_URL = 'http://localhost:11434/api/chat'
OLLAMA_MODEL = "llama3"
OLLAMA_CHAT_TIMEOUT_SECONDS = 45

# Strategy flags understood by the prompt compiler
# (1: Agent, 2: Villain, 3: Maestro, 4: Shrine).
KNOWN_STRATEGIES = (1, 2, 3, 4)
DEFAULT_STRATEGY = 1

# How many of the most recent chat messages survive a memory extraction.
RECENT_MESSAGES_KEPT = 5

# =====================================================================
# 1. INITIALIZE VECTOR DATABASES (Lore & Episodic Memory)
# =====================================================================
print("Initializing ChromaDB Vector Databases...")
chroma_client = chromadb.PersistentClient(path="./asl_vectordb")

# The Lore Database
lore_collection = chroma_client.get_or_create_collection(name="world_lore")

# The Episodic Memory Database
memory_collection = chroma_client.get_or_create_collection(name="episodic_memories")

# NOTE: these asyncio primitives are created at import time. On Python < 3.10
# they bind to the event loop that is current at construction, which is not the
# loop asyncio.run() creates later; run this module on Python 3.10 or newer.
memory_queue = asyncio.Queue()

if os.path.exists("asl_lore.md"):
    # Only vectorize the lore file once; an already populated collection is reused.
    if lore_collection.count() == 0:
        print("[VECTOR DB] Reading asl_lore.md and vectorizing chunks...")
        with open("asl_lore.md", "r", encoding="utf-8") as f:
            raw_lore = f.read()

        # One chunk per blank-line-separated paragraph.
        lore_chunks = [chunk.strip() for chunk in raw_lore.split('\n\n') if chunk.strip()]

        if lore_chunks:
            chunk_ids = [f"lore_{i}" for i in range(len(lore_chunks))]
            lore_collection.add(documents=lore_chunks, ids=chunk_ids)
            print(f"[VECTOR DB] Successfully stored {len(lore_chunks)} lore chunks!")
else:
    print("[WARNING] asl_lore.md not found.")

semaphore = asyncio.Semaphore(MAX_CONCURRENT_OLLAMA_REQUESTS)

# Per-conversation chat history, keyed by "<player_name>_<npc_tag>".
# Index 0 of every history is the system prompt.
chat_memory = {}

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_background_tasks = set()

# Values filled in when the model omits a required key.
_REPLY_DEFAULTS = {
    "thought": "",
    "speech": "",
    "emotion": "NEUTRAL",
    "action": "WANDER",
    "action_target": "",
    "target_speech": "",
}

# Whole reply used when the model output cannot be used at all.
_FALLBACK_REPLY = {
    "thought": "I lost my train of thought.",
    "speech": "*grunts quietly*",
    "emotion": "NEUTRAL",
    "action": "WANDER",
    "action_target": "",
    "target_speech": "",
}


def _spawn(coro):
    """Schedule *coro* as a task and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _as_text(value):
    """Return *value* as a string, mapping None to the empty string."""
    if value is None:
        return ""
    return value if isinstance(value, str) else str(value)


def _parse_strategy(raw_value):
    """Return a known strategy number, falling back to DEFAULT_STRATEGY.

    A non-numeric or unknown value would otherwise either abort the whole
    message or compile a prompt with no role rules and no allowed actions.
    """
    try:
        strategy = int(raw_value)
    except (TypeError, ValueError):
        strategy = None

    if strategy not in KNOWN_STRATEGIES:
        print(f"[WARNING] Unknown llm_strategy {raw_value!r}; falling back to strategy {DEFAULT_STRATEGY}.")
        return DEFAULT_STRATEGY
    return strategy


def _sanitize_agent_reply(raw_reply_text):
    """Normalise the model's raw reply into the JSON string sent to the game.

    Keys are lower-cased, missing keys get defaults, an empty speech becomes
    a grunt, and '?' / '.' are stripped from the action target. Output that
    is not a JSON object is replaced by a safe default reply.
    """
    try:
        parsed = json.loads(raw_reply_text)
        if not isinstance(parsed, dict):
            # Valid JSON but not an object (e.g. a list or a bare string).
            raise ValueError("model reply is not a JSON object")
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        print("[WARNING] AI Hallucinated! Overriding with safe defaults.")
        return json.dumps(_FALLBACK_REPLY)

    agent_brain = {key.lower(): value for key, value in parsed.items()}
    for key, default in _REPLY_DEFAULTS.items():
        agent_brain.setdefault(key, default)

    # The model may emit null or a number here; coerce before using str methods.
    speech = _as_text(agent_brain["speech"])
    agent_brain["speech"] = speech if speech.strip() else "*grunts quietly*"

    target = _as_text(agent_brain["action_target"])
    agent_brain["action_target"] = target.replace("?", "").replace(".", "").strip()

    return json.dumps(agent_brain)


# =====================================================================
# BACKGROUND MEMORY SUMMARIZER (The "Dream State")
# =====================================================================
async def memory_summarizer_worker(session):
    """Consume summarisation jobs from ``memory_queue`` forever.

    Each job is a dict with the keys ``session_id``, ``player_name``,
    ``npc_tag`` and ``chat_log``. The chat log is summarised by Ollama and
    the summary is stored in ``memory_collection`` tagged with the session id.
    Failures are logged and the job is dropped.

    Args:
        session: the shared aiohttp client session used to reach Ollama.
    """
    print("[BACKGROUND] Memory Summarizer Worker is active.")
    while True:
        job = await memory_queue.get()
        try:
            session_id = job['session_id']
            player_name = job['player_name']
            npc_tag = job['npc_tag']
            chat_log = job['chat_log']

            prompt = f"Summarize the key events, facts, and the emotional tone of this conversation snippet between {player_name} and {npc_tag}. Keep it to 2 brief sentences in the past tense.\nConversation Log:\n{chat_log}"

            print(f"[MEMORY DB] Generating background memory for {player_name} and {npc_tag}...")
            async with session.post(OLLAMA_GENERATE_URL, json={
                "model": OLLAMA_MODEL,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": 0.2
                }
            }) as response:
                # Surface HTTP errors directly instead of as a KeyError below.
                response.raise_for_status()
                result = await response.json()
                summary = result['response'].strip()

                if summary:
                    # Nanosecond resolution keeps ids unique even when two
                    # summaries for one session finish within the same second.
                    doc_id = f"{session_id}_{time.time_ns()}"
                    memory_collection.add(
                        documents=[summary],
                        metadatas=[{"session_id": session_id}],
                        ids=[doc_id]
                    )
                    print(f"[MEMORY DB] Memory Saved: {summary}")

        except Exception as e:
            print(f"[MEMORY ERROR] Failed to summarize memory: {e}")
        finally:
            # Always balance the get() above, whatever happened to the job.
            memory_queue.task_done()


# =====================================================================
# MAIN MESSAGE PROCESSOR
# =====================================================================
async def process_message(r, session, message_data):
    """Handle one game message end to end and push the NPC reply to Redis.

    Args:
        r: async Redis client; the reply is RPUSHed onto ``llm_to_nwn``.
        session: shared aiohttp client session used to reach Ollama.
        message_data: JSON text popped from the ``nwn_to_llm`` list.

    Any exception is logged and swallowed so one bad message cannot take
    down the listener; in that case no reply is pushed.
    """
    try:
        data = json.loads(message_data)

        # --- Extract Base Contexts ---
        player_name = data.get('player', data.get('target_player', 'Unknown'))
        npc_tag = data.get('npc_tag', 'UnknownNPC')
        # A JSON null message must not crash the emote filter below.
        message = _as_text(data.get('message', ''))

        if not ALLOW_TEXT_EMOTES:
            # Drop *emote* spans from what the player typed.
            message = re.sub(r'\*.*?\*', '', message).strip()

        player_race = data.get('player_race', 'Unknown')
        player_alignment = data.get('player_alignment', 'Unknown')
        nearby_players = data.get('nearby_players', '')
        nearby_npcs = data.get('nearby_npcs', '')

        npc_persona = data.get('persona', 'You are a generic citizen.')
        npc_profession = data.get('profession', 'Commoner')
        npc_mood = data.get('mood', 'Neutral')
        npc_secret = data.get('secret', '')

        npc_alignment = data.get('npc_alignment', 'True Neutral')
        npc_gender = data.get('npc_gender', 'Unknown')
        npc_race = data.get('npc_race', 'Creature')
        npc_routine = data.get('npc_routine', '')

        player_state = data.get('player_state', 'Relaxed and unarmed.')
        world_state = data.get('world_state', 'Nothing of note is happening.')
        npc_health = data.get('npc_health', 'Healthy and uninjured.')
        relationship = data.get('relationship', 'Neutral or Friendly.')
        location_context = data.get('location_context', 'You are in a generic area.')

        # Core Strategy Flag (1: Agent, 2: Villain, 3: Maestro, 4: Shrine)
        llm_strategy = _parse_strategy(data.get('llm_strategy', DEFAULT_STRATEGY))

        available_quests = data.get('available_quests', '')
        available_props = data.get('available_props', '')

        # --- Sub-Context Strings ---
        group_context = f"Be aware that these other players are listening nearby: {nearby_players}." if nearby_players else ""
        puppet_context = f"Nearby generic NPCs you can CONVERSE with: {nearby_npcs}" if nearby_npcs else ""
        secret_context = f"YOUR SECRET (Reveal only if players are persuasive): {npc_secret}" if npc_secret else ""
        routine_context = f"YOUR REQUIRED ROUTINE: {npc_routine}" if npc_routine else ""

        session_id = f"{player_name}_{npc_tag}"

        # =====================================================================
        # DUAL RAG QUERY (Lore + Memories)
        # =====================================================================
        search_query = f"{location_context} {message}"
        retrieved_lore = "No specific local lore currently relevant."
        past_memories = ""

        if lore_collection.count() > 0:
            results = lore_collection.query(query_texts=[search_query], n_results=1)
            if results['documents'] and results['documents'][0]:
                retrieved_lore = f"- {results['documents'][0][0]}"

        if memory_collection.count() > 0:
            # Only memories recorded for this exact player/NPC pair are considered.
            mem_results = memory_collection.query(
                query_texts=[search_query], n_results=2, where={"session_id": session_id}
            )
            if mem_results['documents'] and mem_results['documents'][0]:
                formatted_mems = "\n- ".join(mem_results['documents'][0])
                past_memories = f"\nPAST MEMORIES OF {player_name}:\n- {formatted_mems}"

        # =====================================================================
        # STRATEGY-SPECIFIC PROMPT COMPILER
        # =====================================================================
        strategy_rules = ""
        action_macros = ""
        target_context = ""

        if llm_strategy == 1:
            # STRATEGY 1: The Autonomous Agent

            # Anti-Hallucination Grounding for Quests
            if available_quests:
                quest_rules = f"SPECIAL CAPABILITIES: You can offer the following quests to the player: {available_quests}."
            else:
                quest_rules = "WARNING: You currently have NO quests to offer. Do NOT invent or offer any quests."

            # Anti-Hallucination Grounding for Props
            if available_props:
                prop_rules = f"ENVIRONMENT: You own and have access to these specific nearby objects: [{available_props}]. To roleplay working or relaxing, use the USE_OBJECT action with one of these exact items as your action_target."
            else:
                prop_rules = ""

            strategy_rules = f"ROLE: You are an interactive, living NPC. You actively respond to players.\nGOALS: React to their words, use the environment, and establish your personality.\n{quest_rules}\n{prop_rules}\nSPECIAL CAPABILITIES: You can open your merchant store if asked."

            action_macros = "[WANDER, PATROL, FOLLOW, GUARD, GO_TO, INTERACT, USE_OBJECT, RETURN_TO_POST, OPEN_STORE, GIVE_QUEST, CONVERSE]"
            target_context = f"CURRENT TARGET: You are speaking to {player_name}, a {player_alignment} {player_race}.\nTheir physical state: {player_state}\nRelationship to you: {relationship}\n{group_context}"

        elif llm_strategy == 2:
            # STRATEGY 2: The Villain Commander
            strategy_rules = "ROLE: You are a hostile faction commander.\nGOALS: Evaluate the tactical situation. If you are dying, you MUST use REST to heal or PEACE to surrender. Command your minions strategically!"
            action_macros = "[ATTACK, COMMAND, RETREAT, REST, PEACE, USE_OBJECT, TAUNT]"
            target_context = f"TACTICAL TARGET: You are evaluating {player_name}, a {player_alignment} {player_race}.\nTheir physical state: {player_state}\nRelationship to you: {relationship}\n{group_context}"

        elif llm_strategy == 3:
            # STRATEGY 3: The Maestro (Puppeteer)
            strategy_rules = "ROLE: You are an ambient Maestro NPC. You DO NOT interact with players. You only talk to other NPCs to make the world feel alive.\nGOALS: Observe the environment and use the CONVERSE action to talk to the generic NPCs listed in your context. CRITICAL: You MUST invent and write their reply in the 'target_speech' field!"
            action_macros = "[WANDER, INTERACT, USE_OBJECT, CONVERSE]"
            target_context = "CURRENT TARGET: You are ignoring players and focusing on ambient life. Do not address players."

        elif llm_strategy == 4:
            # STRATEGY 4: The Shrine
            strategy_rules = "ROLE: You are an ancient, inanimate magical shrine.\nGOALS: Speak cryptically. If the player meets your conditions or asks the right questions, grant them a quest."
            action_macros = "[GLOW, GIVE_QUEST, SILENCE]"
            target_context = f"CURRENT TARGET: You are evaluating the soul of {player_name}, a {player_alignment} {player_race}.\nTheir physical state: {player_state}\n{group_context}"

        # =====================================================================
        # COMPILE THE FINAL DYNAMIC SYSTEM PROMPT
        # =====================================================================
        dynamic_system_prompt = f"""
        {npc_persona}
        {strategy_rules}

        ROLEPLAY STATUS & TRAITS:
        - Race & Gender: {npc_gender} {npc_race}
        - Profession: {npc_profession}
        - Alignment: {npc_alignment}
        - Conversational Charisma: Low/Gruff unless otherwise specified.
        - Current Mood: {npc_mood}
        - Current Physical State: {npc_health}
        {secret_context}
        {routine_context}

        CURRENT LOCATION: {location_context}
        {puppet_context}

        RELEVANT WORLD KNOWLEDGE:
        {retrieved_lore}
        {past_memories}

        CURRENT WORLD RUMORS/EVENTS:
        {world_state}

        {target_context}
        React appropriately based on your personality, alignment, and current strategy rules.

        CRITICAL ENGINE RULES:
        Respond ONLY in valid JSON. You MUST use exactly these keys: "thought", "speech", "emotion", "action", "action_target", and "target_speech".

        ACTION RULE:
        Your "action" key MUST be exactly one of the following words:
        {action_macros}

        - Use CONVERSE to initiate dialogue with a standard NPC. CRITICAL REQUIREMENT: When using CONVERSE, you absolutely MUST invent their response and put it in the "target_speech" field. Do not leave it blank!

        YOUR RESPONSE MUST BE A SINGLE, VALID JSON OBJECT. YOU MUST USE THIS EXACT TEMPLATE:
        {{
            "thought": "Your internal reasoning here.",
            "speech": "What YOU say out loud.",
            "emotion": "MACRO WORD",
            "action": "MACRO WORD",
            "action_target": "Target name",
            "target_speech": "If action is CONVERSE, write what the target NPC replies back to you here. Otherwise, leave blank."
        }}
        """

        # The system prompt is rebuilt on every message so that mood, health,
        # lore and memories always reflect the latest game state.
        system_message = {"role": "system", "content": dynamic_system_prompt}
        if session_id not in chat_memory:
            chat_memory[session_id] = [system_message]
        else:
            chat_memory[session_id][0] = system_message

        chat_memory[session_id].append({"role": "user", "content": f"{player_name} says: {message}"})

        # =====================================================================
        # MEMORY EXTRACTION TRIGGER
        # =====================================================================
        history = chat_memory[session_id]
        if len(history) > 10:
            # Summarise exactly the messages that are about to be dropped, so
            # nothing is lost even if the history grew past 11 entries.
            messages_to_summarize = history[1:-RECENT_MESSAGES_KEPT]
            chat_log_str = "\n".join([m['content'] for m in messages_to_summarize])

            await memory_queue.put({
                'session_id': session_id,
                'player_name': player_name,
                'npc_tag': npc_tag,
                'chat_log': chat_log_str
            })

            chat_memory[session_id] = [history[0]] + history[-RECENT_MESSAGES_KEPT:]

        # =====================================================================
        # LLM INFERENCE
        # =====================================================================
        # The semaphore caps how many chat requests hit Ollama at once.
        async with semaphore:
            print(f"[THINKING] Processing reply for {player_name} (Strategy {llm_strategy})...")
            async with session.post(OLLAMA_CHAT_URL, json={
                "model": OLLAMA_MODEL,
                "messages": chat_memory[session_id],
                "format": "json",
                "stream": False,
                "options": {
                    "temperature": 0.2
                }
            }, timeout=aiohttp.ClientTimeout(total=OLLAMA_CHAT_TIMEOUT_SECONDS)) as response:
                response.raise_for_status()
                result = await response.json()
                raw_reply_text = result['message']['content']

        # =====================================================================
        # JSON SANITIZATION
        # =====================================================================
        clean_reply_text = _sanitize_agent_reply(raw_reply_text)

        print(f"[REPLY] from {npc_tag} to {player_name}: {clean_reply_text}")
        chat_memory[session_id].append({"role": "assistant", "content": clean_reply_text})

        reply_payload = {
            "npc_tag": npc_tag,
            "target_player": player_name,
            "reply": clean_reply_text
        }
        await r.rpush('llm_to_nwn', json.dumps(reply_payload))

    except Exception as e:
        print(f"[ERROR] Failed to process message: {e}")


async def main():
    """Connect to Redis and dispatch every incoming game message to a task."""
    print("Initializing Async Redis Bridge...")
    r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)

    try:
        # NOTE(review): the body of this connectivity check falls between the
        # two hunks of the diff this file was recovered from and is not
        # visible; a PING round-trip is assumed here - confirm the exact
        # statements and messages against the repository.
        await r.ping()
    except Exception as e:
        print(f"[ERROR] Could not connect to Redis: {e}")
        return

    print(f"Ready! Listening for game messages. Max GPU concurrency: {MAX_CONCURRENT_OLLAMA_REQUESTS}")

    async with aiohttp.ClientSession() as session:
        _spawn(memory_summarizer_worker(session))

        while True:
            try:
                # Blocks (without blocking the event loop) until a message arrives.
                result = await r.blpop('nwn_to_llm')
                if result:
                    queue_name, message_data = result
                    # Handle the message concurrently and keep listening.
                    _spawn(process_message(r, session, message_data))
            except Exception as e:
                print(f"[LOOP ERROR] {e}")
                # Back off briefly so a persistent failure cannot spin the loop.
                await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())