Commit 5f5e7e2c authored by StevenWdV's avatar StevenWdV

Made it much faster by using settimeout instead of poll. Made it easier for...

Made it much faster by using settimeout instead of poll. Made it easier for connect_client to acquire the lock. Fixed bug: the socket was closed before the thread was done. Made an attempt to make it work on Windows.
parent 79a82f19
......@@ -12,13 +12,13 @@ import btcp
logging.basicConfig(stream=sys.stderr, level=logging.ERROR)
parser = argparse.ArgumentParser()
parser.add_argument("-w", "--window", help="Define bTCP window size", type=int, default=70)
parser.add_argument("-w", "--window", help="Define bTCP window size", type=int, default=80)
parser.add_argument("-t", "--timeout", help="Define bTCP timeout in milliseconds", type=int, default=100)
parser.add_argument("-i", "--input", help="File to send", default="tmp.file")
args = parser.parse_args()
binding = btcp.Binding(socket.AF_INET, None, args.window, args.timeout)
connection = binding.connect_client(0, 0, ("", 9001))
connection = binding.connect_client(0, 0, ("localhost", 9001))
file = open(args.input, "r+b")
data = file.read()
......
......@@ -11,14 +11,13 @@ import btcp
logging.basicConfig(stream=sys.stderr, level=logging.ERROR)
# Handle arguments
parser = argparse.ArgumentParser()
parser.add_argument("-w", "--window", help="Define bTCP window size", type=int, default=70)
parser.add_argument("-w", "--window", help="Define bTCP window size", type=int, default=80)
parser.add_argument("-t", "--timeout", help="Define bTCP timeout in milliseconds", type=int, default=100)
parser.add_argument("-o", "--output", help="Where to store file", default="tmp.file")
args = parser.parse_args()
binding = btcp.Binding(socket.AF_INET, ("", 9001), args.window, args.timeout)
binding = btcp.Binding(socket.AF_INET, ("localhost", 9001), args.window, args.timeout)
server = binding.bind_server(0)
server.start_listen(1)
......
......@@ -5,7 +5,6 @@ import bisect
import copy
import logging
import random
import select
import socket
import struct
import threading
......@@ -616,30 +615,46 @@ class Binding:
self.sockets: Dict[int, Union[_Server, _Connection]] = {} # Local port -> _Server / _Connection
self.stop = False
self.sockets_stop_lock = threading.Lock()
self.read_thread = threading.Thread(None, self.__background)
self.read_thread.start()
self.stop_lock = threading.Lock()
self.sockets_lock = threading.Lock()
self.deb = f"{self.local_udp_addr}: "
self.background_thread = threading.Thread(None, self.__background)
self.background_thread.start()
logging.debug(self.deb + "Set up binding")
def __background(self) -> None:
poller = select.poll() # Does not work on Windows because Python is stupid (WSAPoll is a thing)
poller.register(self.sock, select.POLLIN)
while True:
poll_result = poller.poll(self.poll_time_ms)
packet_batches: Dict[int, List[_AddrPacket]] = {}
self.sockets_stop_lock.acquire()
self.stop_lock.acquire()
if self.stop:
self.stop_lock.release()
break
self.stop_lock.release()
while len(poll_result) > 0:
data, addr = self.sock.recvfrom(header_size + payload_size, socket.MSG_WAITALL)
packet_batches: Dict[int, List[_AddrPacket]] = {}
self.sock.settimeout(self.poll_time_ms / 1000)
no_more_data = False
while not no_more_data:
try:
try:
data, addr = self.sock.recvfrom(header_size + payload_size, socket.MSG_WAITALL)
break
except (socket.timeout, BlockingIOError):
no_more_data = True
break
except OSError as err:
# WSAEOPNOTSUPP; see https://stackoverflow.com/q/34242622/4454665
# (not that this script seems to work on Windows anyway; no idea why it doesn't)
if err.errno != 10045:
raise
self.sockets_lock.acquire()
while not no_more_data:
# noinspection PyUnboundLocalVariable
logging.debug(self.deb + f"Packet from {addr}")
# noinspection PyUnboundLocalVariable
packet = _AddrPacket(_Header.unpack(data[:header_size]),
data[header_size:], addr)
if packet.header.data_length > payload_size:
......@@ -654,7 +669,21 @@ class Binding:
else:
logging.warning(self.deb + "Unknown server")
poll_result = poller.poll(0)
self.sock.settimeout(0)
while not no_more_data:
try:
try:
data, addr = self.sock.recvfrom(header_size + payload_size, socket.MSG_WAITALL)
break
except (socket.timeout, BlockingIOError):
no_more_data = True
break
except OSError as err:
# WSAEOPNOTSUPP; see https://stackoverflow.com/q/34242622/4454665
# (not that this script seems to work on Windows anyway; no idea why it doesn't)
if err.errno != 10045:
raise
for local_port, packets in packet_batches.items():
# noinspection PyProtectedMember
......@@ -664,9 +693,7 @@ class Binding:
# noinspection PyProtectedMember
server._poll_streams()
self.sockets_stop_lock.release()
self.sockets_stop_lock.release()
self.sockets_lock.release()
def _send(self, packet: _Packet, remote_udp_addr: Any) -> None:
logging.debug(self.deb + f"Send to {remote_udp_addr}")
......@@ -676,24 +703,24 @@ class Binding:
def _remove_socket(self, local_port: int) -> None:
logging.debug(self.deb + f"Remove socket {local_port}")
self.sockets_stop_lock.acquire()
self.sockets_lock.acquire()
del self.sockets[local_port]
self.sockets_stop_lock.release()
self.sockets_lock.release()
def bind_server(self, local_btcp_port: int) -> _Server:
logging.debug(self.deb + f"Bind server to btcp {local_btcp_port}")
self.sockets_stop_lock.acquire()
self.sockets_lock.acquire()
server = self.sockets[local_btcp_port] = _Server(self, local_btcp_port)
self.sockets_stop_lock.release()
self.sockets_lock.release()
return server
def connect_client(self, local_btcp_port: int, remote_btcp_port: int, remote_udp_addr: Any) -> _Connection:
logging.debug(self.deb + f"Connect to btcp {local_btcp_port} -> {remote_btcp_port} {remote_udp_addr}")
self.sockets_stop_lock.acquire()
self.sockets_lock.acquire()
connection = _Connection(self, _Stream(self, local_btcp_port, remote_btcp_port, remote_udp_addr,
self.window_size, self.timeout_sec))
self.sockets[local_btcp_port] = connection
self.sockets_stop_lock.release()
self.sockets_lock.release()
return connection
def close(self) -> None:
......@@ -704,11 +731,11 @@ class Binding:
for key in self.sockets.keys():
break
self.sockets[key].close()
self.sock.close()
self.sockets_stop_lock.acquire()
self.stop_lock.acquire()
self.stop = True
self.sockets_stop_lock.release()
self.stop_lock.release()
self.read_thread.join()
self.background_thread.join()
self.sock.close()
logging.debug(self.deb + "Closed")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment