Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions scapy/sendrecv.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,8 +1296,10 @@ def _run(self,
"will be the one of the first socket")

close_pipe = None # type: Optional[ObjectPipe[None]]
if not nonblocking_socket:
# select is blocking: Add special control socket
if not nonblocking_socket or timeout is not None:
# Blocking select needs a control socket to wake up select().
# Nonblocking sockets with a timeout also need it so that stop()
# does not wait for the remaining timeout (#4890).
from scapy.automaton import ObjectPipe
close_pipe = ObjectPipe[None]("control_socket")
sniff_sockets[close_pipe] = "control_socket" # type: ignore
Expand All @@ -1309,7 +1311,6 @@ def stop_cb():
self.continue_sniff = False
self.stop_cb = stop_cb
else:
# select is non blocking
def stop_cb():
# type: () -> None
self.continue_sniff = False
Expand Down
46 changes: 24 additions & 22 deletions test/answering_machines.uts
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@

= Generic answering machine mocker
from unittest import mock
@mock.patch("scapy.ansmachine.sniff")
def test_am(cls_name, packet_query, check_reply, mock_sniff, **kargs):
packet_query = packet_query.__class__(bytes(packet_query))
def sniff(*args,**kargs):
kargs["prn"](packet_query)
mock_sniff.side_effect = sniff
am = cls_name(**kargs)
called = [False]
def _sndrpl(x):
called[0] = True
check_reply(x.__class__(bytes(x)))
am.send_reply = _sndrpl
am()
assert called[0], "Filter never passed for AnsweringMachine !"

def test_am(cls_name, packet_query, check_reply, **kargs):
with mock.patch("scapy.ansmachine.sniff") as mock_sniff:
packet_query = packet_query.__class__(bytes(packet_query))
def sniff(*args,**kargs):
kargs["prn"](packet_query)
mock_sniff.side_effect = sniff
am = cls_name(**kargs)
called = [False]
def _sndrpl(x):
called[0] = True
check_reply(x.__class__(bytes(x)))
am.send_reply = _sndrpl
am()
assert called[0], "Filter never passed for AnsweringMachine !"


= BOOT_am
Expand Down Expand Up @@ -248,14 +249,15 @@ a.print_reply(req, res)

= WiFi_am
from unittest import mock
@mock.patch("scapy.layers.dot11.sniff")
def test_WiFi_am(packet_query, check_reply, mock_sniff, **kargs):
def sniff(*args,**kargs):
kargs["prn"](packet_query)
mock_sniff.side_effect = sniff
am = WiFi_am(**kargs)
am.send_reply = check_reply
am()

def test_WiFi_am(packet_query, check_reply, **kargs):
with mock.patch("scapy.layers.dot11.sniff") as mock_sniff:
def sniff(*args,**kargs):
kargs["prn"](packet_query)
mock_sniff.side_effect = sniff
am = WiFi_am(**kargs)
am.send_reply = check_reply
am()

def check_WiFi_am_reply(packet):
assert isinstance(packet, list) and len(packet) == 2
Expand Down
33 changes: 33 additions & 0 deletions test/regression.uts
Original file line number Diff line number Diff line change
Expand Up @@ -1779,6 +1779,39 @@ try:
except ValueError:
assert True

= AsyncSniffer early stop with timeout on nonblocking socket (#4890)

import time
from scapy.automaton import ObjectPipe, select_objects
from scapy.supersocket import SuperSocket

class NonblockingSelectSocket(SuperSocket):
nonblocking_socket = True
def __init__(self):
self.ins = ObjectPipe(name="dummy_sniff_socket")
self.outs = None
def recv(self, x=65535, **kwargs):
self.ins.recv()
return None
@staticmethod
def select(sockets, remain=None):
return select_objects(sockets, remain)
def close(self):
if self.closed:
return
self.closed = True
self.ins.close()

sock = NonblockingSelectSocket()
sniffer = AsyncSniffer(opened_socket=sock, timeout=10, count=0)
sniffer.start()
time.sleep(0.2)
sniffer.stop(join=False)
sniffer.thread.join(timeout=1)
assert not sniffer.running
assert not sniffer.thread.is_alive()
sock.close()

= Sending a TCP syn 'forever' at layer 2 and layer 3
~ netaccess needs_root IP
def _test():
Expand Down
Loading