aboutsummaryrefslogtreecommitdiffstats
path: root/tests/hwsim/test_ap_open.py
blob: 3907925352ea110fff62e93213c44fc273bbbd96 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
# Open mode AP tests
# Copyright (c) 2014, Qualcomm Atheros, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

from remotehost import remote_compatible
import logging
logger = logging.getLogger()
import struct
import subprocess
import time
import os

import hostapd
import hwsim_utils
from tshark import run_tshark
from utils import *
from wpasupplicant import WpaSupplicant
from test_ap_ht import set_world_reg

@remote_compatible
def test_ap_open(dev, apdev):
    """AP with open mode (no security) configuration"""
    _test_ap_open(dev, apdev)

def _test_ap_open(dev, apdev):
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bg_scan_period="0")
    ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No connection event received from hostapd")
    hwsim_utils.test_connectivity(dev[0], hapd)

    dev[0].request("DISCONNECT")
    ev = hapd.wait_event([ "AP-STA-DISCONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No disconnection event received from hostapd")

def test_ap_open_packet_loss(dev, apdev):
    """AP with open mode configuration and large packet loss"""
    params = { "ssid": "open",
               "ignore_probe_probability": "0.5",
               "ignore_auth_probability": "0.5",
               "ignore_assoc_probability": "0.5",
               "ignore_reassoc_probability": "0.5" }
    hapd = hostapd.add_ap(apdev[0], params)
    for i in range(0, 3):
        dev[i].connect("open", key_mgmt="NONE", scan_freq="2412",
                       wait_connect=False)
    for i in range(0, 3):
        dev[i].wait_connected(timeout=20)

@remote_compatible
def test_ap_open_unknown_action(dev, apdev):
    """AP with open mode configuration and unknown Action frame"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    bssid = apdev[0]['bssid']
    cmd = "MGMT_TX {} {} freq=2412 action=765432".format(bssid, bssid)
    if "FAIL" in dev[0].request(cmd):
        raise Exception("Could not send test Action frame")
    ev = dev[0].wait_event(["MGMT-TX-STATUS"], timeout=10)
    if ev is None:
        raise Exception("Timeout on MGMT-TX-STATUS")
    if "result=SUCCESS" not in ev:
        raise Exception("AP did not ack Action frame")

def test_ap_open_invalid_wmm_action(dev, apdev):
    """AP with open mode configuration and invalid WMM Action frame"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    bssid = apdev[0]['bssid']
    cmd = "MGMT_TX {} {} freq=2412 action=1100".format(bssid, bssid)
    if "FAIL" in dev[0].request(cmd):
        raise Exception("Could not send test Action frame")
    ev = dev[0].wait_event(["MGMT-TX-STATUS"], timeout=10)
    if ev is None or "result=SUCCESS" not in ev:
        raise Exception("AP did not ack Action frame")

@remote_compatible
def test_ap_open_reconnect_on_inactivity_disconnect(dev, apdev):
    """Reconnect to open mode AP after inactivity related disconnection"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    hapd.request("DEAUTHENTICATE " + dev[0].p2p_interface_addr() + " reason=4")
    dev[0].wait_disconnected(timeout=5)
    dev[0].wait_connected(timeout=2, error="Timeout on reconnection")

@remote_compatible
def test_ap_open_assoc_timeout(dev, apdev):
    """AP timing out association"""
    ssid = "test"
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].scan(freq="2412")
    hapd.set("ext_mgmt_frame_handling", "1")
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   wait_connect=False)
    for i in range(0, 10):
        req = hapd.mgmt_rx()
        if req is None:
            raise Exception("MGMT RX wait timed out")
        if req['subtype'] == 11:
            break
        req = None
    if not req:
        raise Exception("Authentication frame not received")

    resp = {}
    resp['fc'] = req['fc']
    resp['da'] = req['sa']
    resp['sa'] = req['da']
    resp['bssid'] = req['bssid']
    resp['payload'] = struct.pack('<HHH', 0, 2, 0)
    hapd.mgmt_tx(resp)

    assoc = 0
    for i in range(0, 10):
        req = hapd.mgmt_rx()
        if req is None:
            raise Exception("MGMT RX wait timed out")
        if req['subtype'] == 0:
            assoc += 1
            if assoc == 3:
                break
    if assoc != 3:
        raise Exception("Association Request frames not received: assoc=%d" % assoc)
    hapd.set("ext_mgmt_frame_handling", "0")
    dev[0].wait_connected(timeout=15)

@remote_compatible
def test_ap_open_id_str(dev, apdev):
    """AP with open mode and id_str"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412", id_str="foo",
                   wait_connect=False)
    ev = dev[0].wait_connected(timeout=10)
    if "id_str=foo" not in ev:
        raise Exception("CTRL-EVENT-CONNECT did not have matching id_str: " + ev)
    if dev[0].get_status_field("id_str") != "foo":
        raise Exception("id_str mismatch")

@remote_compatible
def test_ap_open_select_any(dev, apdev):
    """AP with open mode and select any network"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    id = dev[0].connect("unknown", key_mgmt="NONE", scan_freq="2412",
                        only_add_network=True)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   only_add_network=True)
    dev[0].select_network(id)
    ev = dev[0].wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND",
                            "CTRL-EVENT-CONNECTED"], timeout=10)
    if ev is None:
        raise Exception("No result reported")
    if "CTRL-EVENT-CONNECTED" in ev:
        raise Exception("Unexpected connection")

    dev[0].select_network("any")
    dev[0].wait_connected(timeout=10)

@remote_compatible
def test_ap_open_unexpected_assoc_event(dev, apdev):
    """AP with open mode and unexpected association event"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected(timeout=15)
    dev[0].dump_monitor()
    # This will be accepted due to matching network
    dev[0].cmd_execute(['iw', 'dev', dev[0].ifname, 'connect', 'open', "2412",
                        apdev[0]['bssid']])
    dev[0].wait_connected(timeout=15)
    dev[0].dump_monitor()

    dev[0].request("REMOVE_NETWORK all")
    dev[0].wait_disconnected(timeout=5)
    dev[0].dump_monitor()
    # This will result in disconnection due to no matching network
    dev[0].cmd_execute(['iw', 'dev', dev[0].ifname, 'connect', 'open', "2412",
                        apdev[0]['bssid']])
    dev[0].wait_disconnected(timeout=15)

