异步消息解决方案:Go 每日一库之 watermill

幸运草
幸运草
幸运草
896
文章
3
评论
2020年4月13日18:49:41 评论 209

 作为学习,message-bus确实不错。但是在实际使用上,message-bus的功能就有点捉襟见肘了。例如,message-bus将消息发送到订阅者管道之后就不管了,这样如果订阅者处理压力较大,会在管道中堆积太多消息,一旦订阅者异常退出,这些消息将会全部丢失!另外,message-bus不负责保存消息,如果订阅者后启动,之前发布的消息,这个订阅者是无法收到的。这些问题,我们将要介绍的watermill都能解决!

watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。watermill内置了多种订阅-发布实现,包括Kafka/RabbitMQ,甚至还支持HTTP/MySQL binlog。当然也可以编写自己的订阅-发布实现。此外,它还提供了监控、限流等中间件。

快速使用

watermill内置了很多订阅-发布实现,最简单、直接的要属GoChannel。我们就以这个实现为例介绍watermill的特性。

安装:

$ go get github.com/ThreeDotsLabs/watermill

使用:

package main

import (
"context"
"log"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}

go process(messages)

publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}

首先,我们创建一个GoChannel对象,它是一个消息管理器。可以调用其Subscribe订阅某个主题(topic)的消息,调用其Publish()以某个主题发布消息。Subscribe()方法会返回一个<-chan *message.Message,一旦该主题有消息发布,GoChannel就会将消息发送到该管道中。订阅者只需监听此管道,接收消息进行处理。在上面的例子中,我们启动了一个消息处理的goroutine,持续从管道中读取消息,然后打印输出。主goroutine在一个死循环中每隔 1s 发布一次消息。

message.Message这个结构是watermill库的核心,每个消息都会封装到该结构中发送。Message保存的是原始的字节流([]byte),所以可以将 JSON/protobuf/XML 等等格式的序列化结果保存到Message中。

有两点注意:

  • 收到的每个消息都需要调用Message的Ack() 方法确认,否则GoChannel会重发当前消息;
  • Message有一个UUID字段,建议设置为唯一的,方便定位问题。watermill提供方法NewUUID()生成唯一 id。

下面看示例运行:

异步消息解决方案:Go 每日一库之 watermill

路由

上面的发布和订阅实现是非常底层的模式。在实际应用中,我们通常想要监控、重试、统计等一些功能。而且上面的例子中,每个消息处理结束需要手动调用Ack()方法,消息管理器才会下发后面一条信息,很容易遗忘。还有些时候,我们有这样的需求,处理完某个消息后,重新发布另外一些消息。

这些功能都是比较通用的,为此watermill提供了路由(Router)功能。直接拿来官网的图:

异步消息解决方案:Go 每日一库之 watermill

路由其实管理多个订阅者,每个订阅者在一个独立的goroutine中运行,彼此互不干扰。订阅者收到消息后,交由注册时指定的处理函数(HandlerFunc)。路由还可以设置插件(plugin)和中间件(middleware),插件是定制路由的行为,而中间件是定制处理器的行为。处理器处理消息后会返回若干消息,这些消息会被路由重新发布到(另一个)管理器中。

var (
logger = watermill.NewStdLogger(false, false)
)

func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}

pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
go publishMessages(pubSub)

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)

router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func printMessages(msg *message.Message) error {
fmt.Printf("n> Received message: %sn> %sn>n", msg.UUID, string(msg.Payload))
return nil
}

type myHandler struct {
}

func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)

msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}

首先,我们创建一个路由:

router, err := message.NewRouter(message.RouterConfig{}, logger)

然后为路由注册处理器。注册的处理器有两种类型,一种是:

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)

这个方法原型为:

func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler

该方法的作用是创建一个名为handlerName的处理器,监听subscriber中主题为subscribeTopic的消息,收到消息后调用handlerFunc处理,将返回的消息以主题publishTopic发布到publisher中。

另外一种处理器是下面这种形式:

router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

从名字我们也可以看出,这种形式的处理器只处理接收到的消息,不发布新消息。

最后,我们调用router.Run()运行这个路由。

其中,创建GoChannel发布消息和上面的没什么不同。

使用路由还有个好处,处理器返回时,若无错误,路由会自动调用消息的Ack()方法;若发生错误,路由会调用消息的Nack()方法通知管理器重发这条消息。

上面只是路由的最基本用法,路由的强大之处在于中间件。

中间件

