打造先进的内存KV数据库-5 TCP侦听

news/2024/5/17 15:14:06 标签: tcp, 数据库, cgo, golang

tcp侦听">TCP侦听

作为支持集群的数据库,必定要与多个客户端交互信息,不可能让数据库与所有客户共享地址空间(虽然这样性能好),所以需要使用TCP协议进行交互数据,(UDP协议不可靠。。。弃用),C语言的TCP库其实还好,但是对于高并发和并行的处理不如Go,而且并发锁机制比较难写,所以使用Go写了服务器和客户端调用C的库,目前版本没有什么身份验证,之后会加上。

代码实现

//server.go
package main
// #cgo LDFLAGS: -L ./lib -lmonkeyS
// #include "./lib/core.h"
// #include <stdlib.h>
import "C"
import (
    "unsafe"
    _"fmt"
    "net"
    "strings"
)

func main() {
    str := []byte("monkey")
    str = append(str,0)
    C.CreateDB((*C.char)(unsafe.Pointer(&str[0])))  //创建基础数据库
    servicePort := ":1517"
    tcpAddr,err := net.ResolveTCPAddr("tcp4",servicePort)
    if err != nil {
        panic(err)
    }
    l,err := net.ListenTCP("tcp",tcpAddr)   //侦听TCP
    if err != nil {
        panic(err)
    }
    for{
        conn,err := l.Accept()
        if err != nil {
            panic(err)
        }
        go Handler(conn)
    }
}

func Handler(conn net.Conn) {

    str := []byte("monkey")                         //环境变量-当前数据库
    db := C.SwitchDB((*C.char)(unsafe.Pointer(&str[0])))
    for {               
        buff := []byte{}
        buf := make([]byte,1024)
        length,err := conn.Read(buf)
        total := uint32(0); //前4个字节保存消息长度
        for i := 0;i < 4;i++ {
            total <<= 8;
            total += uint32(buf[i]);
        }
        //fmt.Println("Message length:",total)
        buff = append(buff,buf[4:]...)
        total -= uint32(length)
        for total > 0 {
            length,err = conn.Read(buf)
            total -= uint32(length)
            buff = append(buff,buf...)
        }
        if err != nil {
            conn.Close()
            break
        }
        TranslateMessage(conn,&db,buff)                     //解析消息
    }

}