def test_ap_open_external_assoc(dev, apdev):
    """AP with open mode and external association"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open-ext-assoc" })
    try:
        dev[0].request("STA_AUTOCONNECT 0")
        id = dev[0].connect("open-ext-assoc", key_mgmt="NONE", scan_freq="2412",
                            only_add_network=True)
        dev[0].request("ENABLE_NETWORK %s no-connect" % id)
        dev[0].dump_monitor()
        # This will be accepted due to matching network
        dev[0].cmd_execute(['iw', 'dev', dev[0].ifname, 'connect',
                            'open-ext-assoc', "2412", apdev[0]['bssid']])
        ev = dev[0].wait_event([ "CTRL-EVENT-DISCONNECTED",
                                 "CTRL-EVENT-CONNECTED" ], timeout=10)
        if ev is None:
            raise Exception("Connection timed out")
        if "CTRL-EVENT-DISCONNECTED" in ev:
            raise Exception("Unexpected disconnection event")
        dev[0].dump_monitor()
        dev[0].request("DISCONNECT")
        dev[0].wait_disconnected(timeout=5)
    finally:
        dev[0].request("STA_AUTOCONNECT 1")

@remote_compatible
def test_ap_bss_load(dev, apdev):
    """AP with open mode (no security) configuration"""
    hapd = hostapd.add_ap(apdev[0],
                          { "ssid": "open",
                            "bss_load_update_period": "10",
                            "chan_util_avg_period": "20" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    # this does not really get much useful output with mac80211_hwsim currently,
    # but run through the channel survey update couple of times
    for i in range(0, 10):
        hwsim_utils.test_connectivity(dev[0], hapd)
        hwsim_utils.test_connectivity(dev[0], hapd)
        hwsim_utils.test_connectivity(dev[0], hapd)
        time.sleep(0.15)
    avg = hapd.get_status_field("chan_util_avg")
    if avg is None:
        raise Exception("No STATUS chan_util_avg seen")

def test_ap_bss_load_fail(dev, apdev):
    """BSS Load update failing to get survey data"""
    hapd = hostapd.add_ap(apdev[0],
                          { "ssid": "open",
                            "bss_load_update_period": "1" })
    with fail_test(hapd, 1, "wpa_driver_nl80211_get_survey"):
        wait_fail_trigger(hapd, "GET_FAIL")

def hapd_out_of_mem(hapd, apdev, count, func):
    with alloc_fail(hapd, count, func):
        started = False
        try:
            hostapd.add_ap(apdev, { "ssid": "open" })
            started = True
        except:
            pass
        if started:
            raise Exception("hostapd interface started even with memory allocation failure: %d:%s" % (count, func))

def test_ap_open_out_of_memory(dev, apdev):
    """hostapd failing to setup interface due to allocation failure"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    hapd_out_of_mem(hapd, apdev[1], 1, "hostapd_alloc_bss_data")

    for i in range(1, 3):
        hapd_out_of_mem(hapd, apdev[1], i, "hostapd_iface_alloc")

    for i in range(1, 5):
        hapd_out_of_mem(hapd, apdev[1], i, "hostapd_config_defaults;hostapd_config_alloc")

    hapd_out_of_mem(hapd, apdev[1], 1, "hostapd_config_alloc")

    hapd_out_of_mem(hapd, apdev[1], 1, "hostapd_driver_init")

    for i in range(1, 3):
        hapd_out_of_mem(hapd, apdev[1], i, "=wpa_driver_nl80211_drv_init")

    # eloop_register_read_sock() call from i802_init()
    hapd_out_of_mem(hapd, apdev[1], 1, "eloop_sock_table_add_sock;?eloop_register_sock;?eloop_register_read_sock;=i802_init")

    # verify that a new interface can still be added when memory allocation does
    # not fail
    hostapd.add_ap(apdev[1], { "ssid": "open" })

