diff --git a/nitrogen/game_env.py b/nitrogen/game_env.py index f55edd4..982c1d7 100644 --- a/nitrogen/game_env.py +++ b/nitrogen/game_env.py @@ -21,44 +21,45 @@ import win32gui import win32api import win32con + def get_process_info(process_name): """ Get process information for a given process name on Windows. - + Args: process_name (str): Name of the process (e.g., "isaac-ng.exe") - + Returns: list: List of dictionaries containing PID, window_name, and architecture for each matching process. Returns empty list if no process found. """ results = [] - + # Find all processes with the given name for proc in psutil.process_iter(['pid', 'name']): try: if proc.info['name'].lower() == process_name.lower(): pid = proc.info['pid'] - + # Get architecture try: # Check if process is 32-bit or 64-bit process_handle = win32api.OpenProcess( - win32con.PROCESS_QUERY_INFORMATION, - False, + win32con.PROCESS_QUERY_INFORMATION, + False, pid ) is_wow64 = win32process.IsWow64Process(process_handle) win32api.CloseHandle(process_handle) - + # On 64-bit Windows: WOW64 means "Windows 32-bit on Windows 64-bit", i.e. a 32-bit process architecture = "x86" if is_wow64 else "x64" except: architecture = "unknown" - + # Find windows associated with this PID windows = [] - + def enum_window_callback(hwnd, pid_to_find): _, found_pid = win32process.GetWindowThreadProcessId(hwnd) if found_pid == pid_to_find: @@ -70,13 +71,13 @@ def get_process_info(process_name): 'visible': win32gui.IsWindowVisible(hwnd) }) return True - + # Find all windows for this PID try: win32gui.EnumWindows(enum_window_callback, pid) except: pass - + # Choose the best window window_name = None if windows: @@ -85,31 +86,31 @@ def get_process_info(process_name): print("Using heuristics to select the correct window...") # Filter out common proxy/helper windows proxy_keywords = ['d3dproxywindow', 'proxy', 'helper', 'overlay'] - + # First try to find a visible window without proxy keywords for win in windows: if not any(keyword in win['title'].lower() for keyword in proxy_keywords): window_name = win['title'] break - + # If no good window found, just use the first one if window_name is None and windows: window_name = windows[0]['title'] - + results.append({ 'pid': pid, 'window_name': window_name, 'architecture': architecture }) - + except (psutil.NoSuchProcess, psutil.AccessDenied): continue - + if len(results) == 0: raise ValueError(f"No process found with name: {process_name}") elif len(results) > 1: print(f"Warning: Multiple processes found with name '{process_name}'. Returning first match.") - + return results[0] @@ -338,6 +339,7 @@ class GamepadEmulator: self.gamepad.reset() self.gamepad.update() + class PyautoguiScreenshotBackend: def __init__(self, bbox): @@ -346,15 +348,17 @@ class PyautoguiScreenshotBackend: def screenshot(self): return pyautogui.screenshot(region=self.bbox) + class DxcamScreenshotBackend: - def __init__(self, bbox): + def __init__(self, bbox, fps): import dxcam self.camera = dxcam.create() self.bbox = bbox self.last_screenshot = None + self.camera.start(region=self.bbox, target_fps=fps, video_mode=True) def screenshot(self): - screenshot = self.camera.grab(region=self.bbox) + screenshot = self.camera.get_latest_frame() if screenshot is None: print("DXCAM failed to capture frame, trying to use the latest screenshot") if self.last_screenshot is not None: @@ -381,15 +385,15 @@ class GamepadEnv(Env): """ def __init__( - self, - game, - image_height=1440, - image_width=2560, - controller_type="xbox", - game_speed=1.0, - env_fps=10, - async_mode=True, - screenshot_backend="dxcam", + self, + game, + image_height=1440, + image_width=2560, + controller_type="xbox", + game_speed=1.0, + env_fps=10, + async_mode=True, + screenshot_backend="dxcam", ): super().__init__() @@ -414,12 +418,12 @@ class GamepadEnv(Env): self.game_arch = proc_info["architecture"] self.game_window_name = proc_info["window_name"] - print(f"Game process found: {self.game} (PID: {self.game_pid}, Arch: {self.game_arch}, Window: {self.game_window_name})") + print( + f"Game process found: {self.game} (PID: {self.game_pid}, Arch: {self.game_arch}, Window: {self.game_window_name})") if self.game_pid is None: raise Exception(f"Could not find PID for game: {game}") - self.observation_space = Box( low=0, high=255, shape=(self.image_height, self.image_width, 3), dtype="uint8" ) @@ -464,20 +468,19 @@ class GamepadEnv(Env): self.game_window.activate() l, t, r, b = self.game_window.left, self.game_window.top, self.game_window.right, self.game_window.bottom - self.bbox = (l, t, r-l, b-t) + self.bbox = (l, t, r - l, b - t) # Initialize speedhack client if using DLL injection self.speedhack_client = xsh.Client(process_id=self.game_pid, arch=self.game_arch) # Get the screenshot backend if screenshot_backend == "dxcam": - self.screenshot_backend = DxcamScreenshotBackend(self.bbox) + self.screenshot_backend = DxcamScreenshotBackend(self.bbox, self.env_fps) elif screenshot_backend == "pyautogui": self.screenshot_backend = PyautoguiScreenshotBackend(self.bbox) else: raise ValueError("Unsupported screenshot backend. Use 'dxcam' or 'pyautogui'.") - def calculate_step_duration(self): """ Calculate the step duration based on game speed and environment FPS.