抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

python网络编程一

Socket 套接字

Socket套接字。Socket是一种通用的网络编程接口,和网络层次没有一一对应的关系。
Python中标准库中提供了socket模块。socket模块中也提供了socket类,实现了对底层接口的封装,socket模块是非常底层的接口库;

  • socket类定义为
socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)

协议族

AF表示Address Family, 用于socket()第一个参数;

名称 含义
AF_INET IPV4
AF_INET6 IPV6
AF_UNIX Unix Domain Socket, Windows(Windows 下不支持)

Socket类型

名称 含义
SOCK_STREAM 面向连接的流套接字。
SOCK_DGRAM 默认值,UDP协议无连接的数据报文套接字。

TCP协议是流协议,也就是一大段数据看做字节流,一段段持续发送这些字节。
UDP协议是数据报协议,每一份数据封在一个单独的数据报中,一份一份发送数据。

socket 常用方法

socket类创建出socket对象,这个对象常用的方法如下

名称 含义
socket.recv(bufsize[, flags]) 获取数据。默认是阻塞的方式。
socket.recvfrom(bufsize[, flags]) 获取数据,返回一个二元组(bytes, address)。
socket.recv_into(buffer[, nbytes[, flags]]) 获取到nbytes的数据后,存储到buffer中。如果nbytes没有指定或为0,将buffer大小的数据存入buffer中。返回接收的字节数。
socket.recvfrom_into(buffer[, nbytes[, flags]]) 获取数据,返回一个二元组(bytes, address)到buffer中。
socket.send(bytes[, flags]) TCP发送数据,发送成功返回发送字节数。
socket.sendall(bytes[, flags]) TCP发送全部数据,成功返回None。
socket.sendto(string[, flags], address) UDP发送数据。
socket.sendfile(file, offset=0, count=None) 发送一个文件直到EOF,使用高性能的os.sendfile机制,返回发送的字节数。如果在Windows下不支持sendfile,或者不是普通文件,使用send()发送文件。offset告诉起始位置。从Python 3.5版本开始。
名称 含义
socket.getpeername() 返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)。
socket.getsockname() 返回套接字自己的地址。通常是一个元组(ipaddr,port)。
socket.setblocking(flag) 如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起 socket.error 异常。
socket.settimeout(value) 设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如 connect())。
socket.setsockopt(level,optname,value) 设置套接字选项的值。比如缓冲区大小。太多了,去看文档。不同系统,不同版本都不尽相同。

TCP

C/S编程

Socket 编程是一种完成一端和另一端通信的编程方式,通常这两端分别处在不同的进程中,实现网络通信。在 Socket 编程中,每个 socket 对象表示了通信的一端。

从业务角度来看,通信的两端分别具有不同的角色:

  • 客户端(Client):主动发送请求的一端,负责向服务端发起通信请求。
  • 服务端(Server):被动接受请求并回应的一端,负责监听客户端的连接并处理请求。

这种编程模式通常被称为 C/S 编程(Client/Server Programming),在网络应用中被广泛应用。

服务器端编程步骤

  1. 创建 Socket 对象

    • 创建用于网络通信的 Socket 对象。
  2. 绑定 IP 地址和端口

    • 使用 bind() 方法将 IP 地址和端口绑定到 Socket 对象。
    • IPv4 地址表示为一个二元组,包含 IP 地址字符串和端口号。
  3. 开始监听

    • 使用 listen() 方法在指定的 IP 地址和端口上开始监听连接请求。
  4. 接受客户端连接

    • 使用 accept() 方法阻塞等待客户端建立连接。
    • accept() 方法返回一个新的 Socket 对象和客户端地址的二元组。
    • 客户端地址通常表示为远程客户端的 IP 地址和端口号。
  5. 接收数据

    • 使用新建立的 Socket 对象的 recv(bufsize, flags) 方法接收数据。
    • 可以指定缓冲区大小来接收数据。
  6. 发送数据

    • 使用新建立的 Socket 对象的 send(bytes) 方法发送数据。

