import asyncio
import json
import aiohttp
-import redis.asyncio as redis # Note the new async redis import!
+import redis.asyncio as redis
+import re
+import os
+import time
+import chromadb
# --- CONFIGURATION ---
# Upper bound on Ollama requests allowed in flight at the same time; the
# semaphore created at the bottom of this section enforces it.
MAX_CONCURRENT_OLLAMA_REQUESTS = 3
# When False, *asterisk-wrapped* emote text is stripped from player messages
# before they are handed to the model.
ALLOW_TEXT_EMOTES = False

# =====================================================================
# 1. INITIALIZE VECTOR DATABASES (Lore & Episodic Memory)
# =====================================================================
print("Initializing ChromaDB Vector Databases...")
chroma_client = chromadb.PersistentClient(path="./asl_vectordb")

# World lore chunks, queried for every incoming message.
lore_collection = chroma_client.get_or_create_collection(name="world_lore")

# Conversation summaries, filtered by session at query time.
memory_collection = chroma_client.get_or_create_collection(name="episodic_memories")

# Seed the lore collection from asl_lore.md, but only while it is still empty
# so a restart does not try to insert the same chunk ids again.
if not os.path.exists("asl_lore.md"):
    print("[WARNING] asl_lore.md not found.")
elif lore_collection.count() == 0:
    print("[VECTOR DB] Reading asl_lore.md and vectorizing chunks...")
    with open("asl_lore.md", "r", encoding="utf-8") as f:
        raw_lore = f.read()

    # One chunk per blank-line-separated paragraph; whitespace-only ones are dropped.
    lore_chunks = list(filter(None, map(str.strip, raw_lore.split('\n\n'))))

    if lore_chunks:
        chunk_ids = [f"lore_{i}" for i, _ in enumerate(lore_chunks)]
        lore_collection.add(documents=lore_chunks, ids=chunk_ids)
        print(f"[VECTOR DB] Successfully stored {len(lore_chunks)} lore chunks!")

# --- SHARED RUNTIME STATE ---
# Jobs for the background memory summarizer.
memory_queue = asyncio.Queue()
# Gate that caps simultaneous Ollama requests.
semaphore = asyncio.Semaphore(MAX_CONCURRENT_OLLAMA_REQUESTS)
# Per-conversation chat history, keyed by "<player>_<npc_tag>".
chat_memory = {}
+# =====================================================================
+# BACKGROUND MEMORY SUMMARIZER (The "Dream State")
+# =====================================================================
async def memory_summarizer_worker(session):
    """Turn queued chat excerpts into stored episodic memories, forever.

    Each job taken from ``memory_queue`` is a dict with the keys
    ``session_id``, ``player_name``, ``npc_tag`` and ``chat_log``. The worker
    asks the local Ollama ``/api/generate`` endpoint for a two-sentence
    summary and, when the summary is non-empty, stores it in
    ``memory_collection`` tagged with the job's ``session_id``.

    Any failure while handling a job (malformed job, HTTP error, unexpected
    response shape, vector-store error) is logged and the worker moves on to
    the next job; the loop itself never returns.

    Args:
        session: aiohttp client session used for the Ollama request.
    """
    print("[BACKGROUND] Memory Summarizer Worker is active.")
    while True:
        job = await memory_queue.get()
        try:
            # Read the job inside the try block: a malformed job must be
            # logged and skipped, not allowed to kill the worker for good.
            session_id = job['session_id']
            player_name = job['player_name']
            npc_tag = job['npc_tag']
            chat_log = job['chat_log']

            prompt = f"Summarize the key events, facts, and the emotional tone of this conversation snippet between {player_name} and {npc_tag}. Keep it to 2 brief sentences in the past tense.\nConversation Log:\n{chat_log}"

            print(f"[MEMORY DB] Generating background memory for {player_name} and {npc_tag}...")
            # Summaries hit the same Ollama instance as live replies, so they
            # take a slot from the shared semaphore instead of going around it.
            async with semaphore:
                async with session.post('http://localhost:11434/api/generate', json={
                    "model": "llama3",
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": 0.2
                    }
                }) as response:
                    # Surface HTTP errors as such rather than as a KeyError below.
                    response.raise_for_status()
                    result = await response.json()

            summary = result['response'].strip()

            if summary:
                doc_id = f"{session_id}_{int(time.time())}"
                # NOTE(review): synchronous ChromaDB call on the event-loop thread.
                memory_collection.add(
                    documents=[summary],
                    metadatas=[{"session_id": session_id}],
                    ids=[doc_id]
                )
                print(f"[MEMORY DB] Memory Saved: {summary}")

        except Exception as e:
            print(f"[MEMORY ERROR] Failed to summarize memory: {e}")
        finally:
            # Always balance the get() above, whatever happened to the job.
            memory_queue.task_done()
