2020-07-19 21:27:14 +00:00
|
|
|
import os
|
2020-01-21 06:35:58 +00:00
|
|
|
import socket
|
|
|
|
import struct
|
|
|
|
import time
|
|
|
|
import zlib
|
|
|
|
|
|
|
|
size_pack = struct.Struct('!I')
|
|
|
|
|
|
|
|
|
|
|
|
def open_connection():
|
|
|
|
sock = socket.create_connection((host, port))
|
|
|
|
return sock
|
|
|
|
|
|
|
|
|
|
|
|
def zlibify(data):
|
|
|
|
data = zlib.compress(data.encode('utf-8'))
|
|
|
|
return size_pack.pack(len(data)) + data
|
|
|
|
|
|
|
|
|
|
|
|
def dezlibify(data, skip_head=True):
|
|
|
|
if skip_head:
|
|
|
|
data = data[size_pack.size:]
|
|
|
|
return zlib.decompress(data).decode('utf-8')
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
global host, port
|
|
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument('-l', '--host', default='localhost')
|
|
|
|
parser.add_argument('-p', '--port', default=9999, type=int)
|
|
|
|
args = parser.parse_args()
|
|
|
|
host, port = args.host, args.port
|
|
|
|
|
|
|
|
print('Opening idle connection:', end=' ')
|
|
|
|
s1 = open_connection()
|
|
|
|
print('Success')
|
|
|
|
print('Opening hello world connection:', end=' ')
|
|
|
|
s2 = open_connection()
|
|
|
|
print('Success')
|
|
|
|
print('Sending Hello, World!', end=' ')
|
|
|
|
s2.sendall(zlibify('Hello, World!'))
|
|
|
|
print('Success')
|
|
|
|
print('Testing blank connection:', end=' ')
|
|
|
|
s3 = open_connection()
|
|
|
|
s3.close()
|
|
|
|
print('Success')
|
|
|
|
result = dezlibify(s2.recv(1024))
|
|
|
|
assert result == 'Hello, World!'
|
|
|
|
print(result)
|
|
|
|
s2.close()
|
|
|
|
print('Large random data test:', end=' ')
|
|
|
|
s4 = open_connection()
|
2020-07-19 21:27:14 +00:00
|
|
|
data = os.urandom(1000000).decode('iso-8859-1')
|
2020-01-21 06:35:58 +00:00
|
|
|
print('Generated', end=' ')
|
|
|
|
s4.sendall(zlibify(data))
|
|
|
|
print('Sent', end=' ')
|
2020-07-19 21:27:14 +00:00
|
|
|
result = b''
|
2020-01-21 06:35:58 +00:00
|
|
|
while len(result) < size_pack.size:
|
|
|
|
result += s4.recv(1024)
|
|
|
|
size = size_pack.unpack(result[:size_pack.size])[0]
|
|
|
|
result = result[size_pack.size:]
|
|
|
|
while len(result) < size:
|
|
|
|
result += s4.recv(1024)
|
|
|
|
print('Received', end=' ')
|
|
|
|
assert dezlibify(result, False) == data
|
|
|
|
print('Success')
|
|
|
|
s4.close()
|
|
|
|
print('Test malformed connection:', end=' ')
|
|
|
|
s5 = open_connection()
|
2020-07-19 21:27:14 +00:00
|
|
|
s5.sendall(data[:100000].encode('utf-8'))
|
2020-01-21 06:35:58 +00:00
|
|
|
s5.close()
|
|
|
|
print('Success')
|
|
|
|
print('Waiting for timeout to close idle connection:', end=' ')
|
|
|
|
time.sleep(6)
|
|
|
|
print('Done')
|
|
|
|
s1.close()
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
main()
|