go 分布式系统

项目介绍

本次项目将使用原生go构建一个web分布式系统,实现服务发现,服务注册等核心功能,本项目将遵循当前分布式系统的最佳实践。

分布式系统有多个不同的服务组成,这些不同的服务运行在不同的进程之中,由service包进行统一管理,对于每个service中,有server.go以及client.go两个主要文件,顾名思义,server.go处理该service的后端逻辑,client.go则是提供给各个其他服务便捷调用本service的接口(在分布式系统中,其他service即使本service的client)

server.go

在该文件中主要存储service的业务逻辑以及http请求处理逻辑,在我们的的设计中,除了服务注册中心以为均称为普通服务,这些服务应当在server.go中完成业务逻辑代码,并且完成一个RegisterHandlers()函数准备所有的http请求处理代码。

package log

import (
    "io"
    stlog "log"
    "net/http"
    "os"
)

var log *stlog.Logger

type filelog string  //go的常用奇技淫巧:重命名类型

//业务逻辑以及http请求处理逻辑

func(f filelog) Write(data []byte) (int, error) {
    file, err := os.OpenFile(string(f), os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0600)
    if err != nil {
        return 0, err
    }
    defer file.Close()
    return file.Write(data)
}

func Run(destination string) {
    log = stlog.New(filelog(destination), "[go]", stlog.LstdFlags)
}

func RegisterHandlers() {
    http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
        switch r.Method {
        case http.MethodPost:
            msg, err := io.ReadAll(r.Body)
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                return
            }
            write(string(msg))
        default:
            w.WriteHeader(http.StatusMethodNotAllowed)
        }
    })
}

func write(data string) {  //包装
    log.Printf("%v\n", data)
}

client.go

该文件中的代码可以理解为本服务为其他service提供的调用本服务的接口,使得其他服务可以更加无感的调用服务,而非直接向目标服务的url发送http请求。

比如,在log服务中的client.go文件中,其他服务通过调用暴露出来的 SetClientLogger 函数,可以直接将自己的stlog设置为指向log服务

package log

import (
    "bytes"
    "distributed/registry"
    "fmt"
    stlog "log"
    "net/http"
)

func SetClientLogger(url string, name registry.ServiceName) {
    stlog.SetPrefix(fmt.Sprintf("[%v] - ", name))
    stlog.SetFlags(0)
    stlog.SetOutput(&clientLogger{url: url})
} 

type clientLogger struct {
    url string
}

func(c *clientLogger) Write(data []byte) (int, error) {
    b := bytes.NewBuffer([]byte(data))
    res, err := http.Post(c.url, "text/plain", b)
    if err != nil {
        return 0, err 
    }
    if res.StatusCode != http.StatusOK {
        return 0, fmt.Errorf("failed to send log message, service responded with code: %v", res.StatusCode)
    }
    return len(data), nil
}

Service Package

所有普通服务的启动将统一通过调用service package中的函数实现,这样将大大便利我们添加onLaunch 以及 onShutdown时的新feature或者函数。

package service

import (
    "context"
    "distributed/registry"
    "fmt"
    "log"
    "net/http"
)

//规定每个service的基本行为
func Start(ctx context.Context, host, port string, r registry.Registration,
    RegisterHandlers func()) (context.Context, error) {
    RegisterHandlers()
    ctx = startService(ctx, host, port, r)
    if err := registry.RegisterService(r); err != nil {
        return ctx, err
    }
    return ctx, nil
}

func startService(ctx context.Context, host, port string, 
    r registry.Registration) context.Context {
    ctx, cancel := context.WithCancel(ctx)
    //http.ServeMux is to create a mux for server's handler
    srv := http.Server{
        Addr: ":" + port,
    }

    go func() {
        log.Println(srv.ListenAndServe())
        if err := registry.ShutdownService(r.ServiceURL); err != nil {
            log.Println(err)
        }
        cancel()
    }()

    go func() {
        fmt.Printf("%v started, press any key to stop", r.ServiceName)
        var s string
        fmt.Scanln(&s)
        if err := registry.ShutdownService(r.ServiceURL); err != nil {
            log.Println(err)
        }
        srv.Shutdown(ctx)
        cancel()
    }()

    return ctx

}