def test_bssid_black_white_list(dev, apdev):
    """BSSID black/white list"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    hapd2 = hostapd.add_ap(apdev[1], { "ssid": "open" })

    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_whitelist=apdev[1]['bssid'])
    dev[1].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_blacklist=apdev[1]['bssid'])
    dev[2].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_whitelist="00:00:00:00:00:00/00:00:00:00:00:00",
                   bssid_blacklist=apdev[1]['bssid'])
    if dev[0].get_status_field('bssid') != apdev[1]['bssid']:
        raise Exception("dev[0] connected to unexpected AP")
    if dev[1].get_status_field('bssid') != apdev[0]['bssid']:
        raise Exception("dev[1] connected to unexpected AP")
    if dev[2].get_status_field('bssid') != apdev[0]['bssid']:
        raise Exception("dev[2] connected to unexpected AP")
    dev[0].request("REMOVE_NETWORK all")
    dev[1].request("REMOVE_NETWORK all")
    dev[2].request("REMOVE_NETWORK all")

    dev[2].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_whitelist="00:00:00:00:00:00", wait_connect=False)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_whitelist="11:22:33:44:55:66/ff:00:00:00:00:00 " + apdev[1]['bssid'] + " aa:bb:cc:dd:ee:ff")
    dev[1].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_blacklist="11:22:33:44:55:66/ff:00:00:00:00:00 " + apdev[1]['bssid'] + " aa:bb:cc:dd:ee:ff")
    if dev[0].get_status_field('bssid') != apdev[1]['bssid']:
        raise Exception("dev[0] connected to unexpected AP")
    if dev[1].get_status_field('bssid') != apdev[0]['bssid']:
        raise Exception("dev[1] connected to unexpected AP")
    dev[0].request("REMOVE_NETWORK all")
    dev[1].request("REMOVE_NETWORK all")
    ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=0.1)
    if ev is not None:
        raise Exception("Unexpected dev[2] connectin")
    dev[2].request("REMOVE_NETWORK all")

def test_ap_open_wpas_in_bridge(dev, apdev):
    """Open mode AP and wpas interface in a bridge"""
    br_ifname='sta-br0'
    ifname='wlan5'
    try:
        _test_ap_open_wpas_in_bridge(dev, apdev)
    finally:
        subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'down'])
        subprocess.call(['brctl', 'delif', br_ifname, ifname])
        subprocess.call(['brctl', 'delbr', br_ifname])
        subprocess.call(['iw', ifname, 'set', '4addr', 'off'])

def _test_ap_open_wpas_in_bridge(dev, apdev):
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })

    br_ifname='sta-br0'
    ifname='wlan5'
    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    # First, try a failure case of adding an interface
    try:
        wpas.interface_add(ifname, br_ifname=br_ifname)
        raise Exception("Interface addition succeeded unexpectedly")
    except Exception as e:
        if "Failed to add" in str(e):
            logger.info("Ignore expected interface_add failure due to missing bridge interface: " + str(e))
        else:
            raise

    # Next, add the bridge interface and add the interface again
    subprocess.call(['brctl', 'addbr', br_ifname])
    subprocess.call(['brctl', 'setfd', br_ifname, '0'])
    subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'up'])
    subprocess.call(['iw', ifname, 'set', '4addr', 'on'])
    subprocess.check_call(['brctl', 'addif', br_ifname, ifname])
    wpas.interface_add(ifname, br_ifname=br_ifname)

    wpas.connect("open", key_mgmt="NONE", scan_freq="2412")

@remote_compatible
def test_ap_open_start_disabled(dev, apdev):
    """AP with open mode and beaconing disabled"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open",
                                      "start_disabled": "1" })
    bssid = apdev[0]['bssid']

    dev[0].flush_scan_cache()
    dev[0].scan(freq=2412, only_new=True)
    if dev[0].get_bss(bssid) is not None:
        raise Exception("AP was seen beaconing")
    if "OK" not in hapd.request("RELOAD"):
        raise Exception("RELOAD failed")
    dev[0].scan_for_bss(bssid, freq=2412)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")

