From 467c1717f664a8c91b2795c5e9dc564a73ace552 Mon Sep 17 00:00:00 2001 From: hanJoker Date: Sun, 18 Jun 2023 15:34:37 +0800 Subject: [PATCH] new --- chainmaker.sh | 10 ++ netpoller.go | 230 ++++++++++++++++++++++++++++++++++++++ netpoller_test.go | 23 ++++ tcp_client.go | 138 ++++++++++++++++++++++- tcp_pool.go | 31 +++++- tcp_server.go | 207 ++++++++++++++++++++++++++++++++++ tcp_sticky_encoder.go | 104 +++++++++++++++++ tcp_test.go | 31 ++++++ udp_client.go | 254 ++++++++++++++++++++++++++++++++++++++++++ udp_server.go | 229 +++++++++++++++++++++++++++++++++++++ udp_test.go | 54 +++++++++ 11 files changed, 1306 insertions(+), 5 deletions(-) create mode 100644 chainmaker.sh create mode 100644 netpoller.go create mode 100644 netpoller_test.go create mode 100644 tcp_sticky_encoder.go create mode 100644 udp_client.go create mode 100644 udp_server.go create mode 100644 udp_test.go diff --git a/chainmaker.sh b/chainmaker.sh new file mode 100644 index 0000000..4bb572c --- /dev/null +++ b/chainmaker.sh @@ -0,0 +1,10 @@ +# consensusnodeid remove 删除共识节点 +$ ./cmc client chainconfig consensusnodeid remove \ +--sdk-conf-path=SDK配置文件地址 \ +--admin-crt-file-paths=crt文件路径 \ +--admin-key-file-paths=key文件路径 \ +--node-id=节点ID \ +--node-org-id=组织ID + +# 停止节点进程 +$ kill -15 <节点程序pid> diff --git a/netpoller.go b/netpoller.go new file mode 100644 index 0000000..93932c5 --- /dev/null +++ b/netpoller.go @@ -0,0 +1,230 @@ +package netProgram + +import ( + "log" + "net" + "sync" + "time" +) + +// 网络轮询器 + +// 网络IO(使用系统调用syscall的IO)的阻塞 +func BIONet() { + addr := "127.0.0.1:5678" + wg := sync.WaitGroup{} + + // 1模拟读,体会读的阻塞状态 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + conn, _ := net.Dial("tcp", addr) + defer conn.Close() + + buf := make([]byte, 1024) + // 注意:两次时间的间隔 + log.Println("start read.", time.Now().Format("03:04:05.000")) + n, _ := conn.Read(buf) + log.Println("content:", string(buf[:n]), time.Now().Format("03:04:05.000")) + }(&wg) + + // 2模拟写 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + l, _ := net.Listen("tcp", addr) + defer l.Close() + + for { + conn, _ := l.Accept() + go func(conn net.Conn) { + defer conn.Close() + log.Println("connected.") + + // 阻塞时长 + time.Sleep(3 * time.Second) + conn.Write([]byte("Blocking I/O")) + }(conn) + } + }(&wg) + + wg.Wait() +} + +// Channel(Go的自管理的IO)的阻塞 +func BIOChannel() { + // 0初始化数据 + wg := sync.WaitGroup{} + // IO channel + ch := make(chan struct{}) + + // 1模拟读,体会读的阻塞状态 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + log.Println("start read.", time.Now().Format("03:04:05.000")) + + content := <-ch // IO Read, Receive + + log.Println("content:", content, time.Now().Format("03:04:05.000")) + }(&wg) + + // 2模拟写 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + // 阻塞时长 + time.Sleep(3 * time.Second) + ch <- struct{}{} // Write, Send + }(&wg) + + wg.Wait() +} + +// 网络IO(使用系统调用syscall的IO)的非阻塞 +func NIONet() { + addr := "127.0.0.1:5678" + wg := sync.WaitGroup{} + + // 1模拟读,体会读的阻塞状态 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + conn, _ := net.Dial("tcp", addr) + defer conn.Close() + + buf := make([]byte, 1024) + // 注意:两次时间的间隔 + log.Println("start read.", time.Now().Format("03:04:05.000")) + // 设置截止时间 + conn.SetReadDeadline(time.Now().Add(400 * time.Millisecond)) + n, _ := conn.Read(buf) + log.Println("content:", string(buf[:n]), time.Now().Format("03:04:05.000")) + }(&wg) + + // 2模拟写 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + l, _ := net.Listen("tcp", addr) + defer l.Close() + + for { + conn, _ := l.Accept() + go func(conn net.Conn) { + defer conn.Close() + log.Println("connected.") + + // 阻塞时长 + time.Sleep(3 * time.Second) + conn.Write([]byte("Blocking I/O")) + }(conn) + } + }(&wg) + + wg.Wait() +} + +// Channel(Go的自管理的IO)的非阻塞 +func NIOChannel() { + // 0初始化数据 + wg := sync.WaitGroup{} + // IO channel + ch := make(chan struct{ id uint }) + + // 1模拟读,体会读的阻塞状态 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + log.Println("start read.", time.Now().Format("03:04:05.000")) + + content := struct{ id uint }{} + select { + case content = <-ch: // IO Read, Receive + default: + } + + log.Println("content:", content, time.Now().Format("03:04:05.000")) + }(&wg) + + // 2模拟写 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + // 阻塞时长 + time.Sleep(3 * time.Second) + ch <- struct{ id uint }{42} // Write, Send + }(&wg) + + wg.Wait() +} + +// 网络IO(使用系统调用syscall的IO)的非阻塞 +func NIONetChannel() { + addr := "127.0.0.1:5678" + wg := sync.WaitGroup{} + + // 1模拟读,体会读的阻塞状态 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + conn, _ := net.Dial("tcp", addr) + defer conn.Close() + + // 注意:两次时间的间隔 + log.Println("start read.", time.Now().Format("03:04:05.000")) + + // 独立的goroutine,完成Read操作,将结果Send到channel中 + wgwg := sync.WaitGroup{} + chRead := make(chan []byte) + wgwg.Add(1) + go func() { + defer wgwg.Done() + buf := make([]byte, 1024) + n, _ := conn.Read(buf) + chRead <- buf[:n] + }() + + //time.Sleep(100 * time.Millisecond) + + // 是select+default实现非阻塞操作 + var data []byte + select { + case data = <-chRead: + default: + } + + log.Println("content:", string(data), time.Now().Format("03:04:05.000")) + wgwg.Wait() + }(&wg) + + // 2模拟写 + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + l, _ := net.Listen("tcp", addr) + defer l.Close() + + for { + conn, _ := l.Accept() + go func(conn net.Conn) { + defer conn.Close() + log.Println("connected.") + + // 阻塞时长 + time.Sleep(3 * time.Second) + conn.Write([]byte("Blocking I/O")) + }(conn) + } + }(&wg) + + wg.Wait() +} diff --git a/netpoller_test.go b/netpoller_test.go new file mode 100644 index 0000000..7edd575 --- /dev/null +++ b/netpoller_test.go @@ -0,0 +1,23 @@ +package netProgram + +import "testing" + +func TestBIONet(t *testing.T) { + BIONet() +} + +func TestBIOChannel(t *testing.T) { + BIOChannel() +} + +func TestNIONet(t *testing.T) { + NIONet() +} + +func TestNIOChannel(t *testing.T) { + NIOChannel() +} + +func TestNIONetChannel(t *testing.T) { + NIONetChannel() +} diff --git a/tcp_client.go b/tcp_client.go index f350722..5d8f545 100644 --- a/tcp_client.go +++ b/tcp_client.go @@ -8,6 +8,7 @@ import ( "log" "math/rand" "net" + "os" "sync" "time" ) @@ -428,7 +429,7 @@ func TcpClientPool() { if err != nil { log.Fatalln(err) } - log.Println(pool, len(pool.idleList)) + log.Println(pool, pool.Len()) wg := sync.WaitGroup{} clientNum := 50 @@ -450,4 +451,139 @@ func TcpClientPool() { }(&wg) } wg.Wait() + + // 释放连接池 + pool.Release() + log.Println(pool, pool.idleList, pool.Len()) +} + +// 粘包 +func TcpClientSticky() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + + // A. 建立连接 + conn, err := net.DialTimeout(tcp, serverAddress, time.Second) + //conn, err := net.Dial(tcp, serverAddress) + if err != nil { + log.Println(err) + return + } + // 保证关闭 + defer conn.Close() + log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr()) + + // 从服务端接收数据,SerRead + buf := make([]byte, 1024) + for { + rn, err := conn.Read(buf) + if err != nil { + log.Println(err) + break + } + log.Println("received data:", string(buf[:rn])) + } +} + +// 粘包,编解码器解决 +func TcpClientCoder() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + + // A. 建立连接 + conn, err := net.DialTimeout(tcp, serverAddress, time.Second) + //conn, err := net.Dial(tcp, serverAddress) + if err != nil { + log.Println(err) + return + } + // 保证关闭 + defer conn.Close() + log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr()) + + // 从服务端接收数据,SerRead + // 创建解码器 + decoder := NewDecoder(conn) + data := "" + i := 0 + for { + // 错误 io.EOF 时,表示连接被给关闭 + if err := decoder.Decode(&data); err != nil { + log.Println(err) + break + } + + log.Println(i, "received data:", data) + i++ + } +} + +func TcpClientSpecial() { + // 1建立连接 + // raddr remote addr,服务端的地址 + raddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:5678") + if err != nil { + log.Fatalln(err) + } + // laddr, local addr, 客户端的地址,可以用于设置客户端的端口 + tcpConn, err := net.DialTCP("tcp", nil, raddr) + if err != nil { + log.Fatalln(err) + } + // 保证关闭 + defer tcpConn.Close() + log.Printf("connection is establish, client addr is %s\n", tcpConn.LocalAddr()) + + // 2读写数据 + buf := make([]byte, 1024) + for { + n, err := tcpConn.Read(buf) + if err != nil { + log.Println(err) + return + } + log.Println("receive len:", n) + log.Println("receive data:", string(buf[:n])) + } +} + +func TcpFileClient() { + // 1建立连接 + tcpConn, err := net.Dial("tcp", "192.168.50.131:5678") + if err != nil { + log.Fatalln(err) + } + // 保证关闭 + defer tcpConn.Close() + log.Printf("connection is establish, client addr is %s\n", tcpConn.LocalAddr()) + + // 1.获取文件信息 + filename := "./data/Beyond.mp3" + // 打开文件 + file, err := os.Open(filename) + if err != nil { + log.Fatalln(err) + } + // 关闭文件 + defer file.Close() + + buf := make([]byte, 1024) + for { + // 读取文件内容 + rn, err := file.Read(buf) + if err != nil { + // io.EOF 错误表示文件读取完毕 + if err == io.EOF { + break + } + log.Fatalln(err) + } + + // 发送到服务端 + if _, err := tcpConn.Write(buf[:rn]); err != nil { + log.Fatalln(err) + } + } + + log.Println("file send complete.") } diff --git a/tcp_pool.go b/tcp_pool.go index 88fb2ba..ef08f32 100644 --- a/tcp_pool.go +++ b/tcp_pool.go @@ -273,10 +273,33 @@ func (pool *TcpPool) Put(conn net.Conn) error { return nil } } -func (*TcpPool) Release() error { - log.Println("release all connections") + +// 释放连接池 +func (pool *TcpPool) Release() error { + // 1并发安全锁 + pool.mu.Lock() + defer pool.mu.Unlock() + + // 2确定连接池是否被释放 + if pool.idleList == nil { + return nil + } + + // 3关闭IdleList + close(pool.idleList) + + // 4释放全部空闲连接 + // 继续接收已关闭channel中的元素 + for idleConn := range pool.idleList { + // 关闭连接 + _ = pool.config.Factory.Close(idleConn.conn) + } + return nil } -func (*TcpPool) Len() int { - return 0 + +// 获取连接池长度 +// 当前的可用连接数 +func (pool *TcpPool) Len() int { + return len(pool.idleList) } diff --git a/tcp_server.go b/tcp_server.go index b100bce..5dbee2a 100644 --- a/tcp_server.go +++ b/tcp_server.go @@ -10,6 +10,7 @@ import ( "log" "math/rand" "net" + "os" "sync" "time" ) @@ -596,3 +597,209 @@ func HandleConnPool(conn net.Conn) { select {} } + +// 粘包 +func TcpServerSticky() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConnSticky(conn) + } +} + +func HandleConnSticky(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + // A.保证连接关闭 + defer func() { + conn.Close() + log.Println("connection be closed") + }() + + // 连续发送数据 + data := "package data." + for i := 0; i < 50; i++ { + _, err := conn.Write([]byte(data)) + if err != nil { + log.Println(err) + } + } +} + +// 粘包编解码器,header+body +func TcpServerCoder() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConnCoder(conn) + } +} + +func HandleConnCoder(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + // A.保证连接关闭 + defer func() { + conn.Close() + log.Println("connection be closed") + }() + + // 断言为TCPConn即可 + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + log.Println("non tcp connection") + } + tcpConn.SetNoDelay(true) + + // 连续发送数据 + data := []string{ + "package data.", + "package.", + "package data data", + "pack", + } + encoder := NewEncoder(conn) + for i := 0; i < 50; i++ { + // 创建编解码器 + // 利用编码器进行编码 + // encode 成功后,会写入到conn,已经完成了conn.Write() + if err := encoder.Encode(data[rand.Intn(len(data))]); err != nil { + log.Println(err) + } + } +} + +// TCP特定方法 +func TcpServerSpecial() { + // 1建立监听 + // 获取本地地址(监听地址) + laddr, err := net.ResolveTCPAddr("tcp", ":5678") + if err != nil { + log.Fatalln(err) + } + tcpListener, err := net.ListenTCP("tcp", laddr) + if err != nil { + log.Fatalln(err) + } + defer tcpListener.Close() + log.Printf("%s server is listening on %s\n", tcp, tcpListener.Addr()) + + // 2接收连接 + for { + tcpConn, err := tcpListener.AcceptTCP() + if err != nil { + log.Println(err) + continue + } + + // 3处理每个连接 + go handleConnSpecial(tcpConn) + } +} +func handleConnSpecial(tcpConn *net.TCPConn) { + log.Printf("accept from %s\n", tcpConn.RemoteAddr()) + + // 设置连接属性 + tcpConn.SetKeepAlive(true) + + // 写数据 + data := "tcp message." + n, err := tcpConn.Write([]byte(data)) + if err != nil { + log.Println(err) + return + } + log.Println("Send len:", n) +} + +// UDP文件传输 +func TCPFileServer() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + log.Printf("accept from %s\n", conn.RemoteAddr()) + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go func(conn net.Conn) { + defer conn.Close() + + file, err := os.Create("./Beyond.mp3") + if err != nil { + log.Fatalln(err) + } + defer file.Close() + + buf := make([]byte, 4*1024) + for { + // 一次读取 + rn, err := conn.Read(buf) + if err != nil { + if err == io.EOF { + break + } + log.Fatalln(err) + } + + // 写入文件 + if _, err := file.Write(buf[:rn]); err != nil { + log.Fatalln(err) + } + } + + log.Println("file receive complete") + + }(conn) + } +} diff --git a/tcp_sticky_encoder.go b/tcp_sticky_encoder.go new file mode 100644 index 0000000..2953afd --- /dev/null +++ b/tcp_sticky_encoder.go @@ -0,0 +1,104 @@ +package netProgram + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "log" +) + +// 定义编码器(发送端) +type Encoder struct { + // 编码结束后,写入的目标 + w io.Writer +} + +// 创建编码器函数 +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{ + w: w, + } +} + +// 编码,将编码的结果,写入到w io.Writer +// binary(int32(13))[]byte("package data.") +func (enc *Encoder) Encode(message string) error { + // 1获取message的长度 + l := int32(len(message)) + + // 构建一个数据包缓存 + buf := new(bytes.Buffer) + + // 2在数据包中写入长度 + // 需要二进制的写入操作,需要将数据以bit的形式写入 + if err := binary.Write(buf, binary.LittleEndian, l); err != nil { + return err + } + + // 3将数据主体Body写入 + //if err := binary.Write(buf, binary.LittleEndian, []byte(message)); err != nil { + // return err + //} + if _, err := buf.Write([]byte(message)); err != nil { + return err + } + + // 4利用io.Writer发送数据 + if n, err := enc.w.Write(buf.Bytes()); err != nil { + log.Println(n, err) + return err + } + + return nil +} + +// 定义解码器(接收端) +// 解码器 +type Decoder struct { + // Reader + r io.Reader +} + +// 创建Decoder +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{ + r: r, + } +} + +// 从Reader中读取内容,解码 +// binary(int32(13))[]byte("package data.") +func (dec *Decoder) Decode(message *string) error { + // 1读取前4个字节,读取header + header := make([]byte, 4) + hn, err := dec.r.Read(header) + if err != nil { + return err + } + if hn != 4 { + return errors.New("header is not enough") + } + + // 2将前4个字节转换为int32类型,确定了body的长度 + var l int32 + headerBuf := bytes.NewBuffer(header) + if err := binary.Read(headerBuf, binary.LittleEndian, &l); err != nil { + return err + } + + // 3读取body + body := make([]byte, l) + bn, err := dec.r.Read(body) + if err != nil { + return err + } + if bn != int(l) { + return errors.New("body is not enough") + } + + // 4设置message + *message = string(body) + + return nil +} diff --git a/tcp_test.go b/tcp_test.go index 165a930..507094c 100644 --- a/tcp_test.go +++ b/tcp_test.go @@ -77,3 +77,34 @@ func TestTcpServerPool(t *testing.T) { func TestTcpClientPool(t *testing.T) { TcpClientPool() } + +// 粘包测试 +func TestTcpServerSticky(t *testing.T) { + TcpServerSticky() +} +func TestTcpClientSticky(t *testing.T) { + TcpClientSticky() +} + +// 粘包编解码器解决测试 +func TestTcpServerCoder(t *testing.T) { + TcpServerCoder() +} +func TestTcpClientCoder(t *testing.T) { + TcpClientCoder() +} + +// 测试TCP专用方法 +func TestTcpServerSpecial(t *testing.T) { + TcpServerSpecial() +} +func TestTcpClientSpecial(t *testing.T) { + TcpClientSpecial() +} + +func TestTCPFileServer(t *testing.T) { + TCPFileServer() +} +func TestTCPFileClient(t *testing.T) { + TcpFileClient() +} diff --git a/udp_client.go b/udp_client.go new file mode 100644 index 0000000..faa7ae1 --- /dev/null +++ b/udp_client.go @@ -0,0 +1,254 @@ +package netProgram + +import ( + "errors" + "fmt" + "io" + "log" + "net" + "os" + "time" +) + +func UDPClientBasic() { + // 1.建立连接 + raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876") + if err != nil { + log.Fatalln(err) + } + udpConn, err := net.DialUDP("udp", nil, raddr) + if err != nil { + log.Fatalln(err) + } + log.Println(udpConn) + + // 2.写 + data := []byte("Go UDP program") + wn, err := udpConn.Write(data) // WriteToUDP(data, raddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("send %s(%d) to %s\n", string(data), wn, raddr.String()) + + // 3.读 + buf := make([]byte, 1024) + rn, raddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Fatalln(err) + } + log.Printf("received %s from %s\n", string(buf[:rn]), raddr.String()) +} + +func UDPClientConnect() { + // 1.建立连接 + raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876") + if err != nil { + log.Fatalln(err) + } + udpConn, err := net.DialUDP("udp", nil, raddr) + if err != nil { + log.Fatalln(err) + } + + // 测试输出远程地址 + log.Println(udpConn.RemoteAddr()) + + // 2.写 + data := []byte("Go UDP program") + wn, err := udpConn.Write(data) // WriteToUDP(data, raddr) + //wn, err := udpConn.WriteToUDP(data, raddr) // WriteToUDP(data, raddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("send %s(%d) to %s\n", string(data), wn, raddr.String()) + + // 测试输出远程地址 + log.Println(udpConn.RemoteAddr()) + + // 3.读 + buf := make([]byte, 1024) + rn, raddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Fatalln(err) + } + log.Printf("received %s from %s\n", string(buf[:rn]), raddr.String()) + + // 测试输出远程地址 + log.Println(udpConn.RemoteAddr()) +} + +func UDPClientPeer() { + // 1.解析地址 + laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:6789") + if err != nil { + log.Fatalln(err) + } + // 2.监听 + udpConn, err := net.ListenUDP("udp", laddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("%s server is listening on %s\n", "UDP", udpConn.LocalAddr().String()) + defer udpConn.Close() + + // 远程地址 + raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876") + if err != nil { + log.Fatalln(err) + } + + // 2.写 + data := []byte("Go UDP program") + wn, err := udpConn.WriteToUDP(data, raddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("send %s(%d) to %s\n", string(data), wn, raddr.String()) + + // 3.读 + buf := make([]byte, 1024) + rn, raddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Fatalln(err) + } + log.Printf("received %s from %s\n", string(buf[:rn]), raddr.String()) +} + +// 多播的发送端 +func UDPSenderMulticast() { + // 1.建立UDP多播组连接 + address := "224.1.1.2:6789" + raddr, err := net.ResolveUDPAddr("udp", address) + if err != nil { + log.Fatalln(err) + } + udpConn, err := net.DialUDP("udp", nil, raddr) + if err != nil { + log.Fatalln(err) + } + defer udpConn.Close() + + // 2.发送内容 + // 循环发送 + for { + data := fmt.Sprintf("[%s]: %s", time.Now().Format("03:04:05.000"), "hello!") + wn, err := udpConn.Write([]byte(data)) + if err != nil { + log.Println(err) + } + log.Printf("send \"%s\"(%d) to %s\n", data, wn, raddr.String()) + + time.Sleep(time.Second) + } +} + +// 广播发送端 +func UDPSenderBroadcast() { + // 1.监听地址 + // 2.建立连接 + laddr, err := net.ResolveUDPAddr("udp", ":9876") + if err != nil { + log.Fatalln(err) + } + udpConn, err := net.ListenUDP("udp", laddr) + if err != nil { + log.Fatalln(err) + } + defer udpConn.Close() + log.Printf("%s server is listening on %s\n", "UDP", udpConn.LocalAddr().String()) + + // 3.发送数据 + // 广播地址 + rAddress := "192.168.50.255:6789" + raddr, err := net.ResolveUDPAddr("udp", rAddress) + if err != nil { + log.Fatalln(err) + } + for { + data := fmt.Sprintf("[%s]: %s", time.Now().Format("03:04:05.000"), "hello!") + // 广播发送 + wn, err := udpConn.WriteToUDP([]byte(data), raddr) + if err != nil { + log.Println(err) + } + log.Printf("send \"%s\"(%d) to %s\n", data, wn, raddr.String()) + + time.Sleep(time.Second) + } +} + +// 文件传输(上传) +func UDPFileClient() { + // 1.获取文件信息 + filename := "./data/Beyond.mp3" + // 打开文件 + file, err := os.Open(filename) + if err != nil { + log.Fatalln(err) + } + // 关闭文件 + defer file.Close() + // 获取文件信息 + fileinfo, err := file.Stat() + if err != nil { + log.Fatalln(err) + } + //fileinfo.Size(), fileinfo.Name() + log.Println("send file size:", fileinfo.Size()) + + // 2.连接服务器 + raddress := "192.168.50.131:5678" + raddr, err := net.ResolveUDPAddr("udp", raddress) + if err != nil { + log.Fatalln(err) + } + udpConn, err := net.DialUDP("udp", nil, raddr) + if err != nil { + log.Fatalln(err) + } + defer udpConn.Close() + + // 3.发送文件名 + if _, err := udpConn.Write([]byte(fileinfo.Name())); err != nil { + log.Fatalln(err) + } + + // 4.服务端确认 + buf := make([]byte, 4*1024) + rn, err := udpConn.Read(buf) + if err != nil { + log.Fatalln(err) + } + // 判断是否为文件名正确接收响应 + if "filename ok" != string(buf[:rn]) { + log.Fatalln(errors.New("server not ready")) + } + + // 5.发送文件内容 + // 读取文件内容,利用连接发送到服务端 + // file.Read() + i := 0 + for { + // 读取文件内容 + rn, err := file.Read(buf) + if err != nil { + // io.EOF 错误表示文件读取完毕 + if err == io.EOF { + break + } + log.Fatalln(err) + } + + // 发送到服务端 + if _, err := udpConn.Write(buf[:rn]); err != nil { + log.Fatalln(err) + } + i++ + } + log.Println(i) + // 文件发送完成。 + log.Println("file send complete.") + + // 等待的测试 + time.Sleep(2 * time.Second) +} diff --git a/udp_server.go b/udp_server.go new file mode 100644 index 0000000..cc5fe7f --- /dev/null +++ b/udp_server.go @@ -0,0 +1,229 @@ +package netProgram + +import ( + "log" + "net" + "os" +) + +func UDPServerBasic() { + // 1.解析地址 + laddr, err := net.ResolveUDPAddr("udp", ":9876") + if err != nil { + log.Fatalln(err) + } + + // 2.监听 + udpConn, err := net.ListenUDP("udp", laddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("%s server is listening on %s\n", "UDP", udpConn.LocalAddr().String()) + defer udpConn.Close() + + // 3.读 + buf := make([]byte, 1024) + rn, raddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Fatalln(err) + } + log.Printf("received %s from %s\n", string(buf[:rn]), raddr.String()) + + // 4.写 + data := []byte("received:" + string(buf[:rn])) + wn, err := udpConn.WriteToUDP(data, raddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("send %s(%d) to %s\n", string(data), wn, raddr.String()) +} + +func UDPServerConnect() { + // 1.解析地址 + laddr, err := net.ResolveUDPAddr("udp", ":9876") + if err != nil { + log.Fatalln(err) + } + + // 2.监听 + udpConn, err := net.ListenUDP("udp", laddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("%s server is listening on %s\n", "UDP", udpConn.LocalAddr().String()) + defer udpConn.Close() + + // 测试输出远程地址 + log.Println(udpConn.RemoteAddr()) + + // 3.读 + buf := make([]byte, 1024) + rn, raddr, err := udpConn.ReadFromUDP(buf) + //rn, err := udpConn.Read(buf) + if err != nil { + log.Fatalln(err) + } + log.Printf("received %s from %s\n", string(buf[:rn]), raddr.String()) + + // 测试输出远程地址 + log.Println(udpConn.RemoteAddr()) + + // 4.写 + data := []byte("received:" + string(buf[:rn])) + wn, err := udpConn.WriteToUDP(data, raddr) + //wn, err := udpConn.Write(data) + if err != nil { + log.Fatalln(err) + } + log.Printf("send %s(%d) to %s\n", string(data), wn, raddr.String()) + + // 测试输出远程地址 + log.Println(udpConn.RemoteAddr()) +} + +func UDPServerPeer() { + // 1.解析地址 + laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876") + if err != nil { + log.Fatalln(err) + } + + // 2.监听 + udpConn, err := net.ListenUDP("udp", laddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("%s server is listening on %s\n", "UDP", udpConn.LocalAddr().String()) + defer udpConn.Close() + + // 远程地址 + raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:6789") + if err != nil { + log.Fatalln(err) + } + + // 3.读 + buf := make([]byte, 1024) + rn, raddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Fatalln(err) + } + log.Printf("received %s from %s\n", string(buf[:rn]), raddr.String()) + + // 4.写 + data := []byte("received:" + string(buf[:rn])) + wn, err := udpConn.WriteToUDP(data, raddr) + if err != nil { + log.Fatalln(err) + } + log.Printf("send %s(%d) to %s\n", string(data), wn, raddr.String()) + +} + +// 多播接收端 +func UDPReceiverMulticast() { + // 1.多播监听地址 + address := "224.1.1.2:6789" + gaddr, err := net.ResolveUDPAddr("udp", address) + if err != nil { + log.Fatalln(err) + } + + // 2.多播监听 + udpConn, err := net.ListenMulticastUDP("udp", nil, gaddr) + if err != nil { + log.Fatalln(err) + } + + // 3.接受数据 + // 循环接收 + buf := make([]byte, 1024) + for { + rn, raddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Println(err) + } + log.Printf("received \"%s\" from %s\n", string(buf[:rn]), raddr.String()) + } + +} + +// 广播接收端 +func UDPReceiverBroadcast() { + // 1.广播监听地址 + laddr, err := net.ResolveUDPAddr("udp", ":6789") + if err != nil { + log.Fatalln(err) + } + + // 2.广播监听 + udpConn, err := net.ListenUDP("udp", laddr) + if err != nil { + log.Fatalln(err) + } + defer udpConn.Close() + log.Printf("%s server is listening on %s\n", "UDP", udpConn.LocalAddr().String()) + + // 3.接收数据 + // 4.处理数据 + buf := make([]byte, 1024) + for { + rn, raddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Println(err) + } + log.Printf("received \"%s\" from %s\n", string(buf[:rn]), raddr.String()) + } +} + +// UDP文件传输 +func UDPFileServer() { + // 1.建立UDP连接 + laddress := ":5678" + laddr, err := net.ResolveUDPAddr("udp", laddress) + if err != nil { + log.Fatalln(err) + } + udpConn, err := net.ListenUDP("udp", laddr) + if err != nil { + log.Fatalln(err) + } + defer udpConn.Close() + log.Printf("%s server is listening on %s\n", "UDP", udpConn.LocalAddr().String()) + + // 2.接收文件名,并确认 + buf := make([]byte, 4*1024) + rn, raddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Fatalln(err) + } + filename := string(buf[:rn]) + if _, err := udpConn.WriteToUDP([]byte("filename ok"), raddr); err != nil { + log.Fatalln(err) + } + + // 3.接收文件内容,并写入文件 + // 打开文件(创建) + file, err := os.Create(filename) + if err != nil { + log.Fatalln(err) + } + defer file.Close() + + // 网络读取 + i := 0 + for { + // 一次读取 + rn, _, err := udpConn.ReadFromUDP(buf) + if err != nil { + log.Fatalln(err) + } + + // 写入文件 + if _, err := file.Write(buf[:rn]); err != nil { + log.Fatalln(err) + } + i++ + log.Println("file write some content", i) + } +} diff --git a/udp_test.go b/udp_test.go new file mode 100644 index 0000000..5dfc7de --- /dev/null +++ b/udp_test.go @@ -0,0 +1,54 @@ +package netProgram + +import "testing" + +func TestUDPServerBasic(t *testing.T) { + UDPServerBasic() +} + +func TestUDPClientBasic(t *testing.T) { + UDPClientBasic() +} + +// 连接状态测试 +func TestUDPServerConnect(t *testing.T) { + UDPServerConnect() +} + +func TestUDPClientConnect(t *testing.T) { + UDPClientConnect() +} + +// 对等终端测试 +func TestUDPServerPeer(t *testing.T) { + UDPServerPeer() +} + +func TestUDPClientPeer(t *testing.T) { + UDPClientPeer() +} + +// 多播测试 +func TestUDPReceiverMulticast(t *testing.T) { + UDPReceiverMulticast() +} +func TestUDPSenderMulticast(t *testing.T) { + UDPSenderMulticast() +} + +// 广播测试 +func TestUDPReceiverBroadcast(t *testing.T) { + UDPReceiverBroadcast() +} +func TestUDPSenderBroadcast(t *testing.T) { + UDPSenderBroadcast() +} + +// 文件传输测试 +func TestUDPFileClient(t *testing.T) { + UDPFileClient() +} + +func TestUDPFileServer(t *testing.T) { + UDPFileServer() +}