import socket
import sys 
server = socket.socket()
addr = '127.0.0.1', 9999
server.bind(addr) # 没有端口
server.listen() # netstat ss -l # 队列长度
print(server,file=sys.stderr)
newsock,raddr = server.accept() # 阻塞方法

print(newsock.getpeername())
print(newsock.getsockname())
newsock.send(b'hello')
data = newsock.recv(1024) # 也是一个新的阻塞方法
print(type(data),data)
msg = "data message is {}".format(data)
newsock.send(msg.encode())

s1, radd1 = server.accept() # 允许一个连接
data = s1.recv(1024)
print(data, '~~~~~~~~~~~~~~~')

server.close()

# 可以使用telnet 连接本地的9999端口进行测试
  • socket 示例,实现多人群聊的后端服务
# 实现一个群聊工具server端

import socket
import threading
import logging
import time 

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

# 暂时实现了一个echo server
class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999):
        self.addr = ip, port # 服务器的地址
        self.sock = socket.socket() # 创建一个socket对象
        self.event = threading.Event() # 事件对象
        self.clients = {} # 用于存储客户端的连接
        self.lock = threading.Lock() # 锁对象

    def start(self):
        self.sock.bind(self.addr) # 绑定地址
        self.sock.listen() # 监听

        threading.Thread(target=self.accept, name='accept').start() # 启动一个线程,用于接收客户端的连接

    def accept(self): # 接收客户端的连接
        count = 1 # 用于记录客户端的数量
        while not self.event.is_set(): # 事件对象没有被设置
            try: 
                newsock, raddr = self.sock.accept() # 接收客户端的连接
            except ConnectionAbortedError: # 客户端连接被中断
                logging.error('client is closed')
                continue
            self.clients[raddr] = newsock # 存储客户端的连接
            logging.info(newsock) # 打印客户端的连接
            threading.Thread(target=self.recv, name=f'recv-{count}',args=(newsock, raddr)).start() # 启动一个线程,用于接收客户端的消息
            count += 1 # 客户端的数量加1


    def recv(self,sock,raddr):
        while not self.event.is_set(): # 事件对象没有被设置 
            try:
                data = sock.recv(1024) # 接收客户端的消息
            except Exception as e:
                logging.error(e)
                data = b''
            if data.strip().rstrip() == b'' or data.strip().rstrip() == b'quit': # 客户端发送了空消息或者quit 
                with self.lock: # 锁住
                    self.clients.pop(raddr) # 删除客户端的连接
                logging.info("{} bye.".format(raddr)) # 打印客户端的地址
                sock.close() # 关闭客户端的连接
                break
            msg = "from {}: {}. data = {}".format(*raddr, data) # 消息

            logging.info(msg) # 打印消息
            with self.lock: # 锁住
                for s in self.clients.values(): # 遍历客户端的连接
                    s.send(msg.encode()) # 发送消息

    def stop(self):
        self.event.set()  # 设置事件对象,停止接收新连接
        with self.lock:  # 锁住,以确保线程的安全
            for s in self.clients.values():  # 遍历客户端的连接
                s.close()  # 关闭连接
            self.clients.clear()  # 清空客户端连接字典
        self.sock.close()  # 关闭服务器的连接


if __name__ == '__main__':  
    cs = ChatServer() # 创建ChatServer对象
    cs.start() # 启动服务
    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break
        logging.info(threading.enumerate()) # 打印线程的信息
        logging.info(cs.clients) # 打印客户端的连接
# 使用telnet 可以连接到服务器进行测试

MakeFile



# 实现一个群聊工具server端

import socket
import threading
import logging
import time 

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