func TranslateMessage(conn net.Conn,db **C.Database,message []byte) {
    command := string(message)
    params := strings.Split(command," ")
    //fmt.Println(params)
    response := []byte{}
    if params[0] == "set" {
        r := C.Set(&(*db).tIndex,(*C.char)(unsafe.Pointer((&([]byte(params[1]))[0]))),(unsafe.Pointer(&([]byte(params[2]))[0])))
        for i := 0;;i++ {
            response = append(response,byte(r.msg[i]))
            if response[i] == 0 { break; }
        }

    }else if params[0] == "get" {
        r := C.Get(&(*db).tIndex,(*C.char)(unsafe.Pointer((&([]byte(params[1]))[0]))))
        // for i := 0;;i++ {
        //  response = append(response,byte(r.msg[i]))
        //  if response[i] == 0 { break; }
        // }
        if(int(r.code) == 0) {
            for i := 0;;i++ {
                response = append(response,byte(*(*C.char)(unsafe.Pointer((uintptr(r.pData)+uintptr(i))))))
                if response[i] == 0 { break; }
            }
        }else {
            // for i := 0;;i++ {
            // response = append(response,byte(r.msg[i]))
            // if response[i] == 0 { break; }
            // }
        }

    }else if params[0] == "delete" || params[0] == "remove" {
        r := C.Delete(&(*db).tIndex,(*C.char)(unsafe.Pointer((&([]byte(params[1]))[0]))))
        for i := 0;;i++ {
            response = append(response,byte(r.msg[i]))
            if response[i] == 0 { break; }
        }

    }else if params[0] == "createdb" {
        d := C.CreateDB((*C.char)(unsafe.Pointer((&([]byte(params[1]))[0]))))
        if d != nil {
            *db = d
            response = []byte("Already exist,switched\n")
        }else {
            response = []byte("Created\n")
        }
    }else if params[0] == "switchdb" {
        d := C.SwitchDB((*C.char)(unsafe.Pointer((&([]byte(params[1]))[0]))))
        if d != nil {
            *db = d
            response = []byte("ok\n")
        }else {
            response = []byte("fail\n")
        }
    }else if params[0] == "dropdb" {
        *db = C.DropDB((*C.char)(unsafe.Pointer((&([]byte(params[1]))[0]))))
    }else if strings.EqualFold("listdb",params[0]) {
        r := C.ListDB()
        for i := 0;i < 1024;i++ {
            b := byte(*(*C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(r))+uintptr(i))))
            response = append(response,b)
            if(b == 0){ break; }
        }
        C.free(unsafe.Pointer(r))
    }else {
        //fmt.Println("unkown command:",params[0])
    }
    total := len(response) + 4
    header := make([]byte,4)
    i := 0
    for total > 0 {
        header[3-i] = byte(total % 256)
        total /= 256
        i++
    }
    response = append(header,response...)
    conn.Write(response)
}
//Client.go
package main
import "net"
import "fmt"
func main() {
    tcpAddr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:1517")  
    if err != nil {
        panic(err)
    }
    conn, err := net.DialTCP("tcp", nil, tcpAddr)  
    if err != nil {
        panic(err)
    }

    for {
        buf1 := ""
        buf2 := ""
        buf3 := ""
        buf := ""
        fmt.Print("monkey>")
        fmt.Scanf("%s",&buf1)
        if buf1 == "set" {
            fmt.Scanf("%s",&buf2)
            fmt.Scanf("%s",&buf3)
            buf = buf1 + " " + buf2 + " " + buf3
        }else if buf1 == "get"{
            fmt.Scanf("%s",&buf2)
            buf = buf1 + " " + buf2
        }else if buf1 == "remove" || buf1 == "delete" {
            fmt.Scanf("%s",&buf2)
            buf = buf1 + " " + buf2
        }else if buf1 == "createdb"{
            fmt.Scanf("%s",&buf2)
            buf = buf1 + " " + buf2
        }else if buf1 == "switchdb"{
            fmt.Scanf("%s",&buf2)
            buf = buf1 + " " + buf2
        }else if buf1 == "dropdb"{
            fmt.Scanf("%s",&buf2)
            buf = buf1 + " " + buf2
        }else if buf1 == "listdb"{
            buf = buf1 + " "
        }else if buf1 == "exit"{
            fmt.Println("Bye!")
            break;
        }
        total := uint32(0)
        total = uint32(len(buf) + 4)
        header := make([]byte,4)
        i := 0
        for total > 0 {
            header[3-i] = byte(total % 256)
            total /= 256
            i++
        } 
        conn.Write(append(header,([]byte(buf))...))

        buff := []byte{}
        buff2 := make([]byte,1024)
        length,_ := conn.Read(buff2)
        total = uint32(0);  //前4个字节保存消息长度
        for i := 0;i < 4;i++ {
            total <<= 8;
            total += uint32(buff2[i]);
        }
        buff = append(buff,buff2[4:]...)
        total -= uint32(length)
        for total > 0 {
            length,_ = conn.Read(buff2)
            total -= uint32(length)
            buff = append(buff,buff2...)
        }
        for i := 0;i < 1024;i++ {
            if buff[i] == 0 { break; }
            fmt.Printf("%c",buff[i])
        }
        fmt.Print("\n")
    }
}

修正:上述代码存在严重问题:
发送1K以上数据会无法正确接收
改进代码如下:

//tcp.go
package tcp
import "net"
import "fmt"

func ok(bytes []byte) bool {
    return bytes[0] == 111 && bytes[1] == 107 && bytes[2] == 0;
}

func bytes4uint(bytes []byte) uint32 {
    total := uint32(0); 
    for i := 0;i < 4;i++ {
        total <<= 8;
        total += uint32(bytes[i]);
    }
    return total
}

func uint32bytes(n uint32) []byte {
    header := make([]byte,4)
    i := 0
    for n > 0 {
        header[3-i] = byte(n % 256)
        n /= 256
        i++
    }
    return header
}


type TCPSession struct {
    Conn *net.TCPConn
    ToSend chan interface{} //要发送的数据
    Received chan interface{}   //接受到的数据
    Closed bool //是否已经关闭
}

func (s *TCPSession) Init() {
    s.ToSend = make(chan interface{})
    s.Received = make(chan interface{})
    go s.Send()
    go s.Recv()
}

func (s *TCPSession) Send() {
    for {
        if s.Closed {
            return
        }
        buf0 := <- s.ToSend //取出要发送的数据
        buf := buf0.([]byte)

        _,err := s.Conn.Write(buf)  //发送掉   
        //fmt.Println("send,",buf)
        if err != nil {
            s.Closed = true
            return
        }
    }

}