watermill中内置了几个比较常用的中间件:

  • IgnoreErrors:可以忽略指定的错误;
  • Throttle:限流,限制单位时间内处理的消息数量;
  • Poison:将处理失败的消息以另一个主题发布;
  • Retry:重试,处理失败可以重试;
  • Timeout:超时,如果消息处理时间超过给定的时间,直接失败。
  • InstantAck:直接调用消息的Ack()方法,不管后续成功还是失败;
  • RandomFail:随机抛出错误,测试时使用;
  • Duplicator:调用两次处理函数,两次返回的消息都重新发布出去,double~
  • Correlation:处理函数生成的消息都统一设置成原始消息中的correlation id,方便追踪消息来源;
  • Recoverer:捕获处理函数中的panic,包装成错误返回。

中间件的使用也是比较简单和直接的:调用router.AddMiddleware()。例如,我们想要把处理返回的消息 double 一下:

router.AddMiddleware(middleware.Duplicator)

想重试?可以:

router.AddMiddleware(middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware)

上面设置最大重试次数为 3,重试初始时间间隔为 100ms。

一般情况下,生产环境需要保证稳定性,某个处理异常不能影响后续的消息处理。故设置Recoverer是比较好的选择:

router.AddMiddleware(middleware.Recoverer)

也可以实现自己的中间件:

func MyMiddleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}

中间件有两种实现方式,如果不需要参数或依赖,那么直接实现为函数即可,像上面这样。如果需要有参数,那么可以实现为一个结构:

type myMiddleware struct {
Name string
}

func (m myMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}

这两种中间件的添加方式有所不同,第一种直接添加:

router.AddMiddleware(MyMiddleware)

第二种要构造一个对象,然后将其Middleware方法传入,在该方法中可以访问MyMiddleware对象的字段:

router.AddMiddleware(MyMiddleware{Name:"dj"}.Middleware)

设置

如果运行上面程序,你很可能会看到这样一条日志:

No subscribers to send message

因为发布消息是在另一个goroutine,我们没有控制何时发布,可能发布消息时,我们还未订阅。我们观察后面的处理日志,对比 uuid 发现这条消息直接被丢弃了。watermill提供了一个选项,可以将消息都保存下来,订阅某个主题时将该主题之前的消息也发送给它:

pubSub := gochannel.NewGoChannel(
gochannel.Config{
Persistent: true,
}, logger)

创建GoChannel时将Config中Persistent字段设置为true即可。此时运行,我们仔细观察一下,出现No subscribers to send message信息的消息后续确实被处理了。

RabbitMQ

除了GoChannel,watermill还内置了其他的发布-订阅实现。这些实现除了发布-订阅器创建的方式不同,其他与我们之前介绍的基本一样。这里我们简单介绍一下RabbitMQ,其他的可自行研究。

使用RabbitMQ需要先运行RabbitMQ程序,RabbitMQ采用Erlang开发。我们之前很多文章也介绍过 windows 上的软件安装神器choco。使用choco安装RabbitMQ:

$ choco install rabbitmq

启动RabbitMQ服务器:

$ rabbitmq-server.bat

watermill对RabbitMQ的支持使用独立库的形式,需要另行安装:

$ go get -u github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp

发布订阅:

var amqpURI = "amqp://localhost:5672/"

func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

subscriber, err := amqp.NewSubscriber(
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}

messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}

go process(messages)

publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}

publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}

如果有自定义发布-订阅实现的需求,可以参考RabbitMQ的实现:github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp。

总结

watermill提供丰富的功能,且预留了扩展点,可自行扩展。另外,源码中处理goroutine创建和通信、多种并发模式的应用都是值得一看的。官方 GitHub 上还有一个事件驱动示例:https://github.com/ThreeDotsLabs/event-driven-example。

特别声明:以上文章内容仅代表作者本人观点,不代表变化吧观点或立场。如有关于作品内容、版权或其它问题请于作品发表后的30日内与变化吧联系。

转载请注明:{{title}}-变化吧
  • 赞助本站
  • 微信扫一扫
  • weinxin
  • 赞助本站
  • 支付宝扫一扫
  • weinxin
幸运草
Go语言接口规则 前端框架

Go语言接口规则