# 暂时实现了一个echo server
class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999):
        self.addr = ip, port # 服务器的地址
        self.sock = socket.socket() # 创建一个socket对象
        self.event = threading.Event() # 事件对象
        self.clients = {} # 用于存储客户端的连接
        self.lock = threading.Lock() # 锁对象

    def start(self):
        self.sock.bind(self.addr) # 绑定地址
        self.sock.listen() # 监听

        threading.Thread(target=self.accept, name='accept').start() # 启动一个线程,用于接收客户端的连接

    def accept(self): # 接收客户端的连接
        count = 1 # 用于记录客户端的数量
        while not self.event.is_set(): # 事件对象没有被设置
            try: 
                newsock, raddr = self.sock.accept() # 接收客户端的连接
                f = newsock.makefile('rw') # 读写都使用文本
            except ConnectionAbortedError: # 客户端连接被中断
                logging.error('client is closed')
                continue
            with self.lock:
                self.clients[raddr] = f,newsock # 存储客户端的连接
            logging.info(newsock) # 打印客户端的连接
            threading.Thread(target=self.recv, name=f'recv-{count}',args=(f, raddr)).start() # 启动一个线程,用于接收客户端的消息
            count += 1 # 客户端的数量加1


    def recv(self,f,raddr):
        while not self.event.is_set(): # 事件对象没有被设置 
            try:
                #data = sock.recv(1024) # 接收客户端的消息
                data = f.readline().strip() # 读取一行 \n 带有换行符
            except Exception as e:
                logging.error(e)
                data = ''
            if data == '' or data == 'quit': # 客户端发送了空消息或者quit 
                with self.lock: # 锁住
                    _, sock = self.clients.pop(raddr) # 弹出客户端的连接
                    sock.close()
                    f.close()
                break
            msg = "from {}: {}. data = {}".format(*raddr, data) # 消息

            logging.info(msg) # 打印消息
            with self.lock: # 锁住
                for ff, _ in self.clients.values(): # 遍历客户端的连接
                    # s.send(msg.encode()) # 发送消息
                    ff.write(msg)
                    ff.flush()

    def stop(self):
        self.event.set()  # 设置事件对象,停止接收新连接
        with self.lock:  # 锁住,以确保线程的安全
            for s in self.clients.values():  # 遍历客户端的连接
                s.close()  # 关闭连接
            self.clients.clear()  # 清空客户端连接字典
        self.sock.close()  # 关闭服务器的连接


if __name__ == '__main__':  
    cs = ChatServer() # 创建ChatServer对象
    cs.start() # 启动服务
    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break
        logging.info(threading.enumerate()) # 打印线程的信息
        logging.info(cs.clients) # 打印客户端的连接
# 使用telnet 可以连接到服务器进行测试

客户端的实现

import socket
import threading
import logging
import termcolor

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

# 实现一个群聊工具客户端

class ChatClient:
    def __init__(self, rip='127.0.0.1', rport=9999): # 初始化, 默认连接服务器的地址
        self.raddr = rip, rport # 服务器的地址
        self.sock = socket.socket() # 创建一个socket对象
        self.event = threading.Event() # 事件对象

    def start(self):
        try: 
            self.sock.connect(self.raddr) # 连接服务器
            self.sock.send(b'hello\r\n') # 发送消息
            logging.info(self.sock) # 打印连接
            threading.Thread(target=self.recv, name='recv').start()# 启动一个线程,用于接收服务器的消息
        except Exception as e:
            logging.error(e) # 打印错误
    def recv(self):
        while not self.event.is_set():
            try:
                data = self.sock.recv(1024) # 接收服务器的消息
                msg = 'data={}'.format(data) # 消息
                self.sock.send(data) # 发送消息
                logging.info(termcolor.colored(msg, 'blue')) #打印消息   
            except Exception as e:
                logging.error(e) # 打印错误
    def send(self, msg:str):
        data = "{}\n".format(msg).encode() # 发送的消息
        print(data) # 打印消息
        self.sock.send(data)
    def stop(self):
        self.event.set()
        self.sock.close()

def inter(client:ChatClient):
    while not client.event.is_set(): # 事件对象没有被设置
        cmd = input(termcolor.colored('>>>', 'green')).strip() # 输入命令
        if cmd == 'quit': 
            client.stop() # 停止客户端
            break
        client.send(cmd) # 发送退出消息q