服务启动的逻辑需要在这里进行说明:

  • 注册服务的路由函数

  • 获取contextWithCancel,用以监控服务状态

  • 启动两个goroutine,log.Println(srv.ListenAndServe()),只有当服务因报错退出后猜会继续进行,当服务报错退出后,调用服务注册中心中的shutdown相关函数(后续说明),另一个goroutine提供一个优雅退出功能,避免直接ctrl + c的硬退出。

  • 当goroutine退出后调用cancel(),在各个服务的main.go会通过← ctx.Donw()监控服务是否退出。

cmd

所有的服务业务逻辑、请求处理逻辑、client逻辑(各自的service包中)以及服务启动逻辑(service package中),都已经准备好了,我们还需要一个入口(main)来启动这些服务,这些main函数统一放置在cmd文件夹中,在各自的main.go中,服务将设置各项依赖以及自身信息,并且调用service package将自身启动。

Registry

服务注册中心,分布式系统中实现服务发现以及服务注册的重中之重,将在下一个板块详细介绍

Registry

微服务最重要的一个部分就是服务管理中心,在本项目中,由registry package担任这项职责,registry package中有三个文件: registration.go, server.go, client.go , 该服务鉴于其特殊性,不通过调用service package启动。下面将以模块为序对服务管理中心进行介绍。

Service Register

model

registration.go 文件作为模型以及常量定义文件,内容如下:

package registry

// this file is to claim and management services except registry
// so called service discovery

type Registration struct {
    ServiceName ServiceName
    ServiceURL  string
    RequiredServices []ServiceName
    ServiceUpdateURL string
    HeartBeatURL string
}

type ServiceName string

type patchEntry struct {
    Name ServiceName
    URL string
}

type patch struct {
    Added []patchEntry
    Removed []patchEntry
}

const (
    LogService = ServiceName("LogService")
)

Registration即为一个注册条目,每一个服务向registry注册 就是向registry提交一个 Registration, 其中包含:、

  • 服务名称:作为服务的唯一识别码

  • 服务URL:该服务对应的完整路径(同一个服务可以有多个URL,这是就需要向registry提交多个registration)

  • 依赖服务:通过required_serivices, registry可以即时向该服务更新providers信息

  • 服务更新上报地址:通常是 服务URL + /services,当registry检查到依赖项有变动时,通过该URL向该服务更新providers

  • 心跳验证地址:顾名思义

patch以及patchEntry,及时服务依赖更新传递的基本结构。

server

在server.go中,我们应用单例模式,由一个全局的registry对象对于Registrations进行管理:

type registry struct {
    registrations []Registration
    mutex         *sync.RWMutex
}

var reg = registry{ //create variable in package level
    registrations: make([]Registration, 0),
    mutex:         &sync.RWMutex{}, //new(sync.Mutex)
}

并且为registry添加一个RWMutex,读写锁与普通的Mutex不同之处在于:RWMutex的读锁可以同时被多个goroutien持有,但是写锁只能被一个goroutine持有,适用于需要被频繁读取的对象。

服务注册的基本业务代码如下:

func (r *registry) add(reg Registration) error {
    r.mutex.RLock()
    r.registrations = append(r.registrations, reg)
    r.mutex.RUnlock()
    if err := r.sendRequiredServices(reg); err != nil {
        return err
    }
    r.notify(patch{
        Added: []patchEntry {
            {
                Name: reg.ServiceName,
                URL: reg.ServiceURL,
            },
        },
    })
    return nil
}

