分布式购物车
这个购物车的原宗旨是帮学姐完成她的毕业设计,论文的需求以及需要的代码以及一个简单的实现(半成品)会待会在文章结尾放出。
论文上介绍了一种分布式的数据库(anna),这个数据库提供了两种操作,一个操作为add,另外一个操作为get。同时还有一个delete。这个delete实现方式为特殊的put,只不过put的是一个空值。
之所以是分布式的数据库,是因为,它们虽然共享同一个数据库,但是各个节点之间的数据是“不怎么”共享的。简单来说,就是当add的时候,我们只在本节点添加add,但是不修改其他节点的数据集。为了实现这样的功能,我们需要涉及到一个概念,就是我们所有的操作都是单调的(monotonic),简单来说,就是只能增加不能减少。为什么需要这样呢,我们因为基本不在各个数据节点之间同步,最后get的时候要求要求之前所有的操作(put和delete)都是和操作的顺序无关的。我们因此维持两个列表,(这里我们和anna有所不同)我们这里的实现方式为put和delete都维持一个列表就是把本节点所有的操作都记录进去,最后check_out的时候我的实现方式是将各个节点的添加和删除的所有结果求和后传送给查询的节点,然后查询节点求和后打印最终结果。
(好了,接下来我们要看具体的实现了,第一次写go语言,不是很习惯,不过没关系,反正就只有我自己看,不管了)
首先是网络通信模块,接收端放在check_out里面,发送端放在主函数里面,这里同时还需要使用多线程,不过go语言的多线程很简单,直接go后面接一个函数名就行了。
首先是发送端
func SconnHandler(c net.Conn, cart *shoping_cart) { //用来监听别的节点发过来的请求
if c == nil {
log.Panic("连接其他节点失败")
}
cart.Lock()//给我们的数据集加锁
defer cart.Unlock()//处理完成后自动解锁
var cart_message map[string]int = make(map[string]int)
for item_ := range cart.cart {
cart_message[item_] = total_number(cart.cart[item_])
}
message_to_send, err := json.Marshal(cart_message)
if err == nil {
c.Write(message_to_send)//发送的内容是byte数组,但是这儿我们就需要转换一下,虽然json是用来转换json格式的,但是我发现用来直接转换map也很方便
}
}
func (cart *shoping_cart) ServerListen() {
server, err := net.Listen("tcp", ":8086")
if err != nil {
fmt.Println("开启socket服务失败")
}
fmt.Println("正在开启Server")
for {
conn, err := server.Accept()
if err != nil {
fmt.Println("连接出错")
}
//并发模式 接收来自客户端的连接请求,一个连接 建立一个 conn,服务器资源有可能耗尽 BIO模式
go SconnHandler(conn, cart)
}
}
go cart.ServerListen()//这个函数是写在主函数里面的
我们在建立需要发送的消息的同时还要注意下,最好加一个锁,就是不让发送的同时数据产生修改。
数据的接收端
func cConnHandler(c net.Conn) map[string]int {
message_receive := make([]byte, 4096)//进行数据接收的时候我们需要先定义好接收需要的byte的数组的大小,但是我对这个有疑问,万一数据量很大怎么办。
cnt, err := c.Read(message_receive)
tmp := message_receive[0:cnt]//这个地方需要注意下,因为是byte数组,要是不定义大小的话,最后超出部分未定义会导致最后解码成map失败。
if err != nil || cnt == 0 {
fmt.Println("接收消息失败")
}
//fmt.Println(tmp)
other_cart := make(map[string]int)
json.Unmarshal(tmp, &other_cart)
return other_cart
}
func ClientRead() map[string]int {
conn, err := net.Dial("tcp", "127.0.0.1:8087")//我们还没写自动添加节点的版本,现在这个版本就是只能两个节点,然后写死了
if err != nil {
fmt.Println("客户端建立连接失败")
}
return cConnHandler(conn)
}
//下面这个就是我们check_out的定义
func (m *shoping_cart) check_out() {
var other_cart = make(map[string]int)
var total_cart = make(map[string]int)
other_cart = ClientRead()
total_cart = merge(m.cart, other_cart)
fmt.Println("您购买的物品如下:")
for item_ := range total_cart {
fmt.Println("名称:", item_, " 数量:", total_cart[item_])
}
}
接下来我们分析下我们合并数据集的部分
func merge(local_cart map[string]item, other_cart map[string]int) map[string]int {
var tmp_cart = make(map[string]int)
for name := range local_cart {
cnt, err := other_cart[name]
if err {
tmp_cart[name] = total_number(local_cart[name]) + cnt
} else {
tmp_cart[name] = total_number(local_cart[name])
}
}
for name_ := range other_cart {
_, err := local_cart[name_]
if !err {
tmp_cart[name_] = other_cart[name_]
}
}
return tmp_cart
}
简单来说就是一个个比较,然后对数据集取并。
接下来是关于数据集的定义,以及初始化。
type item struct {
amount int
add_ *list.List
delete_ *list.List
}//我们设立了两个list,这两个list用来存放每一次添加以及删除的数量。
type shoping_cart struct {
cart map[string]item//每个物品有两个list,还有一个int,但是这个int没有使用过,也可以直接删除了。
sync.Mutex
}//在我们的购物车里面有一个锁,这个锁的作用就是在传送数据集的时候保证不会修改数据集
func (m *shoping_cart) init_item(name string) {
m.cart[name] = item{amount: 0, add_: list.New(), delete_: list.New()}
}
好了,我觉得没啥好说的了。
我就直接上完整代码以及论文了。
package main
import (
"container/list"
"encoding/json"
"fmt"
"log"
"net"
"sync"
)
type item struct {
amount int
add_ *list.List
delete_ *list.List
}
type shoping_cart struct {
cart map[string]item
sync.Mutex
}
func (m *shoping_cart) init_item(name string) {
m.cart[name] = item{amount: 0, add_: list.New(), delete_: list.New()}
}
func total_number(m item) int {
total := 0
for i := m.add_.Front(); i != nil; i = i.Next() {
tmp, _ := i.Value.(int)
total += tmp
}
for x := m.delete_.Front(); x != nil; x = x.Next() {
tmp, _ := x.Value.(int)
total -= tmp
}
return total
}
func (m *shoping_cart) put(name string, number int) {
_, exists := m.cart[name]
if !exists {
m.init_item(name)
m.cart[name].add_.PushBack(number)
} else {
m.cart[name].add_.PushBack(number)
}
fmt.Println("添加成功")
}
func (m *shoping_cart) remove(name string, number int) {
num, exists := m.cart[name]
if !exists {
fmt.Println("对不起,您要清空的物品不存在,请您确认后再次输入")
return
} else if total_number(num) <= 0 {
fmt.Println("对不起,物品的数量不足,请确认数量后再次输入")
return
}
m.cart[name].delete_.PushBack(number)
if total_number(m.cart[name]) <= 0 {
delete(m.cart, name)
}
fmt.Println("清除成功")
}
func SconnHandler(c net.Conn, cart *shoping_cart) { //用来监听别的节点发过来的请求
if c == nil {
log.Panic("连接其他节点失败")
}
cart.Lock()
defer cart.Unlock()
var cart_message map[string]int = make(map[string]int)
for item_ := range cart.cart {
cart_message[item_] = total_number(cart.cart[item_])
}
message_to_send, err := json.Marshal(cart_message)
if err == nil {
c.Write(message_to_send)
}
}
func (cart *shoping_cart) ServerListen() {
server, err := net.Listen("tcp", ":8087")
if err != nil {
fmt.Println("开启socket服务失败")
}
fmt.Println("正在开启Server")
for {
conn, err := server.Accept()
if err != nil {
fmt.Println("连接出错")
}
//并发模式 接收来自客户端的连接请求,一个连接 建立一个 conn,服务器资源有可能耗尽 BIO模式
go SconnHandler(conn, cart)
}
}
func cConnHandler(c net.Conn) map[string]int {
message_receive := make([]byte, 4096)
cnt, err := c.Read(message_receive)
tmp := message_receive[0:cnt]
if err != nil || cnt == 0 {
fmt.Println("接收消息失败")
}
//fmt.Println(tmp)
other_cart := make(map[string]int)
json.Unmarshal(tmp, &other_cart)
return other_cart
}
func ClientRead() map[string]int {
conn, err := net.Dial("tcp", "127.0.0.1:8086")
if err != nil {
fmt.Println("客户端建立连接失败")
}
return cConnHandler(conn)
}
func merge(local_cart map[string]item, other_cart map[string]int) map[string]int {
var tmp_cart = make(map[string]int)
for name := range local_cart {
cnt, err := other_cart[name]
if err {
tmp_cart[name] = total_number(local_cart[name]) + cnt
} else {
tmp_cart[name] = total_number(local_cart[name])
}
}
for name_ := range other_cart {
_, err := local_cart[name_]
if !err {
tmp_cart[name_] = other_cart[name_]
}
}
return tmp_cart
}
func (m *shoping_cart) check_out() {
var other_cart = make(map[string]int)
var total_cart = make(map[string]int)
other_cart = ClientRead()
total_cart = merge(m.cart, other_cart)
fmt.Println("您购买的物品如下:")
for item_ := range total_cart {
fmt.Println("名称:", item_, " 数量:", total_cart[item_])
}
}
func main() {
var cart shoping_cart
cart.cart = make(map[string]item)
var choice string
go cart.ServerListen()
for {
fmt.Print("请输入您的选择: ")
fmt.Scanln(&choice)
switch choice {
case "remove":
var name string
var number int
fmt.Print("请输入您想移除的物品名称: ")
fmt.Scanln(&name)
fmt.Print("请输入数量: ")
fmt.Scanln(&number)
cart.remove(name, number)
case "put":
var name string
var number int
fmt.Print("请输入你想添加的物品名称: ")
fmt.Scanln(&name)
fmt.Print("请输入数量: ")
fmt.Scanln(&number)
cart.put(name, number)
case "check_out":
cart.check_out()
}
}
}
下面这个就是所有的论文了,主要关注介绍anna以及2019年和coordination的论文。
CALMœ‡πÿ