func (s *TCPSession) Recv() {
    for {
        if s.Closed {
            return
        }
        buf := make([]byte,1024)
        _,err := s.Conn.Read(buf)
        if err != nil {
            s.Closed = true
            return
        }
        s.Received <- buf
        //fmt.Println("read,",buf)
        }

}

func (s *TCPSession) SendMessage(bytes []byte) {
    total := len(bytes) / 1024
    if len(bytes) % 1024 != 0 {
        total++
    }
    header := uint32bytes(uint32(total))    //计算条数
    s.ToSend <- header
    //fmt.Println(header)
    for i := 0;i < total-1;i++ {
        buf := bytes[0:1024]    //发送这一段
        bytes = bytes[1024:]
        s.ToSend <- buf
        continue
    }
    //发送最后一段
    if total == 0 {
        return
    }
    buf := bytes[0:]    //发送这一段
    s.ToSend <- buf
}

func (s *TCPSession) ReadMessage() []byte {
    buf0 := <- s.Received
    buf := buf0.([]byte)
    //fmt.Println(buf)
    total := bytes4uint(buf)
    var buff []byte
    if buf[4] != 0 {    //两份报表被合并
        buff = buf[4:]
        total--
    } else {
        buff = []byte{}     
    }

    for i := uint32(0);i < total;i++ {
        buf0 := <- s.Received
        buf := buf0.([]byte)
        buff = append(buff,buf...)
    }
    return buff
}

http://www.niftyadmin.cn/n/1072277.html

相关文章

monkey-api-encrypt 1.1.2版本发布啦

时隔10多天&#xff0c;monkey-api-encrypt发布了第二个版本&#xff0c;还是要感谢一些正在使用的朋友们&#xff0c;提出了一些问题。 GitHub主页&#xff1a;https://github.com/yinjihuan/monkey-api-encrypt 本次更新内容如下&#xff1a; 支持Spring Boot配置支持注解开启…

Web前端开发工程师必读的15个设计博客

导读&#xff1a;Web设计是一个不断变化的领域&#xff0c;因此掌握最新的发展趋势及技术动向对设计师来说非常重要&#xff0c;无论是学习新技术&#xff0c;还是寻找免费资源与工具&#xff0c;设计博客都是很不错的去处。本文向大家推荐15个非常不错的设计博客。 1. Smashin…

打造先进的内存KV数据库-6 PHP支持

PHP php作为使用极广的程序设计语言&#xff0c;monkey数据库对php的支持是必须的&#xff5e; 代码实现 //test.php <?php class MonkeyDB {private $socket;private function read(){$data "";$total 0;$t fread($this->socket,1024);for($i 0;$i &l…

一条SQL统计所有表的行数

SELECT o.name AS "Table Name", i.rowcnt AS "Row Count"FROM sysobjects o, sysindexes iWHERE i.id o.idAND i.indid IN(0,1)AND o.xtype u --只统计用户表AND o.name <> sysdiagramsORDER BY i.rowcnt DESC --按行排降序COMPUTE SUM(i.rowcnt)…

打造先进的内存KV数据库-7 反射以及并发锁

反射 反射作为一种代码组织形式&#xff0c;带来了极大的不安全因素&#xff0c;同时也带来了许多便利之处&#xff0c;通过方法、对象、类型名称来获得具体实例&#xff0c;可以避免大量if-else分支&#xff0c;使得代码优雅&#xff0c;monkeyDB的服务端代码最后采用反射组织…

Sapphire算法:GC Without Stop the World(上)

Go的GC一致为人诟病&#xff0c;然而Go1.5据说大大优化了GC&#xff0c;具体可以见这篇文章http://www.oschina.net/translate/go-gc-solving-the-latency-problem-in-go-1-5 于是我打开了Go源代码&#xff0c;查看了Go GC相关代码&#xff0c;注释中说&#xff0c;Go现在使用…

星际争霸中的建筑学 生产建筑的研究

生产建筑主要包括虫&#xff1a;基地人&#xff1a;基地&#xff0c;兵营&#xff0c;重工&#xff0c;飞机场神&#xff1a;基地&#xff0c;兵营&#xff0c;轰击工厂&#xff08;VR&#xff09;&#xff0c;飞机场 其中除了虫族的基地比较特殊之外&#xff0c;其他所有生产建…

Windows Phone 7 创建自定义的控件

创建自定义的控件&#xff1a;需要从控件&#xff08;或 ContentControl&#xff09;派生&#xff0c;至少&#xff0c;为了继承基本的控件功能&#xff0c;该控件类应从 Silverlight System.Windows.Controls.Control 类派生。但是&#xff0c;它也可以从 ContentControl 和 …