@remote_compatible
def test_ap_open_start_disabled2(dev, apdev):
    """AP with open mode and beaconing disabled (2)"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open",
                                      "start_disabled": "1" })
    bssid = apdev[0]['bssid']

    dev[0].flush_scan_cache()
    dev[0].scan(freq=2412, only_new=True)
    if dev[0].get_bss(bssid) is not None:
        raise Exception("AP was seen beaconing")
    if "OK" not in hapd.request("UPDATE_BEACON"):
        raise Exception("UPDATE_BEACON failed")
    dev[0].scan_for_bss(bssid, freq=2412)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    if "OK" not in hapd.request("UPDATE_BEACON"):
        raise Exception("UPDATE_BEACON failed")
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
    dev[0].request("RECONNECT")
    dev[0].wait_connected()

@remote_compatible
def test_ap_open_ifdown(dev, apdev):
    """AP with open mode and external ifconfig down"""
    params = { "ssid": "open",
               "ap_max_inactivity": "1" }
    hapd = hostapd.add_ap(apdev[0], params)
    bssid = apdev[0]['bssid']

    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
    hapd.cmd_execute(['ip', 'link', 'set', 'dev', apdev[0]['ifname'], 'down'])
    ev = hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=10)
    if ev is None:
        raise Exception("Timeout on AP-STA-DISCONNECTED (1)")
    ev = hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=5)
    if ev is None:
        raise Exception("Timeout on AP-STA-DISCONNECTED (2)")
    ev = hapd.wait_event(["INTERFACE-DISABLED"], timeout=5)
    if ev is None:
        raise Exception("No INTERFACE-DISABLED event")
    # The following wait tests beacon loss detection in mac80211 on dev0.
    # dev1 is used to test stopping of AP side functionality on client polling.
    dev[1].request("REMOVE_NETWORK all")
    hapd.cmd_execute(['ip', 'link', 'set', 'dev', apdev[0]['ifname'], 'up'])
    dev[0].wait_disconnected()
    dev[1].wait_disconnected()
    ev = hapd.wait_event(["INTERFACE-ENABLED"], timeout=10)
    if ev is None:
        raise Exception("No INTERFACE-ENABLED event")
    dev[0].wait_connected()
    hwsim_utils.test_connectivity(dev[0], hapd)

def test_ap_open_disconnect_in_ps(dev, apdev, params):
    """Disconnect with the client in PS to regression-test a kernel bug"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bg_scan_period="0")
    ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No connection event received from hostapd")

    time.sleep(0.2)
    # enable power save mode
    hwsim_utils.set_powersave(dev[0], hwsim_utils.PS_ENABLED)
    time.sleep(0.1)
    try:
        # inject some traffic
        sa = hapd.own_addr()
        da = dev[0].own_addr()
        hapd.request('DATA_TEST_CONFIG 1')
        hapd.request('DATA_TEST_TX {} {} 0'.format(da, sa))
        hapd.request('DATA_TEST_CONFIG 0')

        # let the AP send couple of Beacon frames
        time.sleep(0.3)

        # disconnect - with traffic pending - shouldn't cause kernel warnings
        dev[0].request("DISCONNECT")
    finally:
        hwsim_utils.set_powersave(dev[0], hwsim_utils.PS_DISABLED)

    time.sleep(0.2)
    out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                     "wlan_mgt.tim.partial_virtual_bitmap",
                     ["wlan_mgt.tim.partial_virtual_bitmap"])
    if out is not None:
        state = 0
        for l in out.splitlines():
            pvb = int(l, 16)
            if pvb > 0 and state == 0:
                state = 1
            elif pvb == 0 and state == 1:
                state = 2
        if state != 2:
            raise Exception("Didn't observe TIM bit getting set and unset (state=%d)" % state)

@remote_compatible
def test_ap_open_select_network(dev, apdev):
    """Open mode connection and SELECT_NETWORK to change network"""
    hapd1 = hostapd.add_ap(apdev[0], { "ssid": "open" })
    bssid1 = apdev[0]['bssid']
    hapd2 = hostapd.add_ap(apdev[1], { "ssid": "open2" })
    bssid2 = apdev[1]['bssid']

    id1 = dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                         only_add_network=True)
    id2 = dev[0].connect("open2", key_mgmt="NONE", scan_freq="2412")
    hwsim_utils.test_connectivity(dev[0], hapd2)

    dev[0].select_network(id1)
    dev[0].wait_connected()
    res = dev[0].request("BLACKLIST")
    if bssid1 in res or bssid2 in res:
        raise Exception("Unexpected blacklist entry")
    hwsim_utils.test_connectivity(dev[0], hapd1)

    dev[0].select_network(id2)
    dev[0].wait_connected()
    hwsim_utils.test_connectivity(dev[0], hapd2)
    res = dev[0].request("BLACKLIST")
    if bssid1 in res or bssid2 in res:
        raise Exception("Unexpected blacklist entry(2)")

