中山大学 数据科学与计算机学院 软件工程(计算机应用方向) 16340132 梁颖霖
- Please choose one of following programing languages: C, C++, Java, Python;
- 本项目采用的是python3.6
- LFTP should use a client-server service model;
- 本项目使用客户端-服务器的模式
- LFTP must include a client side program and a server side program; Client side program can not only send a large file to the server but also download a file from the server.
Sending file should use the following format:
- LFTP lsend myserver mylargefile Getting file should use the following format
- LFTP lget myserver mylargefile The parameter myserver can be a url address or an IP address.
- 本项目,客户端不仅可以向服务器上传大文件,也可以从服务器下载大文件
- LFTP should use UDP as the transport layer protocol.
- 本项目利用UDP来作为传输层协议
- LFTP must realize 100% reliability as TCP;
- 本项目实现类似TCP的100%可靠性,处理了丢包,超时,数据包顺序不一致等问题
- LFTP must implement flow control function similar as TCP;
- 本项目实现了类似TCP的流控制,在接收方维护一个rwnd接收窗口
- LFTP must implement congestion control function similar as TCP;
- 本项目实现了类似TCP的阻塞控制,在发送方维护一个cwnd阻塞窗口
- LFTP server side must be able to support multiple clients at the same time;
- 本项目支持多个用户同时向服务器收发文件,使用了多线程的机制
- LFTP should provide meaningful debug information when programs are
executed.
- 本项目提供了有意义的debug消息来显示发送情况,包括丢包,阻塞等事件的处理
基于UDP的传输过程如下图所示
基于UDP来实现类似TCP的大文件传输
我的代码构建过程
- 利用socket实现简单字符串传送
- 对于文件进行处理,将1的代码改造成文件的传送
- 对于文件进行分段,打包发送
- 发送过程使用多线程
- 服务器与客户端连接的建立与断开,三方握手,四次挥手
- 处理命令行输入命令,与服务器进行交互
- 添加ACK反馈包,建立判断重复ACK机制
- 丢包事件的判断与处理
- 接收方的序列号是否正确,要求重发
- 接收方的流控制,构建接收窗口
- 发送方的阻塞控制
这里的数据包与反馈包结构我采用的是struct结构体,其中一个数据包所带有文件数据为1024bytes,而头文件包括序列号,确认号,文件结束标志,为24bytes。
反馈包包括ack确认,rwnd接收窗口的大小
# 传送一个包的结构,包含序列号,确认号,文件结束标志,数据包
packet_struct = struct.Struct('III1024s')
# 接收后返回的信息结构,包括ACK确认,rwnd
feedback_struct = struct.Struct('II')
BUF_SIZE = 1024+24
FILE_SIZE = 1024
服务器先建立一个主的线程来监听接收客户端到来的命令,将命令接收后传送到server_thread来处理,传递的参数包括客户端的命令参数以及客户端的地址。
···
IP = '192.168.88.129'
SERVER_PORT = 7777
···
def main():
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# 绑定端口:
s.bind((IP, SERVER_PORT))
while True:
data,client_addr = s.recvfrom(BUF_SIZE)
# 多线程处理
my_thread = threading.Thread(target=server_thread,args=(client_addr,data))
my_thread.start()
该函数要为每一个客户端新建一个socket,并将命令解析得到客户端所需要的操作
def server_thread(client_addr,string):
# 处理传输过来的str,得到文件名,命令
order = ''
try:
order = string.decode('utf-8').split(',')[0]
file_name = string.decode('utf-8').split(',')[1]
except Exception as e:
return
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
···
除此以外,还需要像TCP建立连接一样来进行三方握手,确认连接才开始发送
客户端先申请连接
# 三方握手
# 发送请求建立连接
s.sendto(data,server_addr)
# 接收连接允许
print(data.decode('utf-8'))
data,server_addr = s.recvfrom(BUF_SIZE)
print('来自服务器', server_addr, '的数据是: ', data.decode('utf-8'))
服务器确认连接
# 文件存在,返回确认信号
s.sendto('连接就绪'.encode('utf-8'),client_addr)
data,client_addr = s.recvfrom(BUF_SIZE)
print('来自',client_addr,'的数据是:',data.decode('utf-8'))
客户端再发送确认包,这时连接就建立完毕
# 第三次握手,确认后就开始接收
data='ACK'.encode('utf-8')
s.sendto(data,server_addr)
下面是四次挥手的过程:
当服务器执行完客户端的命令后,要像tcp一样来执行断开连接操作,断开连接后可以把socket释放掉,即调用close函数
服务器:
print('\n开始中断连接')
# 中断连接,四次挥手
data,server_addr = s.recvfrom(BUF_SIZE)
print(data.decode('utf-8'))
data = 'Server allows disconnection'
s.sendto(data.encode('utf-8'),client_addr)
print(data)
data = 'Server requests disconnection'
s.sendto(data.encode('utf-8'),client_addr)
print(data)
data,server_addr = s.recvfrom(BUF_SIZE)
print(data.decode('utf-8'))
print('The connection between client and server has been interrupted')
s.close()
客户端:
# 中断连接,四次挥手
data = 'Client requests disconnection'
print(data)
s.sendto(data.encode('utf-8'),server_addr)
data,client_addr = s.recvfrom(BUF_SIZE)
print(data.decode('utf-8'))
data,client_addr = s.recvfrom(BUF_SIZE)
print(data.decode('utf-8'))
data = 'Client allows disconnection'
s.sendto(data.encode('utf-8'),server_addr)
print(data)
print('The connection between client and server has been interrupted')
s.close()
说完建立连接与断开连接,回到多线程处理函数,对于lsend,lget函数的判断
当命令是lget时,要判断服务器是否存在该文件,如果不存在则直接返回信息后,关闭socket
若存在该文件直接进入lget函数来进行处理
当命令是lsend时,直接进入lsend函数来进行处理
if order == 'lget':
# 处理文件不存在的情况
if os.path.exists(file_name) is False:
data = 'FileNotFound'.encode('utf-8')
s.sendto(data, client_addr)
# 关闭socket
s.close()
return
# 文件存在,返回确认信号
s.sendto('连接就绪'.encode('utf-8'),client_addr)
data,client_addr = s.recvfrom(BUF_SIZE)
print('来自',client_addr,'的数据是:',data.decode('utf-8'))
lget(s,client_addr,file_name)
elif order == 'lsend':
s.sendto('是否可以连接'.encode('utf-8'),client_addr)
# 等待确认
data,client_addr = s.recvfrom(BUF_SIZE)
print('来自',client_addr,'的数据是:',data.decode('utf-8'))
lsend(s,client_addr,file_name)
lget函数分别在客户端与服务器都各有一个,服务器负责发送功能,客户端负责接收功能。
lsend也类似,只是客户端与服务器的角色对调而已,这里只叙述其中一个函数。
对于数据data,先判断它是否已经到达了文件尾,即data为空。
- 若不为空则发送该data信息,并与ack,seq,end等信息一起打包发送到客户端的地址
- 若为空则发送end=1的数据包,告诉客户端文件已经全部发送完毕
if str(data) != "b''":
end = 0
s.sendto(packet_struct.pack(*(seq,ack,end,data)),client_addr)
else:
end = 1
packet_count+=1
data = 'end'.encode('utf-8')
s.sendto(packet_struct.pack(*(seq,ack,end,data)),client_addr)
break
获取反馈包并unpack来得到接收方的ack,这个ack来确认客户端是否已经成功接收,是否需要重新发送或者序号是否正确等,rwnd则是用来用来处理流控制。
最后显示一些debug信息到控制台,可以查看文件传输的过程
# 发送成功,等待ack
packeted_data,client_addr = s.recvfrom(BUF_SIZE)
unpacked_data = feedback_struct.unpack(packeted_data)
rwnd = unpacked_data[1]
ack = unpacked_data[0]
print('接受自',client_addr,'收到数据为:','rwnd = ', rwnd,
'ack = ', ack,'发送方的数据:cwnd = ', cwnd)
发送方的流控制是要根据接收方返回的rwnd来处理的。
- 若rwnd不为0,证明可以继续正常传输
- 若rwnd为0,证明接收方已经不能处理这么多的数据包,应该停止发送,而是传送一个1字节的包去确认rwnd的变化
# 判断rwnd是否为0
rwnd_zero_flag = False
# 判断是否需要重传
retransmit_flag = False
# 下一个需要传的包的序号seq
current_packet = 1
# 接收窗口未满,正常发送
if rwnd_zero_flag == False:
···
# 接收窗口满了,发确认rwnd的包
else:
# 不需要重传
if retransmit_flag == False:
seq = 0
end = 0
data = 'rwnd'.encode('utf-8')
# 需要重传
else:
ack -= 1
seq -= 1
packet_count -= 1
data = packet_buffer[0]
del packet_buffer[0]
# 暂存下要传输的包,用于重传机制
packet_buffer.append(data)
current_packet = seq
s.sendto(packet_struct.pack(*(seq,ack,end,data)),client_addr)
接收方维护一个rwnd接收窗口,我初始化为50,List是用来缓存接收过来的数据,后面再进行处理,避免接收的延迟导致包的经常丢失。
- 当发现rwnd大于0,则可以继续接收数据包,
- 否则就发送rwnd已经满了的信息返回给服务器。
当接收到服务器的确认rwnd包,客户端会跳过处理这个包,而是告诉服务器rwnd的大小,让发送方继续进行发送操作
# 接收窗口rwnd,rwnd = RcvBuffer - [LastByteRcvd - LastßyteRead]
rwnd = 50
# 空列表用于暂时保存数据包
List = []
# 先将数据加入列表,后面再读取出来
if rwnd > 0:
# 服务端为确认rwnd的变化,会继续发送字节为1的包,这里我设置seq为-1代表服务端的确认
# 此时直接跳过处理这个包,返回rwnd的大小
if unpacked_data[0] == 0:
s.sendto(feedback_struct.pack(*(unpacked_data[0],rwnd)), server_addr)
continue
# 要求序号要连续,否则将该包直接丢弃,等待下一个序号包的到来
if unpacked_data[1] != packet_count-1:
print('客户端接收第',unpacked_data[0],'个包的序号不正确,要求服务器重发')
# 反馈上一个包的ack
s.sendto(feedback_struct.pack(*(unpacked_data[1]-1,rwnd)), server_addr)
continue
# 序号一致,加进缓存
List.append(unpacked_data)
rwnd -= 1
# 接收完毕,发送ACK反馈包
s.sendto(feedback_struct.pack(*(unpacked_data[0],rwnd)), server_addr)
else:
s.sendto(feedback_struct.pack(*(unpacked_data[0],rwnd)), server_addr)
print('客户端已接收第',unpacked_data[0],'个包','rwnd为',rwnd)
关于将缓存中数据写入文件,我采用的是一批批的写入,将缓存中的数据包一次性写入文件中。
# 随机将数据包写入文件,即存在某一时刻不写入,继续接收
random_write = random.randint(1,10)
random_num = random.randint(1,100)
# 40%机率写入文件,读入文件数也是随机数
if random_write > 4:
while len(List) > random_num:
unpacked_data = List[0]
seq = unpacked_data[0]
ack = unpacked_data[1]
end = unpacked_data[2]
data = unpacked_data[3]
del List[0]
rwnd += 1
if end != 1:
f.write(data)
else:
break
这样的随机写入,会出现在所有包接收完毕后,缓存中仍剩下部分数据未写入,所以在接收完毕后要对缓存的剩余做处理,直到最后一个包都写入文件中。
# 处理剩下在List中的数据包
while len(List) > 0:
unpacked_data = List[0]
end = unpacked_data[2]
data = unpacked_data[3]
del List[0]
rwnd += 1
if end != 1:
f.write(data)
else:
break
print('文件接收完成,一共接收了'+str(packet_count),'个包')
这里,我为了模拟丢包,所以在测试的时候设置了随机丢包事件,或者也可以设置一个定时器来,当超过一定的时间后,接收方就认为丢包,没有接收到下一个数据包。
这时,接收方的操作为发送上一个ACK包,重复的ACK包就是在告诉发送方接收不到新的数据包,要求重新发送。
# 设置随机丢包,并通知服务器要求重发
random_drop = random.randint(1,100)
if random_drop <= 5:
print('客户端已丢失第',unpacked_data[0],'个包,要求服务器重发')
# 反馈上一个包的ack
s.sendto(feedback_struct.pack(*(unpacked_data[1]-1,rwnd)), server_addr)
continue
packet_count是接收方期望收到的数据包序号,而传送过来的数据包序号不符合的时候,直接与丢包的处理一致,反馈上一个包的ack,通知发送方需要重新发送该数据包。
# 要求序号要连续,否则将该包直接丢弃,等待下一个序号包的到来
if unpacked_data[1] != packet_count-1:
print('客户端接收第',unpacked_data[0],'个包的序号不正确,要求服务器重发')
# 反馈上一个包的ack
s.sendto(feedback_struct.pack(*(unpacked_data[1]-1,rwnd)), server_addr)
continue
设置一个flag来判断是否需要重传上一个数据包,而这个flag是根据收到重复的ACK包来决定的
# 判断是否需要重传
if ack != current_packet:
print('收到重复的ACK包: ack=',ack)
retransmit_flag = True
else:
retransmit_flag = False
关于重传的机制,我预留了一个buffer,储存上一次发送的数据包,这样当需要重传的时候,我只需要从packet_buffer中拿出来进行传送就行。
# 判断是否需要重传
retransmit_flag = False
# 下一个需要传的包的序号seq
current_packet = 1
# 添加BUFFER暂时存储上一个发送过的包,当丢包发生时执行重传操作
packet_buffer = ['hello']
根据retransmit_flag来判断重传,然后执行读文件还是读buffer。直到接收方成功接收到该数据包,否则会一直进入重传事件。
# 不需要重传
if retransmit_flag == False:
data = f.read(FILE_SIZE)
# 需要重传
else:
packet_count -= 1
print('需要重传的包序号为 seq = ',seq,'出现丢包事件,将cwnd调整为 cwnd = ',threshold)
data = packet_buffer[0]
del packet_buffer[0]
# 暂存下要传输的包,用于重传机制
packet_buffer.append(data)
current_packet = seq
关于阻塞控制部分,我仿照TCP的阻塞函数进行慢启动,快速恢复,线性增长,指数增长等操作。
初始化cwnd为1,阈值初始化为25,congestion_flag用于判断是否遇到阻塞
# 拥塞窗口cwnd,初始化为1,慢启动
cwnd = 1
# 空列表用于暂时保存数据包
List = []
# 拥塞窗口的阈值threshold,初始化为25
threshold = 25
# 判断是否遇到阻塞
congestion_flag = False
# 判断线性增长
linear_flag = False
当遇到阻塞的时候,发送方应该先暂时停止传送,然后更改cwnd与threshold。cwnd为之前的阈值,然后开始线性增长。而阈值变化为当前遭遇阻塞的cwnd的一半
# 线路阻塞,停止发送,线程先休息0.01秒,稍后再继续发送
if congestion_flag == True:
print('传输线路遇到阻塞,将cwnd快速恢复至', cwnd)
# cwnd等于之前的阈值,新阈值等于遭遇阻塞时cwnd的一半
temp = cwnd
cwnd = threshold
threshold = int(temp/2)+1
congestion_flag = True
time.sleep(0.01)
congestion_flag = False
continue
当cwnd小于阈值,慢启动,指数增加。当cwnd大于等于阈值时候,线性增长
# 阻塞控制
# cwnd小于阈值,慢启动,指数增加
if cwnd < threshold and linear_flag == False:
cwnd *= 2
# 否则,线性增加
else:
cwnd += 1
linear_flag = True
以上的模块设计不仅适用于lget函数的客户端与服务器,而且也适用与lsend的服务器与客户端。唯一的不同只是发送方与接收方的角色对调,且地址需要更改,其余操作大致相同,这里就不再重复说明相同的代码设计。
根据上述
- 分段发送
- ACK反馈包接收确认
- 丢包处理
- 序列号正确处理
- 流控制
- 阻塞控制
这几部分的模块就可以在udp的传输协议下执行稳定正确的大文件传输。
处理客户端的输入,我使用了python的正则匹配,获取必要的操作,地址,文件名。将这些信息在后面发送到已经启动了的服务器中,来让服务器响应我的操作。
输入前提示输入规则,若输入错误,也应该提示响应的信息,这是更加友好的界面设计
# 读取输入信息
op = input('Please enter your operation: LFTP [lsend | lget] myserver mylargefile\n')
# 正则匹配
pattern = re.compile(r'(LFTP) (lsend|lget) (\S+) (\S+)')
match =pattern.match(op)
if op:
op = match.group(2)
server_ip = match.group(3)
file_name = match.group(4)
else:
print('Wrong input!')
客户端ip地址为172.18.32.199
服务器ip地址为192.168.88.129
服务器先启动服务:
客户端输入发送命令:
客户端发送完成截图:
分析:文件发送完成,并统计大约发送了5455个包,大致符合文件的大小5424kb,其中包括一些重复的包重发,处理rwnd为0的事件,传输链路遇到阻塞等情况
发送完毕后,执行四次挥手来中断连接,此时客户端的命令就执行完毕。
服务器端的完成情况:
发送完毕后,可以在server.py的当前目录下找到传输过去的test.pdf,且文件内容正确,可正常打开,大小无误
查看服务器端的debug信息:
服务器与客户端发送一样,一共接收了5455个数据包。并且在传输完毕后执行四次挥手的断开连接操作。然后服务器完成该命令后,继续等待别的命令的到来。
对于客户端的debug信息分析:阻塞事件
**在第5423个数据包发送的时候,遇到了线路阻塞,于是执行阻塞控制,将cwnd快速恢复到上次的阈值,然后再线性增长,符合阻塞控制的函数 **
对于客户端的debug信息分析:丢包事件
当发送方收到了重复的ACK包的时候,就会触发数据包重新发送的事件,直到下一个ACK为新值。
该视频长度为3:14,大小约为57MB,能正常播放。
客户端命令:
执行结果:
分析:文件接收成功,一共接收了57622个包。接收完成后,四次挥手中断连接
文件大小正确,且能正确播放
服务器结果:
**客户端也是正常发送,一共发57622个数据包,数量正确。 **
这个测试是用于验证是否可以并发执行,服务器同时处理多个请求命令并正确运行。
服务端文件初始目录:
客户端文件初始目录:
打开三个命令行端口,分别输入命令来测试上传与获取:
测试结果:
并发执行成功,服务器支持多个客户端同时传输或获取文件。
执行后,文件目录截图:
文件成功获取,客户端与服务器都可以正确打开文件,且文件大小一致
本测试是最终测试,测试是否可以进行大文件的传输。
传输过程约用时 30min
客户端命令截图:
客户端执行结果:一共发送1061920个数据包
服务器执行结果: