复制
id: CVE-2025-9242
info:
name: WatchGuard IKEv2 Out-of-Bounds Write Vulnerability
author: pussycat0x,DhiyaneshDK,watchTowr
severity: critical
description: |
WatchGuard Fireware OS 11.10.2 to 11.12.4_Update1, 12.0 to 12.11.3, and 2025.1 contains an out-of-bounds write caused by improper handling in Mobile User VPN and Branch Office VPN with IKEv2 dynamic gateway peer, letting remote unauthenticated attackers execute arbitrary code.
impact: |
Remote unauthenticated attackers can execute arbitrary code, potentially leading to full system compromise.
remediation: |
Update to the latest Fireware OS version beyond 2025.1.
reference:
- https://github.com/watchtowrlabs/watchTowr-vs-WatchGuard-CVE-2025-9242/
- https://labs.watchtowr.com/yikes-watchguard-fireware-os-ikev2-out-of-bounds-write-cve-2025-9242
classification:
cvss-metrics: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H
cvss-score: 9.8
cve-id: CVE-2025-9242
cwe-id: CWE-787,CWE-362
epss-score: 0.055
epss-percentile: 0.89765
cpe: cpe:2.3:a:watchguard:fireware_os:*:*:*:*:*:*:*
metadata:
verified: true
shodan-query: html:"Watchguard" html:"Authentication Server"
tags: cve,cve2025,watchguard,ikev2,rce,passive,vpn
variables:
HOST: "{{Host}}"
PORT: "{{Port}}"
code:
- engine:
- py
- python3
source: |
import socket
import struct
import random
import enum
import hashlib
import hmac
import logging
import base64
import binascii
import re
import sys
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
FW_VERSION = None
BUILD_NUMBER = None
class WatchGuardFw:
ADDRESSES = {
'12.11.3': {
'pop_rcx_ret': 0x4225ab,
'mov_rax_rcx_ret': 0x5a4fac,
'mov_rbp_rsp_call_rax': 0x42008d,
'pop_r13_ret': 0x594ac4,
'mov_rax_rbp_pop_rbx_pop_rbp_ret': 0x598d69,
'sub_rax_rcx_ret': 0x5a4fd8,
'push_rax_mov_rax_rbx_pop_rbx_ret': 0x5a4468,
'mov_rdi_rbx_call_rax': 0x42fce4,
'pop_rsi_ret': 0x508ece,
'pop_rdx_ret': 0x483a4a,
'mov_rax_rax_ret': 0x5b145e,
'jmp_rax': 0x41908f,
'jmp_rbx': 0x449ba3,
'offset_data': 0x00,
'offset_shellcode': 0x30,
'offset_stack': 0x340,
'offset_stack_page_aligned': 0x0cc8,
'offset_bind_mprotect': 0x5ea0,
'got_bind': 0x658028,
},
}
@staticmethod
def version_tuple(v):
return tuple(map(int, (v.split("."))))
class PayloadType(enum.IntEnum):
NONE = 0
TRANSFORM = 3
SECURITY_ASSOCIATION = 33
KEY_EXCHANGE = 34
IDENTIFIER_INITIATOR = 35
IDENTIFIER_RESPONDER = 36
CERTIFICATE = 37
CERTIFICATE_REQUEST = 38
AUTHENTICATION = 39
NONCE = 40
NOTIFY = 41
DELETE = 42
VENDOR_ID = 43
TRAFFIC_SELECTOR_INITIATOR = 44
TRAFFIC_SELECTOR_RESPONDER = 45
ENCRYPTED = 46
CONFIGURATION = 47
EXTENSIBLE_AUTHENTICATION = 48
class ExchangeType(enum.IntEnum):
IKE_SA_INIT = 34
IKE_AUTH = 35
CREATE_CHILD_SA = 36
INFORMATIONAL = 37
class TransformType(enum.IntEnum):
ENCRYPTION_ALGORITHM = 1
PSEUDO_RANDOM_FUNCTION = 2
INTEGRITY_ALGORITHM = 3
DIFFIE_HELLMAN_GROUP = 4
class NotifyType(enum.IntEnum):
UNSUPPORTED_CRITICAL_PAYLOAD = 1
INVALID_IKE_SPI = 4
INVALID_MAJOR_VERSION = 5
INVALID_SYNTAX = 7
INVALID_MESSAGE_ID = 9
INVALID_SPI = 11
NO_PROPOSAL_CHOSEN = 14
INVALID_KE_PAYLOAD = 17
AUTHENTICATION_FAILED = 24
SINGLE_PAIR_REQUIRED = 34
NO_ADDITIONAL_SAS = 35
INTERNAL_ADDRESS_FAILURE = 36
FAILED_CP_REQUIRED = 37
TS_UNACCEPTABLE = 38
INVALID_SELECTORS = 39
INITIAL_CONTACT = 16384
SET_WINDOW_SIZE = 16385
ADDITIONAL_TS_POSSIBLE = 16386
IPCOMP_SUPPORTED = 16387
NAT_DETECTION_SOURCE_IP = 16388
NAT_DETECTION_DESTINATION_IP = 16389
COOKIE = 16390
USE_TRANSPORT_MODE = 16391
HTTP_CERT_LOOKUP_SUPPORTED = 16392
REKEY_SA = 16393
ESP_TFC_PADDING_NOT_SUPPORTED = 16394
NON_FIRST_FRAGMENTS_ALSO = 16395
MOBIKE_SUPPORTED = 16396
MULTIPLE_AUTH_SUPPORTED = 16404
REDIRECT_SUPPORTED = 16406
IKEV2_FRAGMENTATION_SUPPORTED = 16430
SIGNATURE_HASH_ALGORITHMS = 16431
class EncryptionAlgorithm(enum.IntEnum):
ENCR_DES_IV64 = 1
ENCR_DES = 2
ENCR_3DES = 3
ENCR_RC5 = 4
ENCR_IDEA = 5
ENCR_CAST = 6
ENCR_BLOWFISH = 7
ENCR_3IDEA = 8
ENCR_DES_IV32 = 9
RESERVED = 10
ENCR_NULL = 11
ENCR_AES_CBC = 12
ENCR_AES_CTR = 13
ENCR_AES_CCM_8 = 14
ENCR_AES_CCM_12 = 15
ENCR_AES_CCM_16 = 16
ENCR_AES_GCM_8 = 18
ENCR_AES_GCM_12 = 19
ENCR_AES_GCM_16 = 20
ENCR_NULL_AUTH_AES_GMAC = 21
P1619_XTS_AES = 22
ENCR_CAMELLIA_CBC = 23
ENCR_CAMELLIA_CTR = 24
ENCR_CAMELLIA_CCM_8 = 25
ENCR_CAMELLIA_CCM_12 = 26
ENCR_CAMELLIA_CCM_16 = 27
ENCR_CHACHA20_POLY1305 = 28
ENCR_AES_CCM_8_IIV = 29
ENCR_AES_GCM_16_IIV = 30
ENCR_CHACHA20_POLY1305_IIV = 31
ENCR_KUZNYECHIK_MGM_KTREE = 32
ENCR_MAGMA_MGM_KTREE = 33
ENCR_KUZNYECHIK_MGM_MAC_KTREE = 34
ENCR_MAGMA_MGM_MAC_KTREE = 35
class PseudoRandomFunction(enum.IntEnum):
PRF_HMAC_MD5 = 1
PRF_HMAC_SHA1 = 2
PRF_HMAC_TIGER = 3
PRF_AES128_XCBC = 4
PRF_HMAC_SHA2_256 = 5
PRF_HMAC_SHA2_384 = 6
PRF_HMAC_SHA2_512 = 7
PRF_AES128_CMAC = 8
PRF_HMAC_STREEBOG_512 = 9
class IntegrityAlgorithm(enum.IntEnum):
AUTH_HMAC_MD5_96 = 1
AUTH_HMAC_SHA1_96 = 2
AUTH_DES_MAC = 3
AUTH_KPDK_MD5 = 4
AUTH_AES_XCBC_96 = 5
AUTH_HMAC_MD5_128 = 6
AUTH_HMAC_SHA1_160 = 7
AUTH_AES_CMAC_96 = 8
AUTH_AES_128_GMAC = 9
AUTH_AES_192_GMAC = 10
AUTH_AES_256_GMAC = 11
AUTH_HMAC_SHA2_256_128 = 12
AUTH_HMAC_SHA2_384_192 = 13
AUTH_HMAC_SHA2_512_256 = 14
class DiffieHellmanGroup(enum.IntEnum):
DH_GROUP_2048_BIT_MODP = 14
DH_GROUP_768_BIT_MODP = 1
DH_GROUP_1024_BIT_MODP = 2
DH_GROUP_1536_BIT_MODP = 5
DH_GROUP_3072_BIT_MODP = 15
DH_GROUP_4096_BIT_MODP = 16
DH_GROUP_6144_BIT_MODP = 17
DH_GROUP_8192_BIT_MODP = 18
RANDOM_ECP_GROUP_256_BIT = 19
RANDOM_ECP_GROUP_384_BIT = 20
RANDOM_ECP_GROUP_521_BIT = 21
DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP = 22
DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP = 23
DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP = 24
RANDOM_ECP_GROUP_192_BIT = 25
RANDOM_ECP_GROUP_224_BIT = 26
BRAINPOOLP224R1 = 27
BRAINPOOLP256R1 = 28
BRAINPOOLP384R1 = 29
BRAINPOOLP512R1 = 30
CURVE25519 = 31
CURVE448 = 32
GOST3410_2012_256 = 33
GOST3410_2012_512 = 34
ML_KEM_512 = 35
ML_KEM_768 = 36
ML_KEM_1024 = 37
class IkePacker:
@staticmethod
def header(initiator_spi, responder_spi, next_payload, exchange_type, message_id, version=0x20, flags = 0x08):
return struct.pack('>QQBBBBII', initiator_spi, responder_spi, next_payload, version, exchange_type, flags, message_id, 0)
@staticmethod
def payload_header(next_payload, payload, critical = 0):
return struct.pack('>BBH', next_payload, critical, 4 + len(payload)) + payload
@staticmethod
def security_association(next_payload, proposal):
return IkePacker.payload_header(next_payload, proposal)
@staticmethod
def proposal(next_payload, number, id, transforms = [], spi = b''):
b_transforms = b''
for transform in transforms:
if isinstance(transform, IkeTransform):
b_transforms += transform.pack()
continue
b_transforms += transform
return IkePacker.payload_header(next_payload, struct.pack('>BBBB', number, id, len(spi), len(transforms)) + spi + b_transforms)
@staticmethod
def key_exchange(next_payload, dh_group, data, reserved = 0):
return IkePacker.payload_header(next_payload, struct.pack('>HH', dh_group, reserved) + data)
@staticmethod
def nonce(next_payload, nonce):
return IkePacker.payload_header(next_payload, nonce)
@staticmethod
def notify(next_payload, notify_type, data = b'', protocol_id = 1, spi_size = 0):
return IkePacker.payload_header(next_payload, struct.pack('>BBH', protocol_id, spi_size, notify_type) + data)
@staticmethod
def vendor_id(next_payload, vendor_id):
return IkePacker.payload_header(next_payload, vendor_id)
class IkeTransform:
def __init__(self, next_payload, transform_type, transform_id, transform_attributes=None, reserved=0):
self.next_payload = next_payload
self.transform_type = transform_type
self.reserved = reserved
self.transform_id = transform_id
self.transform_attributes = transform_attributes
def pack(self):
transform = struct.pack('>BBH', self.transform_type, self.reserved, self.transform_id)
if self.transform_attributes:
transform += struct.pack('>I', self.transform_attributes)
return IkePacker.payload_header(self.next_payload, transform)
class IkeResponse:
def __init__(self, initiator_spi, responder_spi, version, exchange_type, flags, message_id, length, payloads):
self.initiator_spi = initiator_spi
self.responder_spi = responder_spi
self.version = version
self.exchange_type = exchange_type
self.flags = flags
self.message_id = message_id
self.length = length
self.payloads = payloads
def get(self, type):
payloads = []
for payload in self.payloads:
if payload['type'] == type:
payloads.append(payload['payload'])
return payloads
class IkeUnpacker:
@staticmethod
def unpack_payload_header(data):
return struct.unpack('>BBH', data[:4])
@staticmethod
def unpack(response):
initiator_spi, responder_spi, next_payload, version, exchange_type, flags, message_id, length = struct.unpack('>QQBBBBII', response[:28])
raw_payloads = response[28:]
i = 0
payload_type = next_payload
payloads = []
while i < len(raw_payloads):
next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(raw_payloads[i:i+4])
payload = raw_payloads[i+4:i+payload_length]
if payload_type == PayloadType.VENDOR_ID:
payload = payload
payloads.append({
'type': payload_type,
'critical': critical,
'payload': payload
})
i += payload_length
payload_type = next_payload
return IkeResponse(
initiator_spi,
responder_spi,
version,
exchange_type,
flags,
message_id,
length,
payloads
)
class IkeCrypto:
def __init__(self):
self.initiator_nonce = self.generate_nonce(32)
self.initiator_spi = random.randint(0, 0xffffffffffffffff)
self.responder_spi = 0x0000000000000000
self.client_private_key = None
self.client_public_key = None
self.client_public_key_bytes = None
self.generate_keys = self.dh_modp_14_generate_keys
self.generate_keys()
def generate_nonce(self, length=32):
return random.getrandbits(length * 8).to_bytes(length, 'big')
def dh_modp_14_generate_keys(self):
p_hex = 'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF'
self.p = int(p_hex, 16)
if self.p.bit_length() != 2048:
self.p = self.p >> (self.p.bit_length() - 2048)
g = 2
self.client_private_key = random.randint(1, self.p - 1)
self.client_public_key = pow(g, self.client_private_key, self.p)
public_key_bit_length = self.client_public_key.bit_length()
public_key_byte_length = (public_key_bit_length + 7) // 8
if public_key_byte_length > 256:
raise ValueError(f"Public key too large: {public_key_byte_length} bytes (max 256)")
self.client_public_key_bytes = self.client_public_key.to_bytes(public_key_byte_length, 'big').rjust(256, b'\x00')
class IkeException(Exception):
pass
class IkeNoProposalChosenException(IkeException):
pass
class IkeInvalidKEPayloadException(IkeException):
pass
class Ike:
def __init__(self, ip, port, timeout = 5):
self.ip = ip
self.port = port
self.timeout = timeout
self.crypto = None
self.sock = None
self.reset()
@staticmethod
def update_request_length(request):
total_length = len(request)
return request[:24] + struct.pack('>I', total_length) + request[28:]
def reset(self):
if self.sock != None:
self.close()
self.crypto = IkeCrypto()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(self.timeout)
def send(self, packet):
self.sock.sendto(packet, (self.ip, self.port))
response, addr = self.sock.recvfrom(65535)
return IkeUnpacker.unpack(response)
def close(self):
self.sock.close()
self.sock = None
def sa_init(self):
global FW_VERSION, BUILD_NUMBER
dh_group = DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value
key_exchange = self.crypto.client_public_key_bytes
transforms = [
IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, EncryptionAlgorithm.ENCR_AES_CBC.value, 0x800e0100),
IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, PseudoRandomFunction.PRF_HMAC_SHA2_256.value),
IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, IntegrityAlgorithm.AUTH_HMAC_SHA2_256_128.value),
IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value),
]
r = self.send(Ike.update_request_length(
IkePacker.header(self.crypto.initiator_spi, self.crypto.responder_spi, PayloadType.SECURITY_ASSOCIATION, ExchangeType.IKE_SA_INIT, 0) +
IkePacker.security_association(PayloadType.KEY_EXCHANGE, IkePacker.proposal(PayloadType.NONE, 1, 1, transforms)) +
IkePacker.key_exchange(PayloadType.NONCE, dh_group, key_exchange) +
IkePacker.nonce(PayloadType.NOTIFY, self.crypto.initiator_nonce) +
IkePacker.notify(PayloadType.NOTIFY, NotifyType.NAT_DETECTION_DESTINATION_IP, bytes.fromhex('a6358d813592fdd80a9aaa3390f39c8a5a76b6e4')) +
IkePacker.notify(PayloadType.VENDOR_ID, NotifyType.NAT_DETECTION_SOURCE_IP, bytes.fromhex('4cc324152ba3f68ef649ac1e6f96f33791611db2')) +
IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('c590254e5403cbb71f3d493111d7fcad')) +
IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('c61baca1f1a60cc10800000000000000')) +
IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('4048b7d56ebce88525e7de7f00d6c2d3c0000000')) +
IkePacker.vendor_id(PayloadType.NOTIFY, bytes.fromhex('4048b7d56ebce88525e7de7f00d6c2d3')) +
IkePacker.notify(PayloadType.NOTIFY, NotifyType.IKEV2_FRAGMENTATION_SUPPORTED, protocol_id = 0) +
IkePacker.notify(PayloadType.NOTIFY, NotifyType.REDIRECT_SUPPORTED, protocol_id = 0) +
IkePacker.notify(PayloadType.NONE, NotifyType.SIGNATURE_HASH_ALGORITHMS, bytes.fromhex('0001000200030004'), protocol_id = 0)
))
if FW_VERSION == None:
vendors = r.get(PayloadType.VENDOR_ID)
for vendor in vendors:
if len(vendor) > 32 and vendor[:8].hex() == 'bfc22e9856ba9936':
try:
watchguard_data = base64.b64decode(vendor[32:].decode('ascii')).decode()
match = re.search(r"VN=([0-9\.]+) BN=([0-9]+)", watchguard_data)
if match:
FW_VERSION = match.group(1)
BUILD_NUMBER = match.group(2)
if WatchGuardFw.version_tuple(FW_VERSION) < WatchGuardFw.version_tuple("12.11.4"):
return True, FW_VERSION, BUILD_NUMBER
else:
return False, FW_VERSION, BUILD_NUMBER
except (UnicodeDecodeError, binascii.Error):
continue
notifications = r.get(PayloadType.NOTIFY)
for notification in notifications:
if notification.message_type == NotifyType.NO_PROPOSAL_CHOSEN:
raise IkeNoProposalChosenException()
if notification.message_type == NotifyType.INVALID_KE_PAYLOAD:
raise IkeInvalidKEPayloadException()
return None, None, None
def test_connection(ip, port, timeout=20):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
sock.connect((ip, port))
sock.close()
return True
except (socket.timeout, socket.error, ConnectionRefusedError, OSError):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip, port))
sock.close()
return result == 0
except (socket.timeout, socket.error, ConnectionRefusedError, OSError):
return False
def check_watchguard_version(ip, port=500, timeout=10, connection_timeout=20):
global FW_VERSION, BUILD_NUMBER
FW_VERSION = None
BUILD_NUMBER = None
if not test_connection(ip, port, connection_timeout):
return False, None, None
ike = Ike(ip, port, timeout)
try:
is_vulnerable, fw_version, build_number = ike.sa_init()
return is_vulnerable, fw_version, build_number
except TimeoutError:
logging.error(f"[-] {ip}:{port} - IKEv2 timeout")
return False, None, None
except IkeNoProposalChosenException:
logging.warning(f"[!] {ip}:{port} - IKEv2 service found but no proposal chosen")
return False, None, None
except IkeInvalidKEPayloadException:
logging.warning(f"[!] {ip}:{port} - IKEv2 service found but invalid KE payload")
return False, None, None
except Exception as e:
logging.error(f"[-] {ip}:{port} - Error: {e}")
return False, None, None
finally:
ike.close()
def main():
host = os.getenv("Host")
port = os.getenv("Port")
# Set default port to 500 if Port environment variable is not set
if port is None or port == "":
port = "500"
try:
is_vulnerable, fw_version, build_number = check_watchguard_version(host, int(port))
if is_vulnerable is True:
print(f"Vulnerable to Version {fw_version} (Build {build_number})")
elif is_vulnerable is False:
print(f"NOT VULN to Version {fw_version} (Build {build_number})")
except Exception as e:
logging.error(f"[-] {host}:{port or 500} - Error: {e}")
logging.info(f"[?] {host}:{port or 500} - ERROR")
logging.info(f" Status: Error occurred during check")
if __name__ == "__main__":
main()
matchers:
- type: word
words:
- "Vulnerable"
extractors:
- type: dsl
dsl:
- response
# digest: 4a0a00473045022100e816304f342aa2499c55337f12dd1d27a71f70984fee248694b8c541db6072d802204a7380b536533cb68ecec2a2db03c8cb7440d5b3b1aa736cd74cfd61288753a6:922c64590222798bb761d5b6d8e72950