| """ |
| Hook BCryptDecrypt using ctypes in-process hooking via DLL detour. |
| Instead of Frida, we directly hook BCryptDecrypt's IAT entry in oneocr.dll. |
| """ |
| import ctypes |
| import ctypes.wintypes as wt |
| from ctypes import ( |
| c_int64, c_char_p, c_ubyte, POINTER, byref, Structure, |
| c_void_p, c_ulong, c_int32, WINFUNCTYPE, CFUNCTYPE, c_uint8 |
| ) |
| import os |
| import sys |
| import struct |
| from pathlib import Path |
|
|
| OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump") |
| OUTPUT_DIR.mkdir(exist_ok=True) |
|
|
| DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data" |
| MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel") |
| KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4' |
|
|
| |
| intercepted_calls = [] |
| decrypt_call_num = 0 |
|
|
| |
| |
| |
| |
|
|
| BCRYPT_DECRYPT_TYPE = WINFUNCTYPE( |
| c_ulong, |
| c_void_p, |
| c_void_p, |
| c_ulong, |
| c_void_p, |
| c_void_p, |
| c_ulong, |
| c_void_p, |
| c_ulong, |
| POINTER(c_ulong), |
| c_ulong, |
| ) |
|
|
| |
| original_bcrypt_decrypt = None |
|
|
|
|
| def hooked_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV, |
| pbOutput, cbOutput, pcbResult, dwFlags): |
| """Our hook that intercepts BCryptDecrypt calls.""" |
| global decrypt_call_num |
| |
| call_num = decrypt_call_num |
| decrypt_call_num += 1 |
| |
| |
| iv_before = None |
| if pbIV and cbIV > 0: |
| try: |
| iv_before = ctypes.string_at(pbIV, cbIV) |
| except: |
| pass |
| |
| |
| encrypted_input = None |
| if pbInput and cbInput > 0: |
| try: |
| encrypted_input = ctypes.string_at(pbInput, min(cbInput, 64)) |
| except: |
| pass |
|
|
| |
| status = original_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, |
| pbIV, cbIV, pbOutput, cbOutput, |
| pcbResult, dwFlags) |
| |
| |
| result_size = 0 |
| if pcbResult: |
| result_size = pcbResult[0] |
| |
| |
| iv_after = None |
| if pbIV and cbIV > 0: |
| try: |
| iv_after = ctypes.string_at(pbIV, cbIV) |
| except: |
| pass |
| |
| info = { |
| 'call': call_num, |
| 'status': status, |
| 'cbInput': cbInput, |
| 'cbIV': cbIV, |
| 'cbOutput': result_size, |
| 'dwFlags': dwFlags, |
| 'iv_before': iv_before.hex() if iv_before else None, |
| 'iv_after': iv_after.hex() if iv_after else None, |
| } |
| |
| print(f"[BCryptDecrypt #{call_num}] status={status:#x} " |
| f"in={cbInput} out={result_size} iv_len={cbIV} flags={dwFlags}") |
| if encrypted_input: |
| print(f" Encrypted input[:32]: {encrypted_input[:32].hex()}") |
| print(f" pbInput addr: {pbInput:#x}") |
| if iv_before: |
| print(f" IV before: {iv_before.hex()}") |
| if iv_after and iv_after != iv_before: |
| print(f" IV after: {iv_after.hex()}") |
| |
| |
| if status == 0 and result_size > 0 and pbOutput: |
| try: |
| decrypted = ctypes.string_at(pbOutput, result_size) |
| |
| |
| if len(decrypted) >= 4: |
| magic = struct.unpack('<I', decrypted[:4])[0] |
| info['magic'] = magic |
| print(f" Magic: {magic} | First 32 bytes: {decrypted[:32].hex()}") |
| |
| if magic == 1: |
| print(f" *** MAGIC NUMBER == 1 FOUND! ***") |
| |
| |
| fname = OUTPUT_DIR / f"decrypt_{call_num}_in{cbInput}_out{result_size}.bin" |
| fname.write_bytes(decrypted) |
| print(f" -> Saved: {fname.name} ({result_size:,} bytes)") |
| |
| except Exception as e: |
| print(f" Error reading output: {e}") |
| |
| intercepted_calls.append(info) |
| return status |
|
|
|
|
| def hook_iat(dll_handle, target_dll_name, target_func_name, hook_func): |
| """ |
| Hook a function by patching the Import Address Table (IAT) of a DLL. |
| Returns the original function pointer. |
| """ |
| import pefile |
| |
| |
| kernel32 = ctypes.windll.kernel32 |
| buf = ctypes.create_unicode_buffer(260) |
| h = ctypes.c_void_p(dll_handle) |
| kernel32.GetModuleFileNameW(h, buf, 260) |
| dll_path = buf.value |
| |
| print(f"Analyzing IAT of: {dll_path}") |
| |
| pe = pefile.PE(dll_path) |
| |
| |
| base_addr = dll_handle |
| if hasattr(dll_handle, '_handle'): |
| base_addr = dll_handle._handle |
| |
| for entry in pe.DIRECTORY_ENTRY_IMPORT: |
| import_name = entry.dll.decode('utf-8', errors='ignore').lower() |
| if target_dll_name.lower() not in import_name: |
| continue |
| |
| for imp in entry.imports: |
| if imp.name and imp.name.decode('utf-8', errors='ignore') == target_func_name: |
| |
| iat_rva = imp.address - pe.OPTIONAL_HEADER.ImageBase |
| iat_addr = base_addr + iat_rva |
| |
| print(f"Found {target_func_name} in IAT at RVA={iat_rva:#x}, " |
| f"VA={iat_addr:#x}") |
| |
| |
| original_ptr = ctypes.c_void_p() |
| ctypes.memmove(ctypes.byref(original_ptr), iat_addr, 8) |
| print(f"Original function pointer: {original_ptr.value:#x}") |
| |
| |
| callback = BCRYPT_DECRYPT_TYPE(hook_func) |
| callback_ptr = ctypes.cast(callback, c_void_p).value |
| |
| |
| old_protect = c_ulong() |
| PAGE_READWRITE = 0x04 |
| kernel32.VirtualProtect( |
| ctypes.c_void_p(iat_addr), 8, |
| PAGE_READWRITE, ctypes.byref(old_protect) |
| ) |
| |
| |
| new_ptr = ctypes.c_void_p(callback_ptr) |
| ctypes.memmove(iat_addr, ctypes.byref(new_ptr), 8) |
| |
| |
| kernel32.VirtualProtect( |
| ctypes.c_void_p(iat_addr), 8, |
| old_protect.value, ctypes.byref(old_protect) |
| ) |
| |
| print(f"IAT patched! New function pointer: {callback_ptr:#x}") |
| |
| |
| original_func = BCRYPT_DECRYPT_TYPE(original_ptr.value) |
| |
| pe.close() |
| return original_func, callback |
| |
| pe.close() |
| return None, None |
|
|
|
|
| def main(): |
| global original_bcrypt_decrypt |
| |
| print("=" * 70) |
| print("IN-PROCESS BCryptDecrypt HOOKING") |
| print("=" * 70) |
| |
| |
| kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) |
| kernel32.SetDllDirectoryW(DLL_DIR) |
| |
| dll_path = os.path.join(DLL_DIR, "oneocr.dll") |
| print(f"Loading: {dll_path}") |
| dll = ctypes.WinDLL(dll_path) |
| |
| |
| dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)] |
| dll.CreateOcrInitOptions.restype = c_int64 |
| dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, c_ubyte] |
| dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64 |
| dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)] |
| dll.CreateOcrPipeline.restype = c_int64 |
| |
| |
| print("\n--- Setting up BCryptDecrypt hook ---") |
| |
| |
| bcrypt_dll = ctypes.WinDLL("bcrypt") |
| real_decrypt_addr = ctypes.cast( |
| bcrypt_dll.BCryptDecrypt, c_void_p |
| ).value |
| print(f"Real BCryptDecrypt address: {real_decrypt_addr:#x}") |
| |
| |
| |
| |
| |
| |
| |
| try: |
| import pefile |
| print("pefile available, trying IAT hook...") |
| |
| original_bcrypt_decrypt_func, callback_ref = hook_iat( |
| dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt |
| ) |
| |
| if original_bcrypt_decrypt_func: |
| original_bcrypt_decrypt = original_bcrypt_decrypt_func |
| print("IAT hook installed successfully!") |
| else: |
| raise Exception("IAT hook failed - function not found in imports") |
| |
| except ImportError: |
| print("pefile not available, installing...") |
| os.system("uv pip install pefile") |
| import pefile |
| |
| original_bcrypt_decrypt_func, callback_ref = hook_iat( |
| dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt |
| ) |
| |
| if original_bcrypt_decrypt_func: |
| original_bcrypt_decrypt = original_bcrypt_decrypt_func |
| else: |
| print("ERROR: Could not hook BCryptDecrypt") |
| return |
| |
| |
| print("\n--- Creating OCR Pipeline (will trigger BCryptDecrypt) ---") |
| |
| init_options = c_int64() |
| ret = dll.CreateOcrInitOptions(byref(init_options)) |
| print(f"CreateOcrInitOptions: {ret}") |
| |
| ret = dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0) |
| print(f"SetUseModelDelayLoad: {ret}") |
| |
| pipeline = c_int64() |
| model_buf = ctypes.create_string_buffer(MODEL_PATH.encode()) |
| key_buf = ctypes.create_string_buffer(KEY) |
| |
| print(f"\nCalling CreateOcrPipeline...") |
| print(f"Model: {MODEL_PATH}") |
| print(f"Key: {KEY}") |
| print() |
| |
| ret = dll.CreateOcrPipeline(model_buf, key_buf, init_options, byref(pipeline)) |
| |
| print(f"\nCreateOcrPipeline returned: {ret}") |
| print(f"Pipeline handle: {pipeline.value}") |
| |
| |
| print() |
| print("=" * 70) |
| print("SUMMARY") |
| print("=" * 70) |
| print(f"Total BCryptDecrypt calls intercepted: {len(intercepted_calls)}") |
| |
| magic_1_files = [] |
| for info in intercepted_calls: |
| if info.get('magic') == 1: |
| magic_1_files.append(info) |
| |
| if magic_1_files: |
| print(f"\n*** Found {len(magic_1_files)} calls with magic_number == 1! ***") |
| for info in magic_1_files: |
| print(f" Call #{info['call']}: input={info['cbInput']:,}, " |
| f"output={info['cbOutput']:,}") |
| |
| |
| if OUTPUT_DIR.exists(): |
| files = sorted(OUTPUT_DIR.glob("decrypt_*.bin")) |
| if files: |
| print(f"\nSaved {len(files)} decrypted buffers:") |
| total = 0 |
| for f in files: |
| sz = f.stat().st_size |
| total += sz |
| header = open(f, 'rb').read(4) |
| magic = struct.unpack('<I', header)[0] if len(header) >= 4 else -1 |
| marker = " *** MAGIC=1 ***" if magic == 1 else "" |
| print(f" {f.name}: {sz:,} bytes (magic={magic}){marker}") |
| print(f"Total: {total:,} bytes ({total/1024/1024:.1f} MB)") |
| |
| print("\nDone!") |
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|