@remote_compatible
def test_ap_open_disable_enable(dev, apdev):
    """AP with open mode getting disabled and re-enabled"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bg_scan_period="0")

    for i in range(2):
        hapd.request("DISABLE")
        dev[0].wait_disconnected()
        hapd.request("ENABLE")
        dev[0].wait_connected()
        hwsim_utils.test_connectivity(dev[0], hapd)

def sta_enable_disable(dev, bssid):
    dev.scan_for_bss(bssid, freq=2412)
    work_id = dev.request("RADIO_WORK add block-work")
    ev = dev.wait_event(["EXT-RADIO-WORK-START"])
    if ev is None:
        raise Exception("Timeout while waiting radio work to start")
    id = dev.connect("open", key_mgmt="NONE", scan_freq="2412",
                     only_add_network=True)
    dev.request("ENABLE_NETWORK %d" % id)
    if "connect@" not in dev.request("RADIO_WORK show"):
        raise Exception("connect radio work missing")
    dev.request("DISABLE_NETWORK %d" % id)
    dev.request("RADIO_WORK done " + work_id)

    ok = False
    for i in range(30):
        if "connect@" not in dev.request("RADIO_WORK show"):
            ok = True
            break
        time.sleep(0.1)
    if not ok:
        raise Exception("connect radio work not completed")
    ev = dev.wait_event(["CTRL-EVENT-CONNECTED"], timeout=0.1)
    if ev is not None:
        raise Exception("Unexpected connection")
    dev.request("DISCONNECT")

def test_ap_open_sta_enable_disable(dev, apdev):
    """AP with open mode and wpa_supplicant ENABLE/DISABLE_NETWORK"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    bssid = apdev[0]['bssid']

    sta_enable_disable(dev[0], bssid)

    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    wpas.interface_add("wlan5", drv_params="force_connect_cmd=1")
    sta_enable_disable(wpas, bssid)

@remote_compatible
def test_ap_open_select_twice(dev, apdev):
    """AP with open mode and select network twice"""
    id = dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                        only_add_network=True)
    dev[0].select_network(id)
    ev = dev[0].wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND"], timeout=10)
    if ev is None:
        raise Exception("No result reported")
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    # Verify that the second SELECT_NETWORK starts a new scan immediately by
    # waiting less than the default scan period.
    dev[0].select_network(id)
    dev[0].wait_connected(timeout=3)

@remote_compatible
def test_ap_open_reassoc_not_found(dev, apdev):
    """AP with open mode and REASSOCIATE not finding a match"""
    id = dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                        only_add_network=True)
    dev[0].select_network(id)
    ev = dev[0].wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND"], timeout=10)
    if ev is None:
        raise Exception("No result reported")
    dev[0].request("DISCONNECT")

    time.sleep(0.1)
    dev[0].dump_monitor()

    dev[0].request("REASSOCIATE")
    ev = dev[0].wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND"], timeout=10)
    if ev is None:
        raise Exception("No result reported")
    dev[0].request("DISCONNECT")

@remote_compatible
def test_ap_open_sta_statistics(dev, apdev):
    """AP with open mode and STA statistics"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    addr = dev[0].own_addr()

    stats1 = hapd.get_sta(addr)
    logger.info("stats1: " + str(stats1))
    time.sleep(0.4)
    stats2 = hapd.get_sta(addr)
    logger.info("stats2: " + str(stats2))
    hwsim_utils.test_connectivity(dev[0], hapd)
    stats3 = hapd.get_sta(addr)
    logger.info("stats3: " + str(stats3))

    # Cannot require specific inactive_msec changes without getting rid of all
    # unrelated traffic, so for now, just print out the results in the log for
    # manual checks.

@remote_compatible
def test_ap_open_poll_sta(dev, apdev):
    """AP with open mode and STA poll"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    addr = dev[0].own_addr()

    if "OK" not in hapd.request("POLL_STA " + addr):
        raise Exception("POLL_STA failed")
    ev = hapd.wait_event(["AP-STA-POLL-OK"], timeout=5)
    if ev is None:
        raise Exception("Poll response not seen")
    if addr not in ev:
        raise Exception("Unexpected poll response: " + ev)

def test_ap_open_pmf_default(dev, apdev):
    """AP with open mode (no security) configuration and pmf=2"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[1].connect("open", key_mgmt="NONE", scan_freq="2412",
                   ieee80211w="2", wait_connect=False)
    dev[2].connect("open", key_mgmt="NONE", scan_freq="2412",
                   ieee80211w="1")
    try:
        dev[0].request("SET pmf 2")
        dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")

        dev[0].request("DISCONNECT")
        dev[0].wait_disconnected()
    finally:
        dev[0].request("SET pmf 0")
    dev[2].request("DISCONNECT")
    dev[2].wait_disconnected()

    ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=0.1)
    if ev is not None:
        raise Exception("Unexpected dev[1] connection")
    dev[1].request("DISCONNECT")

def test_ap_open_drv_fail(dev, apdev):
    """AP with open mode and driver operations failing"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })

    with fail_test(dev[0], 1, "wpa_driver_nl80211_authenticate"):
        dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                       wait_connect=False)
        wait_fail_trigger(dev[0], "GET_FAIL")
        dev[0].request("REMOVE_NETWORK all")

    with fail_test(dev[0], 1, "wpa_driver_nl80211_associate"):
        dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                       wait_connect=False)
        wait_fail_trigger(dev[0], "GET_FAIL")
        dev[0].request("REMOVE_NETWORK all")