+
+# =====================================================================
+# MAIN MESSAGE PROCESSOR
+# =====================================================================
def _sanitize_agent_reply(raw_reply_text):
    """Coerce the model's raw reply into the six-key JSON object sent to the game.

    Keys are lower-cased, missing or null keys get safe defaults, an empty
    ``speech`` becomes a grunt, and ``?`` / ``.`` are removed from
    ``action_target``. If the reply is not valid JSON, or is valid JSON but
    not an object, a fixed fallback object is used instead.

    Returns:
        The sanitized reply serialized as a JSON string.
    """
    required_defaults = {
        "thought": "",
        "speech": "",
        "emotion": "NEUTRAL",
        "action": "WANDER",
        "action_target": "",
        "target_speech": "",
    }

    try:
        agent_brain = json.loads(raw_reply_text)
    except (json.JSONDecodeError, TypeError):
        agent_brain = None

    # json.loads() can legally return a list, string or number; only an
    # object can be normalized, everything else gets the fallback.
    if not isinstance(agent_brain, dict):
        print("[WARNING] AI Hallucinated! Overriding with safe defaults.")
        return json.dumps({
            "thought": "I lost my train of thought.",
            "speech": "*grunts quietly*",
            "emotion": "NEUTRAL",
            "action": "WANDER",
            "action_target": "",
            "target_speech": ""
        })

    agent_brain = {k.lower(): v for k, v in agent_brain.items()}

    # Treat an explicit null the same as a missing key.
    for key, default in required_defaults.items():
        if agent_brain.get(key) is None:
            agent_brain[key] = default

    # The two fields below are processed with string methods, so coerce
    # non-string values (numbers, lists, ...) instead of crashing on them.
    speech = str(agent_brain["speech"])
    if not speech.strip():
        speech = "*grunts quietly*"
    agent_brain["speech"] = speech

    agent_brain["action_target"] = (
        str(agent_brain["action_target"]).replace("?", "").replace(".", "").strip()
    )

    return json.dumps(agent_brain)


async def process_message(r, session, message_data):
    """Handle one game message end to end and push the NPC's reply to Redis.

    Steps: parse the JSON payload, build a strategy-specific system prompt
    enriched with one lore chunk and up to two stored memories, keep the
    per-session chat history bounded (older turns are queued for background
    summarization), ask Ollama for a JSON reply, sanitize it, and ``rpush``
    the result onto ``llm_to_nwn``.

    Any exception is caught and logged; in that case no reply is pushed.

    Args:
        r: Redis client whose ``rpush`` is awaited to deliver the reply.
        session: aiohttp client session used for the Ollama request.
        message_data: JSON text of the incoming game message.
    """
    try:
        data = json.loads(message_data)

        # --- Extract Base Contexts ---
        player_name = data.get('player', data.get('target_player', 'Unknown'))
        npc_tag = data.get('npc_tag', 'UnknownNPC')
        message = data.get('message', '')

        if not ALLOW_TEXT_EMOTES:
            # Drop *emote* spans so only spoken text reaches the model.
            message = re.sub(r'\*.*?\*', '', message).strip()

        player_race = data.get('player_race', 'Unknown')
        player_alignment = data.get('player_alignment', 'Unknown')
        nearby_players = data.get('nearby_players', '')
        nearby_npcs = data.get('nearby_npcs', '')

        npc_persona = data.get('persona', 'You are a generic citizen.')
        npc_profession = data.get('profession', 'Commoner')
        npc_mood = data.get('mood', 'Neutral')
        npc_secret = data.get('secret', '')

        npc_alignment = data.get('npc_alignment', 'True Neutral')
        npc_gender = data.get('npc_gender', 'Unknown')
        npc_race = data.get('npc_race', 'Creature')
        npc_routine = data.get('npc_routine', '')

        player_state = data.get('player_state', 'Relaxed and unarmed.')
        world_state = data.get('world_state', 'Nothing of note is happening.')
        npc_health = data.get('npc_health', 'Healthy and uninjured.')
        relationship = data.get('relationship', 'Neutral or Friendly.')
        location_context = data.get('location_context', 'You are in a generic area.')

        # Core Strategy Flag (1: Agent, 2: Villain, 3: Maestro, 4: Shrine)
        # A null or non-numeric flag falls back to the default strategy
        # instead of aborting the whole message.
        try:
            llm_strategy = int(data.get('llm_strategy', 1))
        except (TypeError, ValueError):
            llm_strategy = 1

        available_quests = data.get('available_quests', '')
        available_props = data.get('available_props', '')

        # --- Sub-Context Strings ---
        group_context = f"Be aware that these other players are listening nearby: {nearby_players}." if nearby_players else ""
        puppet_context = f"Nearby generic NPCs you can CONVERSE with: {nearby_npcs}" if nearby_npcs else ""
        secret_context = f"YOUR SECRET (Reveal only if players are persuasive): {npc_secret}" if npc_secret else ""
        routine_context = f"YOUR REQUIRED ROUTINE: {npc_routine}" if npc_routine else ""

        # One conversation history per (player, NPC) pair.
        session_id = f"{player_name}_{npc_tag}"

        # =====================================================================
        # DUAL RAG QUERY (Lore + Memories)
        # =====================================================================
        # NOTE(review): the ChromaDB calls below are synchronous and run on
        # the event-loop thread.
        search_query = f"{location_context} {message}"
        retrieved_lore = "No specific local lore currently relevant."
        past_memories = ""

        if lore_collection.count() > 0:
            results = lore_collection.query(query_texts=[search_query], n_results=1)
            if results['documents'] and results['documents'][0]:
                retrieved_lore = f"- {results['documents'][0][0]}"

        if memory_collection.count() > 0:
            mem_results = memory_collection.query(
                query_texts=[search_query], n_results=2, where={"session_id": session_id}
            )
            if mem_results['documents'] and mem_results['documents'][0]:
                formatted_mems = "\n- ".join(mem_results['documents'][0])
                past_memories = f"\nPAST MEMORIES OF {player_name}:\n- {formatted_mems}"

        # =====================================================================
        # STRATEGY-SPECIFIC PROMPT COMPILER
        # =====================================================================
        strategy_rules = ""
        action_macros = ""
        target_context = ""

        if llm_strategy == 1:
            # STRATEGY 1: The Autonomous Agent

            # Anti-Hallucination Grounding for Quests
            if available_quests:
                quest_rules = f"SPECIAL CAPABILITIES: You can offer the following quests to the player: {available_quests}."
            else:
                quest_rules = "WARNING: You currently have NO quests to offer. Do NOT invent or offer any quests."

            # Anti-Hallucination Grounding for Props
            if available_props:
                prop_rules = f"ENVIRONMENT: You own and have access to these specific nearby objects: [{available_props}]. To roleplay working or relaxing, use the USE_OBJECT action with one of these exact items as your action_target."
            else:
                prop_rules = ""

            strategy_rules = f"ROLE: You are an interactive, living NPC. You actively respond to players.\nGOALS: React to their words, use the environment, and establish your personality.\n{quest_rules}\n{prop_rules}\nSPECIAL CAPABILITIES: You can open your merchant store if asked."

            action_macros = "[WANDER, PATROL, FOLLOW, GUARD, GO_TO, INTERACT, USE_OBJECT, RETURN_TO_POST, OPEN_STORE, GIVE_QUEST, CONVERSE]"
            target_context = f"CURRENT TARGET: You are speaking to {player_name}, a {player_alignment} {player_race}.\nTheir physical state: {player_state}\nRelationship to you: {relationship}\n{group_context}"

        elif llm_strategy == 2:
            # STRATEGY 2: The Villain Commander
            strategy_rules = "ROLE: You are a hostile faction commander.\nGOALS: Evaluate the tactical situation. If you are dying, you MUST use REST to heal or PEACE to surrender. Command your minions strategically!"
            action_macros = "[ATTACK, COMMAND, RETREAT, REST, PEACE, USE_OBJECT, TAUNT]"
            target_context = f"TACTICAL TARGET: You are evaluating {player_name}, a {player_alignment} {player_race}.\nTheir physical state: {player_state}\nRelationship to you: {relationship}\n{group_context}"

        elif llm_strategy == 3:
            # STRATEGY 3: The Maestro (Puppeteer)
            strategy_rules = "ROLE: You are an ambient Maestro NPC. You DO NOT interact with players. You only talk to other NPCs to make the world feel alive.\nGOALS: Observe the environment and use the CONVERSE action to talk to the generic NPCs listed in your context. CRITICAL: You MUST invent and write their reply in the 'target_speech' field!"
            action_macros = "[WANDER, INTERACT, USE_OBJECT, CONVERSE]"
            target_context = "CURRENT TARGET: You are ignoring players and focusing on ambient life. Do not address players."

        elif llm_strategy == 4:
            # STRATEGY 4: The Shrine
            strategy_rules = "ROLE: You are an ancient, inanimate magical shrine.\nGOALS: Speak cryptically. If the player meets your conditions or asks the right questions, grant them a quest."
            action_macros = "[GLOW, GIVE_QUEST, SILENCE]"
            target_context = f"CURRENT TARGET: You are evaluating the soul of {player_name}, a {player_alignment} {player_race}.\nTheir physical state: {player_state}\n{group_context}"

        # =====================================================================
        # COMPILE THE FINAL DYNAMIC SYSTEM PROMPT
        # =====================================================================
        dynamic_system_prompt = f"""
        {npc_persona}
        {strategy_rules}

        ROLEPLAY STATUS & TRAITS:
        - Race & Gender: {npc_gender} {npc_race}
        - Profession: {npc_profession}
        - Alignment: {npc_alignment}
        - Conversational Charisma: Low/Gruff unless otherwise specified.
        - Current Mood: {npc_mood}
        - Current Physical State: {npc_health}
        {secret_context}
        {routine_context}

        CURRENT LOCATION: {location_context}
        {puppet_context}

        RELEVANT WORLD KNOWLEDGE:
        {retrieved_lore}
        {past_memories}

        CURRENT WORLD RUMORS/EVENTS:
        {world_state}

        {target_context}
        React appropriately based on your personality, alignment, and current strategy rules.

        CRITICAL ENGINE RULES:
        Respond ONLY in valid JSON. You MUST use exactly these keys: "thought", "speech", "emotion", "action", "action_target", and "target_speech".

        ACTION RULE:
        Your "action" key MUST be exactly one of the following words:
        {action_macros}

        - Use CONVERSE to initiate dialogue with a standard NPC. CRITICAL REQUIREMENT: When using CONVERSE, you absolutely MUST invent their response and put it in the "target_speech" field. Do not leave it blank!

        YOUR RESPONSE MUST BE A SINGLE, VALID JSON OBJECT. YOU MUST USE THIS EXACT TEMPLATE:
        {{
            "thought": "Your internal reasoning here.",
            "speech": "What YOU say out loud.",
            "emotion": "MACRO WORD",
            "action": "MACRO WORD",
            "action_target": "Target name",
            "target_speech": "If action is CONVERSE, write what the target NPC replies back to you here. Otherwise, leave blank."
        }}
        """

        # Slot 0 of every history is the system prompt; it is rebuilt on each
        # message so mood, location and retrieved context stay current.
        system_entry = {"role": "system", "content": dynamic_system_prompt}
        if session_id not in chat_memory:
            chat_memory[session_id] = [system_entry]
        else:
            chat_memory[session_id][0] = system_entry

        chat_memory[session_id].append({"role": "user", "content": f"{player_name} says: {message}"})

        # =====================================================================
        # MEMORY EXTRACTION TRIGGER
        # =====================================================================
        history = chat_memory[session_id]
        if len(history) > 10:
            # Summarize everything that is about to be trimmed. Slicing up to
            # -5 (not a fixed index) guarantees no turn falls between the
            # summarized part and the five turns that are kept.
            messages_to_summarize = history[1:-5]
            chat_log_str = "\n".join([m['content'] for m in messages_to_summarize])

            await memory_queue.put({
                'session_id': session_id,
                'player_name': player_name,
                'npc_tag': npc_tag,
                'chat_log': chat_log_str
            })

            chat_memory[session_id] = [history[0]] + history[-5:]

        # =====================================================================
        # LLM INFERENCE
        # =====================================================================
        # The semaphore is held only for the model call itself so the slot is
        # free again before sanitizing and pushing the reply.
        async with semaphore:
            print(f"[THINKING] Processing reply for {player_name} (Strategy {llm_strategy})...")
            async with session.post('http://localhost:11434/api/chat', json={
                "model": "llama3",
                "messages": chat_memory[session_id],
                "format": "json",
                "stream": False,
                "options": {
                    "temperature": 0.2
                }
            }, timeout=aiohttp.ClientTimeout(total=45)) as response:
                response.raise_for_status()
                result = await response.json()
                raw_reply_text = result['message']['content']

        # =====================================================================
        # JSON SANITIZATION
        # =====================================================================
        clean_reply_text = _sanitize_agent_reply(raw_reply_text)

        print(f"[REPLY] from {npc_tag} to {player_name}: {clean_reply_text}")
        chat_memory[session_id].append({"role": "assistant", "content": clean_reply_text})

        reply_payload = {
            "npc_tag": npc_tag,
            "target_player": player_name,
            "reply": clean_reply_text
        }
        await r.rpush('llm_to_nwn', json.dumps(reply_payload))
    except Exception as e:
        print(f"[ERROR] Failed to process message: {e}")
