工具介绍
通过python编写的TCP&UDP协议的客户端和服务端,支持IPV4和IPV6的网络环境,同时新增加客户端ip和端口绑定功能。
client客户端
# coding=utf-8
"""
@项目:djangoProject
@文件:TCP_client
@环境:PyCharm
@作者:Du
@时间:2022/12/6-16:16
"""
import socket
import time
class tuClient:
host1 = ''
port1 = ''
host = ''
port = ''
mode = ''
def __init__(self,mode):
self.m = mode
def tcpC4(self, host, port, host1, port1):
tcpT4Client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpT4Client.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
tcpT4Client.bind((host1, int(port1)))
tcpT4Client.connect((host, int(port)))
print("... TCP IPv4 连接成功...")
while True: # 判断是否退出
send_data = input("请输入要发送的内容:")
tcpT4Client.send(send_data.encode()) # 发送TCP数据
if send_data == "byebye":
break
info = tcpT4Client.recv(1024).decode()
if info == "byebye":
break
else:
print("通过TCP-IPV4,收到服务端",host,port,"的消息:", info)
tcpT4Client.close()
def udpC4(self, host, port, host1, port1):
udpT4Client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpT4Client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udpT4Client.bind((host1, int(port1)))
print("...UDP IPv4 连接成功...")
while True:
time.sleep(1)
udpT4Client.sendto("hello".encode(), (host, int(port)))
print("将 hello 发送到", host,port)
def tcpC6(self, host, port, host1, port1):
tcpT6Client = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
tcpT6Client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcpT6Client.bind((host1, int(port1)))
tcpT6Client.connect((host, int(port)))
print("... TCP IPv6 连接成功...")
while True: # 判断是否退出
send_data = input("请输入要发送的内容:")
tcpT6Client.send(send_data.encode()) # 发送TCP数据
if send_data == "byebye":
break
info = tcpT6Client.recv(1024).decode()
if info == "byebye":
break
else:
print("通过TCP-IPV6,收到服务端",host,port,"的消息:", info)
tcpT6Client.close()
def udpC6(self, host, port, host1, port1):
udpU6Client = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
udpU6Client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udpU6Client.bind((host1, int(port1)))
print("...UDP IPv6 连接成功...")
while True:
time.sleep(1)
udpU6Client.sendto("hello".encode(), (host, int(port)))
print("将 hello 发送到", host,port)
if __name__ == "__main__":
host1 = input("请输入本机客户端ip地址:")
port1 = input("请输入本机客户端端口号:")
host = input("请输入服务端ip地址:")
port = input("请输入服务端端口号:")
mode = input("请输入类型(t4/t6/u4/u6):")
x = tuClient(mode)
if x.m == 't4':
x.tcpC4(host, port, host1, port1)
elif x.m == 't6':
x.tcpC6(host, port, host1, port1)
elif x.m == 'u4':
x.udpC4(host, port, host1, port1)
else:
x.udpC6(host, port, host1, port1)
server服务端
# coding=utf-8
"""
@项目:djangoProject
@文件:TCP_server
@环境:PyCharm
@作者:Du
@时间:2022/12/6-16:17
"""
import socket
class tuServer():
host = ''
port = ''
mode = ''
def __init__(self,mode):
self.m = mode
def serverT4(self, host, port):
tcpT4Server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpT4Server.bind((host, int(port)))
print("***TCP IPV4 服务创建成功,等待客户端连接***")
tcpT4Server.listen(5)
clientSock, clientaddr = tcpT4Server.accept() # 被动接收TCP客户端连接
print("***客户端已经连接***")
while True: # 判断是否退出
info = clientSock.recv(1024).decode() # 接收客户端数据
if info == "byebye":
break
print("通过TCP-IPV4,收到客户端:", clientSock.getpeername(),"的消息:", info)
send_data = input("请输入要发送的内容:")
clientSock.send(send_data.encode()) # 发送TCP数据
if send_data == "byebye":
break
clientSock.close()
tcpT4Server.close()
def udpT4(self, host, port):
udpT4Server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpT4Server.bind((host, int(port)))
print("***UDP-IPv4服务启动***")
while True:
udpT4Data, udpT4ServerInfo = udpT4Server.recvfrom(1024)
print("收到来自:", udpT4ServerInfo, "的消息:", udpT4Data.decode())
def serverT6(self, host, port):
tcpT6Server = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
tcpT6Server.bind((host, int(port)))
print("***TCP IPV6 服务创建成功,等待客户端连接***")
tcpT6Server.listen(5)
clientSock, clientaddr = tcpT6Server.accept() # 被动接收TCP客户端连接
print("***客户端已经连接***")
while True: # 判断是否退出
info = clientSock.recv(1024).decode() # 接收客户端数据
if info == "byebye":
break
print("通过TCP-IPV6,收到客户端:", clientSock.getpeername(),"的消息:", info)
send_data = input("请输入要发送的内容:")
clientSock.send(send_data.encode()) # 发送TCP数据
if send_data == "byebye":
break
clientSock.close()
tcpT6Server.close()
def udpT6(self, host, port):
udpT6Server = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
udpT6Server.bind((host, int(port)))
print("***UDP-IPv6服务启动***")
while True:
udpT4Data, udpT6ServerInfo = udpT6Server.recvfrom(1024)
print("收到来自:", udpT6ServerInfo[:2], "的消息:", udpT4Data.decode())
if __name__ == "__main__":
host = input("请输入本机监听服务使用ip地址:")
port = input("请输入监听服务使用端口号:")
mode = input("请输入类型(t4/t6/u4/u6):")
x = tuServer(mode)
if x.m == 't4':
x.serverT4(host, port)
elif x.m == 't6':
x.serverT6(host, port)
elif x.m == 'u4':
x.udpT4(host, port)
else:
x.udpT6(host, port)
python3.x和python2.x代码逻辑一致,只是部分函数写法不一样,只需要修改写法后同样可以在python2.x环境使用
1、python3.x的print()函数有括号,python2.x对应的print函数没有括号
2、python3.x的输入函数使用input(),而在python2.x对应raw_input()函数
3、python3.x和python2.x的编解码差异
4、python2.x的源码.py文件的默认编码方式为ASCII, python3.x的源码.py文件的默认编码方式为UTF-8。因此,如果要在python2.x的.py文件里面写中文,则必须要添加一行声明文件编码的注释(# coding=utf-8),否则python2.x会默认使用ASCII编码
程序运行方式
程序执行命令:python *.py #需在程序对应路径下执行,python环境变量自行配置
使用指导
服务端
运行后按照提示输入对应信息
- 本机监听服务使用IP:指定IP地址则只在指定IP上启用服务,不指定则输入0.0.0.0,会在所有IP地址起监听服务
- 监听服务端口号:指定一个服务端口起监听服务
- 请输入类型(t4/t6/u4/u6):t4表示ipv4的TCP服务,t6表示ipv6的TCP服务,u4表示ipv4的UDP服务,u6表示ipv6的UDP服务
客户端
运行后按照提示输入对应信息
- 请输入本机客户端IP地址:指定使用本机的一个IP地址做客户端IP
- 请输入本机客户端端口号:指定使用一个端口做客户端端口
- 请输入服务端IP地址:需要连接的服务端IP地址
- 请输入服务端端口号:需要连接的服务端端口号
- 请输入类型(t4/t6/u4/u6):t4表示ipv4的TCP服务,t6表示ipv6的TCP服务,u4表示ipv4的UDP服务,u6表示ipv6的UDP服务
注:连接服务端成功需要服务端对应IP和端口启用监听服务
运行结果
连接效果如下所示:
** 验证客户端一对多如下**
服务端:
同时在10.111.14.155和10.111.14.154服务器上启动server服务端程序,监听50009端口
客户端:
同时在10.111.14.205上启用2个客户端,客户端端口均为50009与10.111.14.155和10.111.14.154服务器上的server服务端程序的50009端口建立连接。在10.111.14.205服务器新开一个窗口查看50009端口监听。
版权归原作者 菜鸡选手的成长之路 所有, 如有侵权,请联系我们删除。