def main():
    cs = ChatClient()
    try:
        cs.start()
        threading.Thread(target=inter, name='inter', args=(cs,)).start()
    except Exception as e:
        logging.error(e)

if __name__ == '__main__':
    main()
# 可以python 在多个控制台启动测试    

SocketServer

  • socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层API进行封装,Python的封装就是socketserver模块。它是网络服务编程框架,便于企业级快速开发;

类的继承关系

+------------+        
| BaseServer |    
+------------+
      |                       
      V                        
+------------+             +-------------------+
| TCPServer  | --------->  |  UnixStreamServer |
+------------+             +-------------------+
      |                           |
      V                           V
+------------+             +---------------------+
| UDPServer  | --------->  | unixDatagramServer  |
+------------+             +---------------------+
  • SocketServer简化了网络服务器的编写

  • SocketServer 提供了简化网络服务器编写的工具,包括以下同步类和 mixin 类:

同步类:

  1. TCPServer:TCP 协议的服务器类。
  2. UDPServer:UDP 协议的服务器类。
  3. UnixStreamServer:基于 Unix 套接字的流式服务器类。
  4. UnixDatagramServer:基于 Unix 套接字的数据报服务器类。

Mixin 类:

  1. ForkingMixIn:用于支持基于多进程的并发。
  2. ThreadingMixIn:用于支持基于多线程的并发。

混合类:

  • 通过组合同步类和 mixin 类,可以得到不同类型的服务器:
    • ForkingUDPServer(ForkingMixIn, UDPServer)
    • ForkingTCPServer(ForkingMixIn, TCPServer)
    • ThreadingUDPServer(ThreadingMixIn, UDPServer)
    • ThreadingTCPServer(ThreadingMixIn, TCPServer)

说明:

  • ForkingMixIn 用于创建多进程服务器,而 ThreadingMixIn 用于创建多线程服务器。
  • 需要注意,fork 操作需要操作系统的支持,而 Windows 平台不支持 fork.
import socketserver

class MyHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print("=" * 30)
        print(self.request) # 与客户端通信的socket对象
        print(self.client_address)
        print(id(self.server), self.server)
        print("=" * 30)

server = socketserver.TCPServer(('127.0.0.1',9999),MyHandler)
print(id(server))
server.handle_request()

# 4298556464
# ==============================
# <socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 56343)>
# ('127.0.0.1', 56343)
# 4298556464 <socketserver.TCPServer object at 0x10036c430>
# ==============================

BaseRequestHandler类

BaseRequestHandler 类是用于处理用户连接和请求的基类,定义如下:

BaseRequesthandler (request, client_address, server)
当服务端 Server 实例接收到用户请求时,会实例化这个类。它被初始化时,会传入三个构造参数:requestclient_addressserver

BaseRequestHandler 类的实例上,可以使用以下属性:

  • self.request:与客户端的连接的 socket 对象。
  • self.server:TCPServer 实例本身。
  • self.client_address:客户端地址。
    这个类在初始化时,会依次调用三个方法。子类可以覆盖这些方法。

创建服务器的步骤总结

创建服务器的过程通常包括以下几个步骤:

  1. 创建请求处理程序类:

    • BaseRequestHandler 类派生出子类,并重写其 handle() 方法,用于处理传入的请求。
  2. 实例化服务器类:

    • 实例化一个服务器类,通常是 TCPServerUDPServer 或其衍生类。
    • 传入服务器的地址和请求处理程序类作为参数。
  3. 启动服务器:

    • 调用服务器实例的 handle_request() 方法,或者使用 serve_forever() 方法启动服务器并持续监听请求。
    • 服务器会开始接受并处理传入的请求。
  4. 关闭服务器:

    • 在适当的时候,调用 server_close() 方法关闭服务器的套接字,释放相关资源。

通过以上步骤,可以创建一个简单的服务器,用于处理客户端的请求。

