源码如下,不做过多讲解:
服务器:
# -*- coding:utf-8 -*- """ 程序由 BY我叫以赏 And Pikachu 制作 程序名称:Python TCP服务器传输文件 (转载注明出处!商用必须由本人许可!) Email:422880152@qq.com """ # 导入所需要的模块 import socket, configparser, os, time, threading # 初始化程序 addr, port = '', 0 UserList = [] Server_run = False # 服务器是否启动 Server = socket.socket() # 初始化服务器类 WorkDir = os.getcwd() # 获取运行目录 ConfigFile = WorkDir + '\\FileSendConfig.ini' # 配置文件路径 logfile = WorkDir + '\\FileSendConfig.log' # 日志文件路径 config = configparser.ConfigParser() # 初始化配置文件读写类 # 初始化日志文件 if os.path.exists(logfile): fopen_log = open(logfile, 'a+', encoding='utf-8') # 打开文件 else: fopen_log = open(logfile, 'w+', encoding='utf-8') # 打开文件 def read_config(): """ 此函数用于读写配置 """ global addr # 声明为全局变量 global port # 声明为全局变量 global fopen_log # 声明为全局变量 try: if os.path.exists(ConfigFile): config.read(ConfigFile) # 将配置文件内容读到内存中 addr = config.get('basic', 'addr') # 读入地址 port = int(config.get('basic', 'port')) # 读入端口 else: config.add_section('basic') # 在内存中增加一个类 config.set('basic', 'addr', '0.0.0.0') # 把addr项放到这个类中 config.set('basic', 'port', '23456') # 把port项放到这个类中 input('您仿佛是第一次使用本软件!我们需要进行一些配置,请回车进入下一步![Please Enter]') # 提示用户 addr = input('请输入服务器绑定的地址(默认绑定为 0.0.0.0,注意:服务器地址会影响服务器正常启动):') # 引导输入服务器地址 if addr.strip() == '': # strip()即是删除文本首尾空 addr = '0.0.0.0' # 默认绑定地址 print('设置:addr(地址) => ' + addr) while True: port = input('请输入服务器的端口(默认设置为 23456,注意:服务器端口影响客户端访问的地址):') # 引导用户输入端口 if not port: port = '23456' # 默认输入 try: if port.isdigit() and len(port) <= 6: # 是否输入整数 port = int(port) # 尝试将端口转化为整数型 else: raise BaseException('输入不正确') # 实现错误 except: print('输入不正确!请输入整数型的端口,最大长度为6位!') continue # 无法转化 即就是输入非数字 else: break # 输入正确跳出循环 print('设置:port(端口) => ' + str(port)) with open(ConfigFile, 'w') as fopen: # 初始写入没有文件所以使用 w config.write(fopen) # 将内存中的数据写出 except BaseException as error: print('执行读写配置项时出现错误!错误原因:' + str(error)) # 报错提示 return False else: print('配置完成!正在执行主程序中......') # 配置完成提示 return True def get_time(): """ 此函数用于获取时间 """ now = time.localtime() # 获取当前时间快照 return '%d/%d/%d %d:%d:%d' % (now.tm_year, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec) # 将快照格式化 def log(Text): """ 写日志 """ global fopen_log # 声明为全局变量 try: fopen_log.write('[' + get_time() + '] ' + Text + '\n') # 写日志 fopen_log.flush() # 实时输出 except: pass # 错误代码段 def logprint(Text): log(Text) # 写日志 print(Text) # 输出 def del_user(userip): """ 删除在用户列表里的数据 """ global UserList # 声明为全局变量 for x in range(len(UserList)): if UserList[x][1] == userip: UserList.pop(x) return def accept_Client(): global UserList, Server # 声明为全局变量 Fragment_Size = 1024*1024*10 # 分片大小 10 MB传输 Fopen = None Fopen_size = 0 try: user, ip = Server.accept() # 接待客户 这段代码会占线 except: return UserList.append((user, ip)) # 把用户添加到用户列表 log('用户上线获得的用户信息: IP为%s 端口为%d' % (ip[0], ip[1])) # 写日志 run_Thread = threading.Thread(target=accept_Client) # 在启动一个接待线程 run_Thread.start() # 启动接待线程 log('新的接待线程已启动 线程ID:' + str(run_Thread.ident)) # 写日志 try: while True: data = user.recv(1024 * 1024 * 100) # 接收用户数据 if not data: # 没有发送用户数据 即为用户下线 del_user(ip) # 将用户列表删除用户 log('用户下线!用户IP为%s 端口为%d' % (ip[0], ip[1])) # 用户下线 break data_split = data.split('|Pikaqui|wojiaoyishang|'.encode('utf-8')) # 分割客户端发送的数据 if len(data_split) < 2: # 分割未达到要求 continue head = data_split[0].decode('utf-8') # 头部 body = data_split[1] # 身体 try: # 执行命令出错则继续 if head == 'dir': path = body.decode('utf-8') # 客户端将要获取的路径 path = {'\\': path[:len(path) - 1]}.get(path[:len(path) - 1], path) # 修正显示 判断结尾是否有“\”有则去掉 log('客户' + str(ip) + ': dir ' + path) file_list = [] # 初始化列表 for x in os.listdir(path): # 判断是否为文件夹 if os.path.isdir(path + '\\' + x): # 判断是否为文件夹 file_list.append((x, True, os.path.getsize(path + '\\' + x))) else: file_list.append((x, False, os.path.getsize(path + '\\' + x))) send(user, 'filelist', str(file_list).encode('utf-8')) # 发送 if head == 'size': path = body.decode('utf-8') # 客户端将要获取的文件路径 if os.path.exists(path) and os.path.isdir(path) == False: send(user, 'size', str(os.path.getsize(path)).encode('utf-8')) else: send(user, 'error', '文件不存在或者输入了目录名称!'.encode('utf-8')) if head == 'open': path = body.decode('utf-8') # 客户端将要打开的文件路径 try: Fopen = open(path, 'rb+') except BaseException as error: send(user, 'error', str(error).encode('utf-8')) else: send(user, 'open', 'File opened'.encode('utf-8')) if head == 'get': now = int(body.decode('utf-8')) Fopen.seek(now,0) send(user, 'get', Fopen.read(Fragment_Size)) except BaseException as error: send(user, 'error', str(error).encode('utf-8')) continue return except: if not ip == False: del_user(ip) # 将用户列表删除用户 log('用户下线!用户IP为%s 端口为%d' % (ip[0], ip[1])) # 用户下线 return def send(user, head, body): """ 给客户端发送数据 """ return user.send( (head + '|Pikaqui|wojiaoyishang|').encode('utf-8') + body) def open_Server(): global Server_run, Server try: Server.bind((addr, port)) # 将服务器绑定在本机指定地址与端口上 logprint('服务器已绑定在本机 %s:%d 上......' % (addr, port)) # 提示 如果不成功 则会跳到 except 语段 Server.listen() # 开启服务器监听 logprint('成功开始服务器监听......') # 提示 run_Thread = threading.Thread(target=accept_Client) # 初始化线程 run_Thread.start() # 启动线程 log('初始接待线程已启动 线程ID:' + str(run_Thread.ident)) # 写入认真但是不输出 Server_run = True except BaseException as error: logprint('在执行服务器启动的时候出现错误!错误原因:' + str(error)) # 错误输出 else: print('Server:服务器已开启') return def print_user(): global UserList if len(UserList) != 0: print('-' * 100) for x in range(len(UserList)): print('%d:[IP]%s [Port]%d' % (x + 1, UserList[x][1][0], UserList[x][1][1])) print('-' * 100) else: print('Server:没有用户连接!') return def print_log(): global logfile print('-' * 100) with open(logfile, 'r', encoding='utf-8') as temp_log: print(temp_log.read(os.path.getsize(logfile))) print('-' * 100) if __name__ == '__main__': print('※ 程序由 @我叫以赏 And Pikachu 制作 ※ TCP文件传输 ※ 更多信息请输入5查看 ※') log('程序开始运行,准备执行操作......') # 写初始日志 if read_config() == False: # 如果处理错误那么直接关闭程序 os.kill(os.getpid(), 1) # 杀死自身进程 第二个参数是退出代码 open_Server() while True: try: Enter_id = '' print( """ \r--------------------------------------- \r1.启动或开始服务器 | 2.查看连接用户列表 \r3.查看程序运行日志 | 4.更改服务器的设置 \r5.关于此服务器程序 | 6.关闭服务器并退出 \r--------------------------------------- """ ) while Enter_id.strip() == '': Enter_id = input('>>>') # 判断Enter_id 允许多样性判断 if Enter_id in ('1', '一', '壹', '启动或开始服务器', 'a', 'A'): Enter_id = input('确定要%s服务器吗?[y/n(other)]>>>' % ({True: '关闭', False: '启动'}.get(Server_run, False))) if Enter_id in ('y', 'Y', '1'): if Server_run: try: Server.close() except: pass Server_run = False print('Server:服务器已停止') log('停止服务器') else: Server = socket.socket() # 初始化服务器类 open_Server() elif Enter_id in ('2', '二', '贰', '查看连接用户列表', 'b', 'B'): print_user() elif Enter_id in ('3', '三', '叁', '查看程序运行日志', 'c', 'C'): print_log() elif Enter_id in ('4', '四', '肆', '更改服务器的设置', 'd', 'D'): # ----设置配置---- print('接下来,我们会进行更改您的配置项操作!为空则不修改,配置项修改后会在您下一次启动此程序时生效!') print('[basic]') new_addr = input('addr(服务器绑定地址) = ') if new_addr.strip() == '': new_addr = addr print('addr(服务器绑定地址)没有任何变化!') else: print('addr(服务器绑定地址) => ' + new_addr) new_port = input('port(服务器绑定端口) = ') if new_port.strip() == '': new_port = port print('port(服务器绑定端口)没有任何变化!') elif new_port.isdigit(): print('port(服务器绑定端口) => ' + new_port) else: new_port = port print('port设置未生效,可能是输入错误!') config.set('basic', 'addr', new_addr) # 把addr项放到这个类中 config.set('basic', 'port', new_port) # 把port项放到这个类中 with open(ConfigFile, 'r+') as fopen: # 写入已有文件所以使用 r+ config.write(fopen) # 将内存中的数据写出 elif Enter_id in ('5', '五', '伍', '关于此服务器程序', 'help', '?', 'e', 'E'): try: print(__import__('requests').get('https://lovepikachu.top/copyright.txt').content.decode( 'utf-8')) # 版权文字 except: print( '※ Python TCP服务器传输文件 --- 程序由 BY我叫以赏 And Pikachu 制作 ※ 未经允许禁止商用 ※ 个人网站:https://lovepikachu.top ※') else: print('※ Python TCP服务器传输文件 --- 程序由 BY我叫以赏 And Pikachu 制作 ※') elif Enter_id in ('6', '六', '陆', '关闭服务器并退出', 'f', 'F'): # 退出程序 try: Server.close() except: pass Server_run = False log('退出程序') os.kill(os.getpid(), 0) break else: print('*错误的输入,请输入阿拉伯数字,输入5查看使用教程!') except KeyboardInterrupt: print('\n※您结束了程序※') log('退出程序') os.kill(os.getpid(), 0) # 杀死自身进程 第二个参数是退出代码 except BaseException as error: pass
客户端:
# -*- coding:utf-8 -*- # 初始化 import socket, os WorkDir = os.getcwd() # 获取运行目录 Client_connect = False Client = socket.socket() # 初始化客户端类 Client.close() help_text = """ 命令 效果 help 显示帮助文字 exit 退出命令模式 dir [目录名] 查看服务器目录文件 download 进入下载模式 debug [命令] 测试命令返回服务器原数据 """ def rettextsize(b): """ 将字节容量单位处理加单位 如 1024 B ---> 1KB :param b: 输入没有标记的内容单位(以B为单位) :return: 返回处理好的容量加单位 """ kb = b / 1024 mb = kb / 1024 gb = mb / 1024 tb = gb / 1024 if int(tb) < 1024 and int(tb) > 0: return str(round(tb, 2)) + ' TB' elif int(gb) < 1024 and int(gb) > 0: return str(round(gb, 2)) + ' GB' elif int(mb) < 1024 and int(mb) > 0: return str(round(mb, 2)) + ' MB' elif int(kb) < 1024 and int(kb) > 0: return str(round(kb, 2)) + ' KB' else: return str(b) + ' B' return str(b) + ' B' def data_split(data): split = data.split('|Pikaqui|wojiaoyishang|'.encode('utf-8')) if len(split) == 2: return split[0].decode('utf-8'), split[1] else: return 'error' def send(head, body): Client.send((head + '|Pikaqui|wojiaoyishang|').encode('utf-8') + body) if __name__ == '__main__': print('※ 程序由 @我叫以赏 And Pikachu 制作 ※ TCP文件传输 ※ 更多信息请输入3查看 ※') while True: try: Enter_id = '' print( """ \r--------------------------------------- \r1.连接或断开服务器 | 2.让服务器执行命令 \r3.关于此客户端程序 | 4.断开服务器并退出 \r--------------------------------------- """ ) while Enter_id.strip() == '': Enter_id = input('>>>') if Enter_id in ('1', '一', '壹', '连接或断开服务器', 'a', 'A'): # -----连接----- if Client_connect == False: Client = socket.socket() # 初始化客户端类 addr = input('请输入服务器地址(addr) = ') port = input('请输入服务器端口(port) = ') if port.isdigit(): print('Client:尝试连接中......') try: Client.connect((addr, int(port))) print('Client:连接成功!已在服务器上线!') Client_connect = True except BaseException as error: print('Client:连接时出现了 %s 的错误!' % (error)) else: print('Client:连接出错!端口配置不正确!') else: Client_connect == False try: Client.close() except: pass print('Client:已断开连接!') elif Enter_id in ('2', '二', '贰', '让服务器执行命令', 'b', 'B') and Client_connect: # --- 执行命令 --- print('Client:已进入命令模式,输入help查看帮助,输入exit退出命令模式!') while True: cmd = '' Scmd = [] while cmd.strip() == '': cmd = input('command>>>') cmd = cmd.strip() Scmd = cmd.split(' ') if cmd == 'exit': print('※ 已退出命令模式 ※') break elif cmd == 'help': print(help_text) elif cmd == 'download': # ------- 下载文件 -------- ServerFlie = input('请输入远程服务器文件路径(完整路径):') SaveName = input('请输入本地保存名称:' + WorkDir + '\\') LocalFlie = WorkDir + '\\' + SaveName print('正在与服务器通讯并获取文件信息......') try: send('size', ServerFlie.encode('utf-8')) data = data_split(Client.recv(1024 * 1024 * 100)) FileSize = int(data[1].decode('utf-8')) if data[0] == 'error': raise BaseException(data[1].decode('utf-8')) else: print('正在打开服务器文件......') send('open', ServerFlie.encode('utf-8')) try: data = data_split(Client.recv(1024 * 1024 * 100)) if data[0] == 'error': raise BaseException('无法打开文件!') except: print('无法打开服务器文件!') else: print('开始下载文件.......') Completed_Size = 0 # 已完成大小 with open(LocalFlie, 'wb+') as Fopen: while Completed_Size < FileSize: send('get', str(Completed_Size).encode('utf-8')) data = data_split(Client.recv(1024 * 1024 * 100)) if data[0] == 'get': Fopen.write(data[1]) Fopen.flush() Completed_Size = Completed_Size + len(data[1]) print('\r下载进度:'+str(round(Completed_Size/FileSize*100,2))+'%',end="") print('\n') except BaseException as error: print('文件获取失败!返回的失败原因:' + str(error)) continue else: if len(Scmd) >= 2: # ------获取目录------ if Scmd[0].lower() == 'dir': temp_src = Scmd[1] for x in range(2, len(Scmd)): temp_src = temp_src + ' ' + Scmd[x] send('dir', temp_src.encode('utf-8')) try: data = data_split(Client.recv(1024 * 1024 * 100)) if data[0] == 'filelist': temp_dir = data[1].decode('utf-8') temp_dir = eval(temp_dir) print('查看目录下有%d个文件:' % (len(temp_dir))) print('%-5s%-20s%-50s' % ('类型', '大小', '名称')) for x in temp_dir: print('%-5s%-20s%-50s' % ( {True: '目录', False: '文件'}.get(x[1], '文件'), rettextsize(x[2]), x[0])) elif data[0] == 'error': print('Client服务器返回错误:' + data[1].decode('utf-8')) except: print('Client:获取失败!服务器返回了错误的数据!') elif Scmd[0].lower() == 'debug': temp_cmd = Scmd[2] for x in range(3, len(Scmd)): temp_cmd = temp_cmd + ' ' + Scmd[x] send(Scmd[1], temp_cmd.encode('utf-8')) try: data = data_split(Client.recv(1024 * 1024 * 100)) print('head:' + data[0] + '\n' 'body:' + str(data[1]) + '\n' 'body(文本):' + data[1].decode( 'utf-8')) except: print('Client:出现错误!') elif Enter_id in ('3', '三', '叁', '关于此客户端程序', 'c', 'C'): try: print(__import__('requests').get('https://lovepikachu.top/copyright.txt').content.decode( 'utf-8')) # 版权文字 except: print( '※ Python TCP服务器传输文件 --- 程序由 BY我叫以赏 And Pikachu 制作 ※ 未经允许禁止商用 ※ 个人网站:https://lovepikachu.top ※') else: print('※ Python TCP服务器传输文件 --- 程序由 BY我叫以赏 And Pikachu 制作 ※') elif Enter_id in ('4', '四', '肆', '断开服务器并退出', 'd', 'D'): os.kill(os.getpid(), 0) else: print('*输入错误或者还未连接服务器!') except KeyboardInterrupt: print('\n※您结束了程序※') os.kill(os.getpid(), 0) # 杀死自身进程 第二个参数是退出代码 break except BaseException as error: continue