def run_multicast_to_unicast(dev, apdev, convert):
    params = { "ssid": "open" }
    params["multicast_to_unicast"] = "1" if convert else "0"
    hapd = hostapd.add_ap(apdev[0], params)
    dev[0].scan_for_bss(hapd.own_addr(), freq=2412)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No connection event received from hostapd")
    hwsim_utils.test_connectivity(dev[0], hapd, multicast_to_unicast=convert)
    dev[0].request("DISCONNECT")
    ev = hapd.wait_event([ "AP-STA-DISCONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No disconnection event received from hostapd")

def test_ap_open_multicast_to_unicast(dev, apdev):
    """Multicast-to-unicast conversion enabled"""
    run_multicast_to_unicast(dev, apdev, True)

def test_ap_open_multicast_to_unicast_disabled(dev, apdev):
    """Multicast-to-unicast conversion disabled"""
    run_multicast_to_unicast(dev, apdev, False)

def test_ap_open_drop_duplicate(dev, apdev, params):
    """AP dropping duplicate management frames"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open",
                                      "interworking": "1" })
    hapd.set("ext_mgmt_frame_handling", "1")
    bssid = hapd.own_addr().replace(':', '')
    addr = "020304050607"
    auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
    if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth):
        raise Exception("MGMT_RX_PROCESS failed")
    auth = "b0083a01" + bssid + addr + bssid + '1000000001000000'
    if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth):
        raise Exception("MGMT_RX_PROCESS failed")

    ies = "00046f70656e010802040b160c12182432043048606c2d1a3c101bffff0000000000000000000001000000000000000000007f0a04000a020140004000013b155151525354737475767778797a7b7c7d7e7f808182dd070050f202000100"
    assoc_req = "00003a01" + bssid + addr + bssid + "2000" + "21040500" + ies
    if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc_req):
        raise Exception("MGMT_RX_PROCESS failed")
    assoc_req = "00083a01" + bssid + addr + bssid + "2000" + "21040500" + ies
    if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc_req):
        raise Exception("MGMT_RX_PROCESS failed")
    reassoc_req = "20083a01" + bssid + addr + bssid + "2000" + "21040500" + ies
    if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % reassoc_req):
        raise Exception("MGMT_RX_PROCESS failed")
    if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % reassoc_req):
        raise Exception("MGMT_RX_PROCESS failed")

    action = "d0003a01" + bssid + addr + bssid + "1000" + "040a006c0200000600000102000101"
    if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % action):
        raise Exception("MGMT_RX_PROCESS failed")

    action = "d0083a01" + bssid + addr + bssid + "1000" + "040a006c0200000600000102000101"
    if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % action):
        raise Exception("MGMT_RX_PROCESS failed")

    out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                     "wlan.fc.type == 0", ["wlan.fc.subtype"])
    num_auth = 0
    num_assoc = 0
    num_reassoc = 0
    num_action = 0
    for subtype in out.splitlines():
        val = int(subtype)
        if val == 11:
            num_auth += 1
        elif val == 1:
            num_assoc += 1
        elif val == 3:
            num_reassoc += 1
        elif val == 13:
            num_action += 1
    if num_auth != 1:
        raise Exception("Unexpected number of Authentication frames: %d" % num_auth)
    if num_assoc != 1:
        raise Exception("Unexpected number of association frames: %d" % num_assoc)
    if num_reassoc != 1:
        raise Exception("Unexpected number of reassociation frames: %d" % num_reassoc)
    if num_action != 1:
        raise Exception("Unexpected number of Action frames: %d" % num_action)

def test_ap_open_select_network_freq(dev, apdev):
    """AP with open mode and use for SELECT_NETWORK freq parameter"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    id = dev[0].connect("open", key_mgmt="NONE", only_add_network=True)
    dev[0].select_network(id, freq=2412)
    start = os.times()[4]
    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], timeout=5)
    if ev is None:
        raise Exception("Scan not started")
    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], timeout=15)
    if ev is None:
        raise Exception("Scan not completed")
    end = os.times()[4]
    logger.info("Scan duration: {} seconds".format(end - start))
    if end - start > 3:
        raise Exception("Scan took unexpectedly long time")
    dev[0].wait_connected()