总结

在使用 socketserver 模块创建服务器时,通常会遵循以下步骤和原则:

  1. 为每个连接提供 RequestHandlerClass 实例:

    • 为每个连接创建一个请求处理程序类的实例,该类通常是从 BaseRequestHandler 类派生而来的。
    • 该实例会依次调用 setup()handle()finish() 方法,以处理连接的请求。
    • 使用 try..finally 结构确保 finish() 方法一定被调用,即使出现异常情况。
  2. 循环处理连接:

    • 如果想要保持与客户端的通信,需要在 handle() 方法中使用循环,持续处理来自客户端的请求。
  3. 简化编程接口:

    • socketserver 模块提供了多种类,如 TCPServerUDPServer 以及多进程、多线程的类等。
    • 尽管类别不同,但编程接口是一致的,这大大简化了编程的复杂度。
    • 程序员只需专注于实现请求处理程序类(Handler 类),而不必担心底层的网络通信细节。

通过以上步骤和原则,可以有效地利用 socketserver 模块来创建服务器,并简化网络编程的过程,使程序员能够更专注于业务逻辑的实现。

import socketserver
import time 
import threading   

class MyHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print("=" * 30)
        print(self.request) # 与客户端通信的socket对象
        print(self.client_address)
        print(id(self.server), self.server)
        print("=" * 30)
        while True:
            data = self.request.recv(1024)
            msg = "from {}: {}. data = {}".format(*self.client_address, data)
            self.request.send(msg.encode())

server = socketserver.ThreadingTCPServer(('127.0.0.1',9999),MyHandler)
print(id(server))
server.serve_forever()

threading.Thread(target=server.serve_forever,name='serve').start()

while True:
    cmd = input('>>')
    if cmd == 'quit':
        server.server_close()
    print(threading.enumerate())
  • 使用socketserver 改写服务端

# 用socketserver 改写ChatServer

import socketserver
import threading
from socketserver import BaseRequestHandler, StreamRequestHandler
import termcolor
import logging

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'

logging.basicConfig(level=logging.INFO, format=FORMAT)


class ChatHandler(StreamRequestHandler): # 继承StreamRequestHandler,重写handle方法,用于处理客户端的连接,读写数据
    clients = {} # 用于存储客户端的连接
    def setup(self) -> None:
        super().setup()
        self.event = threading.Event() 
        self.clients[self.client_address] = self.wfile # 存储客户端的连接
        self.clock = threading.Lock() # 锁对象
    def handle(self) -> None:
        super().handle()
        while not self.event.is_set():
            data = self.rfile.readline().strip() # 读取一行 \n 带有换行符
            if data == b'' or data == b'quit':
                with self.clock: # 锁住
                    self.wfile.close()   # 关闭文件
                    self.clients.pop(self.client_address) # 弹出客户端的连接
                break
            msg = "from {}: {}. message = {}".format(*self.client_address, data)
            logging.info(termcolor.colored(msg, 'blue'))
            with self.clock:
                for c in self.clients.values():
                    c.write(msg.encode())
                    c.flush()

    def finish(self) -> None:
        super().finish()
        self.event.set()
        with self.clock:
            if self.client_address in self.clients:
                self.clients.pop(self.client_address)
                self.wfile.close()



class ChatServer:
    def __init__(self, ip='127.0.0.1',port=9999,HanderClass=ChatHandler):
        self.addr = ip, port
        self.server = socketserver.ThreadingTCPServer(self.addr, HanderClass)
        self.stop_event = threading.Event() 
    def start(self):
        threading.Thread(target=self.server.serve_forever, name='forever').start()

    def stop(self):
        if self.server:
            self.server.server_close()
            self.server.shutdown()
        self.stop_event.set()


if __name__ == '__main__':
    cs = ChatServer()
    cs.start()
    while True:
        cmd = input(termcolor.colored('>>', 'green')).strip()
        if cmd == 'quit':
            cs.stop()
            break
    print(threading.enumerate())    

评论