-
async def main():
    """Run the Redis <-> Ollama bridge until the process is stopped.

    Opens one shared aiohttp session, starts the background memory
    summarizer, then blocks on the ``nwn_to_llm`` Redis list and spawns one
    ``process_message`` task per incoming message so the listener never waits
    on the model. Errors in the listen loop are logged and retried after one
    second. On exit, outstanding tasks are cancelled before the HTTP session
    closes, and the Redis client is closed.
    """
    print("Initializing Async Redis Bridge...")
    r = redis.Redis(host='127.0.0.1', port=6380, decode_responses=True)

    # The event loop only keeps weak references to tasks, so a task nobody
    # else references can be garbage-collected mid-flight. Hold them here
    # until they finish.
    background_tasks = set()

    def _spawn(coro):
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        return task

    try:
        print(f"Ready! Listening for game messages. Max GPU concurrency: {MAX_CONCURRENT_OLLAMA_REQUESTS}")
        async with aiohttp.ClientSession() as session:
            _spawn(memory_summarizer_worker(session))

            try:
                while True:
                    try:
                        # Suspends here until a message arrives.
                        result = await r.blpop('nwn_to_llm')
                        if result:
                            _queue_name, message_data = result
                            _spawn(process_message(r, session, message_data))
                    except Exception as e:
                        print(f"[LOOP ERROR] {e}")
                        # Back off so a persistent failure does not spin the loop.
                        await asyncio.sleep(1)
            finally:
                # Stop everything that still uses the session before the
                # enclosing `async with` closes it.
                pending = list(background_tasks)
                for task in pending:
                    task.cancel()
                await asyncio.gather(*pending, return_exceptions=True)
    finally:
        # Newer redis-py releases name the coroutine aclose(); older ones
        # only have close(). Use whichever this client provides.
        close_redis = getattr(r, "aclose", None) or r.close
        await close_redis()
if __name__ == "__main__":
    # asyncio.run() creates the event loop, runs main() and closes the loop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # main() loops forever, so an interrupt is how the service gets
        # stopped; exit quietly instead of printing a traceback.
        print("Shutting down.")