def test_ap_open_noncountry(dev, apdev):
    """AP with open mode and noncountry entity as Country String"""
    _test_ap_open_country(dev, apdev, "XX", "0x58")

def test_ap_open_country_table_e4(dev, apdev):
    """AP with open mode and Table E-4 Country String"""
    _test_ap_open_country(dev, apdev, "DE", "0x04")

def test_ap_open_country_indoor(dev, apdev):
    """AP with open mode and indoor country code"""
    _test_ap_open_country(dev, apdev, "DE", "0x49")

def test_ap_open_country_outdoor(dev, apdev):
    """AP with open mode and outdoor country code"""
    _test_ap_open_country(dev, apdev, "DE", "0x4f")

def _test_ap_open_country(dev, apdev, country_code, country3):
    try:
        hapd = None
        hapd = run_ap_open_country(dev, apdev, country_code, country3)
    finally:
        clear_regdom(hapd, dev)

def run_ap_open_country(dev, apdev, country_code, country3):
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open",
                                      "country_code": country_code,
                                      "country3": country3,
                                      "ieee80211d": "1" })
    dev[0].scan_for_bss(hapd.own_addr(), freq=2412)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    dev[0].wait_regdom(country_ie=True)
    return hapd

def test_ap_open_disable_select(dev, apdev):
    """DISABLE_NETWORK for connected AP followed by SELECT_NETWORK"""
    hapd1 = hostapd.add_ap(apdev[0], { "ssid": "open" })
    hapd2 = hostapd.add_ap(apdev[1], { "ssid": "open" })
    id = dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")

    dev[0].request("DISABLE_NETWORK %d" % id)
    dev[0].wait_disconnected()
    res = dev[0].request("BLACKLIST")
    if hapd1.own_addr() in res or hapd2.own_addr() in res:
        raise Exception("Unexpected blacklist entry added")
    dev[0].request("SELECT_NETWORK %d" % id)
    dev[0].wait_connected()

def test_ap_open_reassoc_same(dev, apdev):
    """AP with open mode and STA reassociating back to same AP without auth exchange"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    try:
        dev[0].request("SET reassoc_same_bss_optim 1")
        dev[0].request("REATTACH")
        dev[0].wait_connected()
        hwsim_utils.test_connectivity(dev[0], hapd)
    finally:
        dev[0].request("SET reassoc_same_bss_optim 0")

def test_ap_open_no_reflection(dev, apdev):
    """AP with open mode, STA sending packets to itself"""
    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")

    ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No connection event received from hostapd")
    # test normal connectivity is OK
    hwsim_utils.test_connectivity(dev[0], hapd)

    # test that we can't talk to ourselves
    addr = dev[0].own_addr()
    res = dev[0].request('DATA_TEST_CONFIG 1')
    try:
        assert 'OK' in res

        cmd = "DATA_TEST_TX {} {} {}".format(addr, addr, 0)
        dev[0].request(cmd)

        ev = dev[0].wait_event(["DATA-TEST-RX"], timeout=1)

        if ev is not None and "DATA-TEST-RX {} {}".format(addr, addr) in ev:
            raise Exception("STA can unexpectedly talk to itself")
    finally:
        dev[0].request('DATA_TEST_CONFIG 0')