func (r *registry) remove(url string) error {
    for k, v := range r.registrations {
        if v.ServiceURL == url {
            r.notify(patch{
                Removed: []patchEntry{
                    {
                        Name: v.ServiceName,
                        URL: v.ServiceURL,
                    },
                },
            })
            r.mutex.Lock()
            r.registrations = append(r.registrations[:k], r.registrations[k+1:]...)
            r.mutex.Unlock()
            return nil
        }
    }
    return fmt.Errorf("service at URL %s not found", url)
}

其中的notify函数涉及到服务发现,将在下文介绍

http请求逻辑代码如下:

type RegistryService struct{}

func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    log.Println("Request received")
    switch r.Method {
    case http.MethodPost:
        dec := json.NewDecoder(r.Body)
        var r Registration
        if err := dec.Decode(&r); err != nil {
            log.Println(err)
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        log.Printf("Adding service: %v, at URL: %v\n", r.ServiceName, r.ServiceURL)
        if err := reg.add(r); err != nil {
            log.Println(err)
            w.WriteHeader(http.StatusBadRequest)
            return
        }

    case http.MethodDelete:
        payload, err := io.ReadAll(r.Body)
        if err != nil {
            log.Println(err)
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        if err := reg.remove(string(payload)); err != nil {
            log.Println(err)
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        log.Printf("Removing service at URL: %v\n", string(payload))

    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
}

client

在server中,有registry服务统一管理并且提供服务处理逻辑,在client.go中,我们将定义client service与registry交互的方法,再次重申,client.go中的函数提供给client service使用到,运行在client service的进程之中。

一个服务通过registry client注册的流程应该是:

  1. 注册自己的serviceUpdateURL的路由函数,可以由client.go统一定义并完成。

  2. 向regsitry提供的URL发送一个POST请求,将自己的注册信息添加到registry.registrations中

以上流程代码如下:

func RegisterService(r Registration) error {
    serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
    if err != nil {
        return err
    }
    http.Handle(serviceUpdateURL.Path, &serviceUpdateHandler{})

    buf := new(bytes.Buffer)
    enc := json.NewEncoder(buf)
    if err := enc.Encode(r); err != nil {
        return err
    }

    res, err := http.Post(ServicesURL, "application/json", buf)
    if err != nil {
        return err
    }

    if res.StatusCode != http.StatusOK {
        return fmt.Errorf("failed to register service. Registry service "+
            "response with code: %v", res.StatusCode)
    }

    return nil
}

type serviceUpdateHandler struct{}

func (s *serviceUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    dec := json.NewDecoder(r.Body)
    var p patch
    if err := dec.Decode(&p); err != nil {
        log.Println(err)
        w.WriteHeader(http.StatusBadRequest)
        return
    }
    fmt.Println("Request received")
    prov.Update(p)
}

当一个服务被shutdown时,也要及时向registry提交服务被关停的信息,因此实现一个ShutdownService函数向registry URL提交服务关停信息:

func ShutdownService(url string) error {
    req, err := http.NewRequest(http.MethodDelete, ServicesURL,
        bytes.NewBuffer([]byte(url)))
    if err != nil {
        return err
    }
    req.Header.Add("Content-Type", "text/plain")
    res, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }

    if res.StatusCode != http.StatusOK {
        return fmt.Errorf("failed to deregister service. Registry service"+
            "service responded with code: %v", res.StatusCode)
    }
    return nil
}

Providers

设置好该服务的 依赖项跟新路由并在registry中注册了本服务的信息之后,registry就可以根据该服务注册的信息,向依赖项跟新路由中提交依赖项服务的信息,前文提及过,同名服务可能有多个不同的URL,为了管理这些依赖项的信息,我们在client.go中定义一个providers,为我们维护这些依赖信息,同样使用单例模式。

type providers struct {
    services map[ServiceName][]string
    mutex    *sync.RWMutex
}

var prov = providers{
    services: make(map[ServiceName][]string),
    mutex:    &sync.RWMutex{},
}

更新providers以及获得provider信息的业务代码如下:

func (p *providers) Update(pat patch) error {
    p.mutex.RLock()
    defer p.mutex.RUnlock()

    for _, patchEntry := range pat.Added {
        if _, ok := p.services[patchEntry.Name]; !ok {
            p.services[patchEntry.Name] = make([]string, 0)
        }
        var urls = p.services[patchEntry.Name] // slice is passsed by reference
        checked := true
        for _, url := range urls {
            if url == patchEntry.URL {
                checked = false
                break
            }
        }
        if checked {
            urls = append(urls, patchEntry.URL)
        }
    }

    for _, patchEntry := range pat.Removed {
        if urls, ok := p.services[patchEntry.Name]; ok {
            for k, url := range urls {
                if url == patchEntry.URL {
                    urls = append(urls[:k], urls[k+1:]...)
                }
            }
        }
    }
    return nil
}

func  (p *providers) get(name ServiceName) (string, error) {
    if _, ok := p.services[name]; !ok {
        return "", fmt.Errorf("failed to get service from providers")
    }
    idx := int(rand.Float32() * float32(len(p.services[name])))
    return p.services[name][idx], nil
}

func GetProvider(name ServiceName) (string, error) {
    return prov.get(name) //包装接口向client service提供
}

注意业务代码对于client service可见性的处理。

get函数中将从依赖服务的URL中随机挑选一个完成负载均衡。

Service Discovery

服务发现,就是向注册在registry中的服务 即时通知其依赖项变动的feature,当有新的服务注册时,registry遍历registrations,寻找依赖该服务的service,向其更新服务变动,当有服务关停时同理。

相关的调用已经在 Service Register - server 中的add,remove函数中有所体现。

Required service initialization

当服务注册到registry时,registry应检查当前的registration,将该服务需要的依赖项打包发送到serviceUpdateURL:

func (r *registry) sendRequiredServices(reg Registration) error {
    r.mutex.RLock()
    defer r.mutex.RUnlock()
    var p patch
    for _, serviceReg := range r.registrations {
        for _, reqService := range reg.RequiredServices {
            if serviceReg.ServiceName == reqService {
                p.Added = append(p.Added, patchEntry{
                    Name: serviceReg.ServiceName,
                    URL:  serviceReg.ServiceURL,
                })
            }
        }
    }
    err := r.sendPatch(p, reg.ServiceUpdateURL)
    if err != nil {
        return err
    }
    return nil
}

Required service notification

当服务被注册之后,该服务的一个依赖项发生了变动(新增一个URL,或关停一个URL),这是应当向该服务发送服务变动通知(服务状态监控):

func (r *registry) notify(fullpatch patch) {
    r.mutex.RLock()
    defer r.mutex.RUnlock() //大胆随便加,一个rw锁可以被多个goroutine持有

    for _, reg := range r.registrations { // 善用并发优化循环
        go func (reg Registration) {
            p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
            sendUpdate := false
            for _, reqService := range reg.RequiredServices {
                for _, added := range fullpatch.Added {
                    if added.Name == reqService {
                        p.Added = append(p.Added, added)
                        sendUpdate = true
                    }
                }
                for _, removed := range fullpatch.Removed {
                    if removed.Name == reqService {
                        p.Removed = append(p.Removed, removed)
                        sendUpdate = true
                    }
                }
            }
            if sendUpdate {
                if err := r.sendPatch(p, reg.ServiceURL); err != nil {
                    log.Println(err)
                    return
                }
            }
        }(reg)
    }
}

Launch

因为我们在service package的服务启动代码中添加了调用registry服务相关的代码,因此registry服务自身的启动当然不能通过service package进行,需要我们自行在main.go实现,就是service package中的逻辑去除掉服务注册相关的部分:

package main

import (
    "context"
    "distributed/registry"
    "fmt"
    "log"
    "net/http"
)

func main() {
    registry.SetupRegistryService()
    http.Handle("/services", &registry.RegistryService{})
    ctx, cancel := context.WithCancel(context.Background())

    defer cancel()

    srv := http.Server{
        Addr: registry.ServerPort,
    }

    go func ()  {
        log.Println(srv.ListenAndServe())
        cancel()
    }()

    go func() {
        fmt.Println("Registry starting, press any key to shutdown")
        var s string
        fmt.Scanln(&s)
        cancel()
    }()

    <- ctx.Done()
    fmt.Println("Shutting down registry service")
}

与service package中的逻辑唯一不同的是,在最开始我们调用了一个自定义的setup函数registry.SetupRegistryService(),然我们一起观察一下:

var once sync.Once 

func SetupRegistryService() {
    once.Do(func() {
        go reg.heartBeat(3 * time.Second)
    })
}

func (r *registry) heartBeat(freq time.Duration) {
    for {
        var wg sync.WaitGroup 
        for _, reg := range r.registrations {
            wg.Add(1)
            go func (reg Registration) {
                defer wg.Done()
                success := true
                for attempts := 0; attempts < 3; attempts ++ {
                    res, err := http.Get(reg.HeartBeatURL)
                    if err != nil {
                        log.Println(err)
                    } else if res.StatusCode == http.StatusOK {
                        log.Printf("HeartBeat check passed for %v", reg.ServiceName)
                        if !success {
                            r.add(reg)
                        }
                        break;
                    }
                    log.Printf("HeartBeat check failed for %v", reg.ServiceName)
                    if success {
                        success = false
                        r.remove(reg.ServiceURL)
                    }
                    time.Sleep(time.Second)
                }
            }(reg)
            wg.Wait()
            time.Sleep(freq)
        }
    }
}

调用setup函数,启动了一个无限循环的heartbeat函数用以检查各个服务的状态,为了确保该函数指挥被启动一次,我们使用了var once sync.Once

Difference between Handle & HandleFunc

In summary, the primary difference between http.Handle() and http.HandleFunc() is how they associate handlers with URL patterns. http.Handle() is used to register custom http.Handler implementations, while http.HandleFunc() is used to register functions with the specific signature for handling HTTP requests. The choice between the two depends on your preference and requirements for your HTTP server implementation.

http.Handle("/custom", http.HandlerFunc(customHandler))
http.HandleFunc("/custom", customHandler)

ResponseWriter is ususally a small object so not use pointer.

关于读写锁

sync.Mutexsync.RWMutex 都是 Go 语言中用于控制并发访问共享资源的同步原语,但它们有不同的应用场景和行为。

  1. sync.Mutex(互斥锁):

    • sync.Mutex 是一种常用的互斥锁,用于保护共享资源不被多个 goroutine 同时访问。

    • 当一个 goroutine 获得了 Mutex 的锁,其他试图获取锁的 goroutine 将被阻塞,直到拥有锁的 goroutine 释放锁。

    • Mutex 是排他的,只有一个 goroutine 能够访问被锁保护的资源。

    • 这种锁适用于对临界区进行互斥访问的场景,但不支持多个 goroutine 并发读取。

goCopy code
var mu sync.Mutex

// 锁定
mu.Lock()
// 访问或修改共享资源
mu.Unlock() // 解锁
  1. sync.RWMutex(读写锁):

    • sync.RWMutex 也是用于保护共享资源的锁,但它支持读多写少的场景,以提高并发性能。

    • RWMutex 有两种锁状态:读锁和写锁。多个 goroutine 可以同时持有读锁,但只能有一个 goroutine 持有写锁,而且写锁是排他的。

    • 当一个 goroutine持有写锁时,其他 goroutine 无法获得读锁或写锁,从而确保数据的一致性。

    • 当没有写锁时,多个 goroutine 可以同时获得读锁,允许并发读取共享资源。

goCopy code
var rwmu sync.RWMutex

// 读锁定
rwmu.RLock()
// 读取共享资源
rwmu.RUnlock() // 读解锁

// 写锁定
rwmu.Lock()
// 修改共享资源
rwmu.Unlock() // 写解锁
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