"""Command-line smoke test for a server speaking a length-prefixed zlib protocol.

Each frame on the wire is a 4-byte big-endian unsigned length followed by that
many bytes of zlib-compressed UTF-8 text.  The script opens several
connections and checks that every frame it sends comes back with the same
decompressed payload.
"""
import os
import socket
import struct
import time
import zlib

# Frame header: payload length as a 4-byte unsigned integer in network order.
size_pack = struct.Struct('!I')

# Target server.  main() overwrites these from the command line; the defaults
# mirror the argparse defaults so open_connection() is usable before main().
host = 'localhost'
port = 9999

# How long the idle connection is left untouched before the script finishes.
# NOTE(review): presumably chosen to exceed the server's idle timeout - confirm.
IDLE_WAIT_SECONDS = 6


def open_connection():
    """Return a new TCP socket connected to the module-level ``(host, port)``."""
    return socket.create_connection((host, port))


def zlibify(data):
    """Encode text as one wire frame.

    Args:
        data: The text to send.

    Returns:
        The 4-byte length header followed by the zlib-compressed UTF-8
        encoding of ``data``.
    """
    payload = zlib.compress(data.encode('utf-8'))
    return size_pack.pack(len(payload)) + payload


def dezlibify(data, skip_head=True):
    """Decode a frame (or a bare compressed payload) back to text.

    Args:
        data: Frame bytes.
        skip_head: When true, ``data`` still starts with the length header,
            which is dropped before decompressing.  Pass ``False`` when the
            header has already been removed.

    Returns:
        The decompressed payload decoded as UTF-8.

    Raises:
        zlib.error: If the payload is not valid (or is truncated) zlib data.
    """
    if skip_head:
        data = data[size_pack.size:]
    return zlib.decompress(data).decode('utf-8')


def recv_exact(sock, count):
    """Read exactly ``count`` bytes from ``sock``.

    A single ``recv`` may return fewer bytes than requested, and returns
    nothing at all once the peer has closed the connection, so keep reading
    until the buffer is full and fail loudly on an early close instead of
    looping forever.

    Raises:
        ConnectionError: If the peer closes before ``count`` bytes arrive.
    """
    buffer = bytearray(count)
    view = memoryview(buffer)
    received = 0
    while received < count:
        # Writing straight into the preallocated buffer avoids re-copying
        # everything received so far on each chunk.
        chunk_len = sock.recv_into(view[received:])
        if not chunk_len:
            raise ConnectionError(
                'connection closed after %d of %d expected bytes'
                % (received, count)
            )
        received += chunk_len
    return bytes(buffer)


def recv_message(sock):
    """Read one complete frame from ``sock`` and return its decoded text.

    Raises:
        ConnectionError: If the peer closes in the middle of the frame.
        zlib.error: If the payload is not valid zlib data.
    """
    (size,) = size_pack.unpack(recv_exact(sock, size_pack.size))
    return dezlibify(recv_exact(sock, size), False)


def _progress(message):
    """Print a step label without a newline, flushed so it shows immediately."""
    print(message, end=' ', flush=True)


def _require_equal(actual, expected, what):
    """Raise AssertionError unless ``actual == expected``.

    An explicit raise is used rather than ``assert`` so the check still runs
    under ``python -O``.
    """
    if actual != expected:
        raise AssertionError('%s: reply does not match what was sent' % what)


def main():
    """Parse ``--host``/``--port`` and run the connection tests in order.

    Raises:
        AssertionError: If a reply does not match the data that was sent.
        OSError: If connecting, sending or receiving fails.
    """
    global host, port
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-l', '--host', default='localhost')
    parser.add_argument('-p', '--port', default=9999, type=int)
    args = parser.parse_args()
    host, port = args.host, args.port

    # The idle connection stays open, unused, for the whole run; the `with`
    # blocks guarantee every socket is closed even when a check fails.
    _progress('Opening idle connection:')
    with open_connection() as s1:  # noqa: F841 - held open on purpose
        print('Success')

        _progress('Opening hello world connection:')
        with open_connection() as s2:
            print('Success')

            _progress('Sending Hello, World!')
            s2.sendall(zlibify('Hello, World!'))
            print('Success')

            # Connect and hang up without sending anything while the hello
            # world reply is still unread.
            _progress('Testing blank connection:')
            open_connection().close()
            print('Success')

            result = recv_message(s2)
            _require_equal(result, 'Hello, World!', 'hello world test')
            print(result)

        _progress('Large random data test:')
        with open_connection() as s4:
            # iso-8859-1 maps every byte to a code point, so arbitrary random
            # bytes become a valid str that zlibify can encode.
            data = os.urandom(1000000).decode('iso-8859-1')
            _progress('Generated')
            s4.sendall(zlibify(data))
            _progress('Sent')
            result = recv_message(s4)
            _progress('Received')
            _require_equal(result, data, 'large random data test')
            print('Success')

        # Raw bytes with no valid frame header or zlib stream.
        _progress('Test malformed connection:')
        with open_connection() as s5:
            s5.sendall(data[:100000].encode('utf-8'))
        print('Success')

        _progress('Waiting for timeout to close idle connection:')
        time.sleep(IDLE_WAIT_SECONDS)
        print('Done')


if __name__ == '__main__':
    main()