Go语言接口规则 接口是一个或多个方法签名的集合。任何类型的方法集中只要拥有该接口对应的全部方法签名。就表示它 "实现" 了该接口,无须在该类型上显式声明实现了哪个接口。对应方法,是指有相同名称、参数列表 (不包括参数名) 以及返回值,该类型也可以有其他方法。 接口赋值 对象赋值给接口时,会发生拷贝,而接口内部存储的是指向这个复制品的指针,既无法修改复制品的状态,也无法获取指针。 package main import "fmt" type User struct {     id   int     name string } func main() {     u := User{18, "oldboy"}     var i interface{} = u     u.id = 20     u.name = "Golang"     fmt.Printf("u : %vn", u)     fmt.Printf("i.(User) : %vn", i.(User)) } 运行结果: u : {20 Golang} i.(User) : {18 oldboy} 接口转型返回临时对象,只有使用指针才能修改其状态。 package main import "fmt" type User struct {     id   int     name string } func main() {     u := User{18, "oldboy"}     var vi, pi interface{} = u, &u     // vi.(User).name = "Golang"     pi.(*User).name = "Golang"     fmt.Printf("vi.(User) : %vn", vi.(User))     fmt.Printf("pi.(*User) : %vn", pi.(*User)) } 空接口 只有当接口存储的类型和对象都为nil时,接口才等于nil。 package main import (     "fmt" ) func main() {     var i interface{}     fmt.Printf("i => %vn", i)     fmt.Printf("(i == nil) => %vn", i == nil)     var p *int = nil     // i 指向 p,指向的对象是个nil,但是存在类型不是nil,是个指针     i = p     fmt.Printf("i => %vn", i)     fmt.Printf("(i == nil) => %vn", i == nil) } 运行结果: i => <nil> (i == nil) => true i => <nil> (i == nil) => false 接口实现 接口只有方法声明,没有数据字段,没有实现,也不需要显示的实现。只要一个变量,含有接口类型中的所有方法,那么这个变量就实现这个接口。 package main import (     "fmt" ) type Info interface {     GetAge() int     GetName() string } type User struct {     name string     age  int } func (u User) GetAge() int {     return u.age } func (u User) GetName() string {     return u.name } func main() {     var user Info = User{"oldboy", 18}     age := user.GetAge()     name := user.GetName()     fmt.Println(age, name) } 如果一个变量含有了多个interface类型的方法,那么这个变量就实现了多个接口。 package main import (     "fmt" ) type Age interface {     GetAge() int } type Name interface {     GetName() int } type User struct {     name string...
Go语言中处理 HTTP 服务器 前端框架

Go语言中处理 HTTP 服务器

1 概述 包 net/http 提供了HTTP服务器端和客户端的实现。本文说明关于服务器端的部分。 快速开始: package main import (   "log"   "net/http" ) func main() {   // 设置 路由   http.HandleFunc("/", IndexAction)   // 开启监听   log.Fatal(http.ListenAndServe(":8888", nil)) } func IndexAction(w http.ResponseWriter, r *http.Request) {  w.Write(byte(`<h1 align="center">来自变化吧的问候</h1>`)) } 运行程序,在浏览器上请求: localhost:8888,你会看到我们的结果 Go语言构建HTTP服务器还是很容易的。深入说明。 2 http.Server 类型 HTTP 服务器在 Go 语言中是由 http.Server 结构体对象实现的。参考 http.ListenAndServe() 的实现: // 文件:src/net/http/server.go // ListenAndServe always returns a non-nil error. func ListenAndServe(addr string, handler Handler) error {   server := &Server{Addr: addr, Handler: handler}   return server.ListenAndServe() } 可见过程是先实例化 Server 对象,再完成 ListenAndServe 。其中 Serve 对象就是表示 HTTP 服务器的对象。其结构如下 : // 文件:src/net/http/server.go type Server struct {   Addr    string  // TCP 监听地址, 留空为:":http"   Handler Handler // 调用的 handler(路由处理器), 设为 nil 表示 http.DefaultServeMux   TLSConfig *tls.Config // TLS 配置对象   ReadTimeout time.Duration // 请求超时时长   ReadHeaderTimeout time.Duration // 请求头超时时长   WriteTimeout time.Duration // 响应超时时长   IdleTimeout time.Duration // 请求空闲时长(keep-alive下两个请求间)   MaxHeaderBytes int // 请求头的最大长度   TLSNextProto mapfunc(*Server, *tls.Conn, Handler) // NPN 型协议升级出现时接管TLS连接的处理器函数映射表   ConnState func(net.Conn, ConnState) // 状态转换事件处理器   ErrorLog *log.Logger // 日志记录对象   disableKeepAlives int32     // accessed atomically.   inShutdown        int32     // accessed atomically (non-zero means we're in Shutdown)   nextProtoOnce     sync.Once // guards setupHTTP2_* init   nextProtoErr      error     // result of http2.ConfigureServer if used   mu         sync.Mutex   listeners  mapstruct{}   activeConn mapstruct{}   doneChan   chan struct{}   onShutdown func() } 可见 Server 定义了服务器需要的信息。 实例化了 Server 对象后,调用其 (srv *Server) ListenAndServe() error 方法。该方法会监听 srv.Addr 指定的 TCP 地址,并通过 (srv *Server) Serve(l net.Listener) error 方法接收浏览器端连接请求。Serve 方法会接收监听器 l 收到的每一个连接,并为每一个连接创建一个新的服务进程。 该 go...
go语言动态库的编译和使用 前端框架

go语言动态库的编译和使用

本文主要介绍go语言动态库的编译和使用方法,以linux平台为例,windows平台步骤一样,具体环境如下: $ echo $GOPATH /media/sf_share/git/go_practice $ echo $GOROOT /usr/lib/golang/ $ tree $GOPATH/src /media/sf_share/git/go_practice/src |-- demo |   `-- demo.go `-- main.go 1 directory, 2 files 在$GOPATH/src目录,有demo包和使用demo包的应用程序main.go,main.go代码如下: package main import "demo" func main() {    demo.Demo() } demo包中的demo.go代码如下: package demo import "fmt" func Demo() {    fmt.Println("call demo ...") } 由于demo.go是$GOPATH/src目录下的一个包,main.go在import该包后,可以直接使用,运行main.go: $ go run main.go call demo ... 现在,需要将demo.go编译成动态库libdemo.so,让main.go以动态库方式编译,详细步骤如下: 1 将go语言标准库编译成动态库 $ go install -buildmode=shared -linkshared  std 在命令行运行go install -buildmode=shared -linkshared  std命令,-buildmode指定编译模式为共享模式,-linkshared表示链接动态库,成功编译后会在$GOROOT目录下生标准库的动态库文件libstd.so,一般位于$GOROOT/pkg/linux_amd64_dynlink目录: $ cd $GOROOT/pkg/linux_amd64_dynlink $ ls libstd.so libstd.so 2 将demo.go编译成动态库 $ go install  -buildmode=shared -linkshared demo $ cd $GOPATH/pkg $ ls linux_amd64_dynlink/ demo.a  demo.shlibname  libdemo.so 成功编译后会在$GOPATH/pkg目录生成相应的动态库libdemo.so。 3 以动态库方式编译main.go $ go...
go语言 - Scheduler原理以及查看Goroutine执行 前端框架

go语言 - Scheduler原理以及查看Goroutine执行

最近看了看go scheduler的基本原理,本文介绍go语言scheduler的基本原理以及如何查看go代码中的go routine的执行情况。 0)Scheduler(调度器) 熟悉go语言的小伙伴应该都使用过goroutine。goroutine就是Go语言提供的一种用户态线程。Scheduler是调度goroutine的调度器。 Go的调度器内部有三个重要概念:M,P,G。 M (machine): 代表真正的内核操作系统里面的线程,和POSIX里的thread差不多,也是真正执行goroutine逻辑的部分。 G (Goroutine): 代表一个goroutine。 P (Processor): 代表调度的上下文,可以理解成一个局部调度器。 Go语言实现了多个Goroutine到多个Processor的映射(调度)。注意的是,针对X个Processor,Scheduler可能创建多于X个M(有些M可能会暂时被block)。还需要理解额外两个概念:GRQ(Global Running Queue)以及 LRQ(Local Running Queue)。未指定Processor的Goroutine会存放在GRQ上,在调度到合适的Processor后,会将一个Goroutine从GRQ移动到LRQ。 Go程序中发生了四类事件,允许调程序做出调度决策。 a. 使用关键字go b. 垃圾收集 c. 系统调用 d. 同步 1)Processor的个数 Processor的个数可以通过GOMAXPROCS环境变量设置。GOMAXPROCS默认值是CPU的核数。Processor的个数可以通过如下的go代码进行查询: package main import ( "fmt" "runtime" ) func main() { // NumCPU returns the number of logical // CPUs usable by the current process. fmt.Println(runtime.NumCPU()) } 也就是通过runtime.NumCPU函数可以获得Processor的个数。查看go语言的源代码(runtime/os_linux.c),NumCPU函数的实现函数如下:  func getproccount() int32 { const maxCPUs = 64 * 1024 var buf byte r := sched_getaffinity(0, unsafe.Sizeof(buf), &buf) if r < 0 { return 1 } n := int32(0) for _, v := range buf { for v != 0 { n += int32(v...