41 lines
1.4 KiB
Python
41 lines
1.4 KiB
Python
import socket
|
|
import argparse
|
|
import time
|
|
import sys
|
|
from loguru import logger
|
|
|
|
def main():
    """Listen for UDP packets on a port until an idle timeout elapses.

    Command-line arguments:
        --port:    UDP port to bind on all interfaces (0-65535).
        --timeout: Idle period in seconds; must be positive. If no packet
                   arrives within this period the receiver shuts down.

    Each received packet increments a counter that is logged at debug
    level together with the sender address; the payload is discarded.
    The loop ends on idle timeout or Ctrl-C, and the socket is always
    closed before returning.

    Raises:
        SystemExit: via argparse when arguments are missing or invalid.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="UDP Packet Receiver with Timeout")
    parser.add_argument('--port', type=int, required=True, help="Port to listen for UDP packets.")
    parser.add_argument('--timeout', type=int, required=True, help="Timeout in seconds to shut down if no packets are received.")
    args = parser.parse_args()

    # Reject values the socket layer would choke on. A timeout of 0 would
    # switch the socket to non-blocking mode (recvfrom then raises
    # BlockingIOError rather than socket.timeout), and a negative timeout
    # or an out-of-range port raises from settimeout()/bind().
    if not 0 <= args.port <= 65535:
        parser.error("--port must be in the range 0-65535.")
    if args.timeout <= 0:
        parser.error("--timeout must be a positive number of seconds.")

    # Create a UDP socket; everything after creation runs under the
    # finally clause so the socket is closed even if bind() fails.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("", args.port))
        print(f"Listening for UDP packets on port {args.port}...")

        # Set the timeout for the socket
        sock.settimeout(args.timeout)

        count = 0
        while True:
            try:
                # Wait for a UDP packet; only the sender address is used,
                # so the 1024-byte buffer size does not affect the count.
                _, addr = sock.recvfrom(1024)
            except socket.timeout:
                print(f"No packets received for {args.timeout} seconds. Shutting down.")
                break
            count += 1
            logger.debug(f"Received packet {count} from {addr}")
    except KeyboardInterrupt:
        print("\nShutting down due to user interrupt.")
    finally:
        sock.close()
        print("Socket closed.")
|
|
|
|
# Run the receiver only when executed as a script, not when imported.
if __name__ == "__main__":
    main()