aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tests/hwsim/test_dpp.py59
1 files changed, 59 insertions, 0 deletions
diff --git a/tests/hwsim/test_dpp.py b/tests/hwsim/test_dpp.py
index b696c5d..71df7fc 100644
--- a/tests/hwsim/test_dpp.py
+++ b/tests/hwsim/test_dpp.py
@@ -15,6 +15,10 @@ import socket
import struct
import subprocess
import time
+try:
+ from socketserver import StreamRequestHandler, TCPServer
+except ImportError:
+ from SocketServer import StreamRequestHandler, TCPServer
import hostapd
import hwsim_utils
@@ -5284,6 +5288,61 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False):
time.sleep(0.5)
wt.close()
+class MyTCPServer(TCPServer):
+ def __init__(self, addr, handler):
+ self.allow_reuse_address = True
+ TCPServer.__init__(self, addr, handler)
+
+class DPPControllerServer(StreamRequestHandler):
+ def handle(self):
+ data = self.rfile.read()
+ # Do not reply
+
+def test_dpp_relay_incomplete_connections(dev, apdev):
+ """DPP Relay and incomplete connections"""
+ check_dpp_capab(dev[0], min_ver=2)
+ check_dpp_capab(dev[1], min_ver=2)
+
+ id_c = dev[1].dpp_bootstrap_gen()
+ uri_c = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
+ res = dev[1].request("DPP_BOOTSTRAP_INFO %d" % id_c)
+ pkhash = None
+ for line in res.splitlines():
+ name, value = line.split('=')
+ if name == "pkhash":
+ pkhash = value
+ break
+ if not pkhash:
+ raise Exception("Could not fetch public key hash from Controller")
+
+ params = {"ssid": "unconfigured",
+ "channel": "6",
+ "dpp_controller": "ipaddr=127.0.0.1 pkhash=" + pkhash}
+ hapd = hostapd.add_ap(apdev[0], params)
+ check_dpp_capab(hapd)
+
+ server = MyTCPServer(("127.0.0.1", 8908), DPPControllerServer)
+ server.timeout = 30
+
+ hapd.set("ext_mgmt_frame_handling", "1")
+ dev[0].dpp_auth_init(uri=uri_c, role="enrollee")
+ msg = hapd.mgmt_rx()
+ if msg is None:
+ raise Exception("MGMT RX wait timed out")
+ dev[0].request("DPP_STOP_LISTEN")
+ frame = msg['frame']
+ for i in range(20):
+ if i == 14:
+ time.sleep(20)
+ addr = struct.pack('6B', 0x02, 0, 0, 0, 0, i)
+ tmp = frame[0:10] + addr + frame[16:]
+ hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(tmp).decode())
+ ev = hapd.wait_event(["DPP-FAIL"], timeout=0.1)
+ if ev:
+ raise Exception("DPP relay failed [%d]: %s" % (i + 1, ev))
+
+ server.server_close()
+
def test_dpp_tcp(dev, apdev, params):
"""DPP over TCP"""
prefix = "dpp_tcp"