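Concurrency is one of the main concerns of most server-side applications and, given the nature of modern microprocessors, it has become a primary concern of programming languages as well. Go introduces the concept of a "goroutine". You can think of a goroutine as a "lightweight user-space thread" (in reality it is considerably more complicated, since multiple goroutines are multiplexed onto a single thread, but this gives you the general idea). "Lightweight" here means that goroutines start with a very small stack, so you can run millions of them at the same time; in fact, this is the recommended way of doing concurrency in Go. Any function or method can be run as a goroutine. For example, simply calling "go myAsyncTask()" starts a goroutine from the "myAsyncTask" function. Here is an example: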
    // performAsyncTasks runs the given tasks concurrently by spawning a
    // goroutine for each of them.
    func performAsyncTasks(tasks []Task) {
        for _, task := range tasks {
            // Spawn a separate goroutine to carry out this task.
            // This call is non-blocking.
            go task.Execute()
        }
    }
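Goroutines and channels give the programmer a great deal of flexibility and simplicity when writing asynchronous or concurrent code. They also make it easy to build other useful libraries on top of them, such as a goroutine pool. Here is a simple example: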
    package executor

    import (
        "log"
        "sync/atomic"
    )

    // Executor is the main executor for tasks.
    // 'maxWorkers' is the maximum number of simultaneous goroutines.
    // 'ActiveWorkers' is the number of goroutines spawned by the Executor
    // that are active at a given time.
    // 'Tasks' is the channel on which the Executor receives tasks.
    // 'Reports' is the channel on which the Executor publishes each task's report.
    // 'signals' is a channel that can be used to control the Executor. Right now
    // only the termination signal is supported, which the client sends by
    // writing '1' to this channel.
    type Executor struct {
        maxWorkers    int64
        ActiveWorkers int64

        Tasks   chan Task
        Reports chan Report
        signals chan int
    }

    // NewExecutor creates a new Executor.
    // 'maxWorkers' is the maximum number of simultaneous goroutines.
    // The 'signals' channel can be used to control the Executor.
    func NewExecutor(maxWorkers int, signals chan int) *Executor {
        chanSize := 1000

        if maxWorkers > chanSize {
            chanSize = maxWorkers
        }

        executor := Executor{
            maxWorkers: int64(maxWorkers),
            Tasks:      make(chan Task, chanSize),
            Reports:    make(chan Report, chanSize),
            signals:    signals,
        }

        go executor.launch()

        return &executor
    }

    // launch starts the main loop that polls all the relevant channels and
    // handles the different messages.
    func (executor *Executor) launch() int {
        reports := make(chan Report, executor.maxWorkers)

        for {
            select {
            case signal := <-executor.signals:
                if executor.handleSignals(signal) == 0 {
                    return 0
                }

            case r := <-reports:
                executor.addReport(r)

            default:
                // ActiveWorkers is updated by worker goroutines, so read it atomically.
                if atomic.LoadInt64(&executor.ActiveWorkers) < executor.maxWorkers && len(executor.Tasks) > 0 {
                    task := <-executor.Tasks
                    atomic.AddInt64(&executor.ActiveWorkers, 1)
                    go executor.launchWorker(task, reports)
                }
            }
        }
    }

    // handleSignals is called whenever anything is received on the 'signals' channel.
    // It acts on the received signal (request) and responds with 0 or 1 to indicate
    // whether the request was respected (0) or rejected (1).
    func (executor *Executor) handleSignals(signal int) int {
        if signal == 1 {
            log.Println("Received termination request...")

            if executor.Inactive() {
                log.Println("No active workers, exiting...")
                executor.signals <- 0
                return 0
            }

            executor.signals <- 1
            log.Println("Some tasks are still active...")
        }

        return 1
    }

    // launchWorker is called whenever a new Task is received and the Executor
    // can still spawn more workers.
    // Each worker runs on a new goroutine. It performs the given task and publishes
    // the report on the Executor's internal reports channel.
    func (executor *Executor) launchWorker(task Task, reports chan<- Report) {
        report := task.Execute()

        if len(reports) < cap(reports) {
            reports <- report
        } else {
            log.Println("Executor's report channel is full...")
        }

        atomic.AddInt64(&executor.ActiveWorkers, -1)
    }

    // AddTask submits a new task to the Executor in a non-blocking way. The client
    // can also submit a task on the Executor's Tasks channel directly, but that
    // will block if the channel is full.
    // Note that this method does not add the task if the Tasks channel is full;
    // it is up to the client to try again later.
    func (executor *Executor) AddTask(task Task) bool {
        if len(executor.Tasks) == cap(executor.Tasks) {
            return false
        }

        executor.Tasks <- task
        return true
    }

    // addReport is used by the Executor to publish reports in a non-blocking way.
    // If the client is not reading the Reports channel, or reads it more slowly
    // than the Executor publishes, the channel will fill up. In that case this
    // method does not block and the report is dropped.
    func (executor *Executor) addReport(report Report) bool {
        if len(executor.Reports) == cap(executor.Reports) {
            return false
        }

        executor.Reports <- report
        return true
    }

    // Inactive reports whether the Executor is idle, which is the case when there
    // are no pending tasks, no active workers, and no reports left to publish.
    func (executor *Executor) Inactive() bool {
        return atomic.LoadInt64(&executor.ActiveWorkers) == 0 &&
            len(executor.Tasks) == 0 &&
            len(executor.Reports) == 0
    }
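For comparison, here is a minimal sketch of another common way to bound concurrency: a fixed number of worker goroutines that block on a jobs channel instead of polling it. This is not the author's Executor. The Job and Result types and the runPool function are hypothetical names introduced only for this illustration, and the sketch assumes at least one worker is requested.

```go
package main

import (
	"fmt"
	"sync"
)

// Job and Result are hypothetical stand-ins for the article's Task and Report.
type Job struct{ ID int }

type Result struct {
	JobID  int
	Output string
}

// runPool processes jobs with at most `workers` goroutines running at once
// and returns one Result per Job, in no particular order.
// It assumes workers >= 1; with zero workers the first send would block forever.
func runPool(workers int, jobs []Job) []Result {
	jobCh := make(chan Job)
	resCh := make(chan Result, len(jobs)) // buffered so workers never block on send
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh { // blocks until a job arrives; no busy polling
				resCh <- Result{JobID: job.ID, Output: fmt.Sprintf("job %d done", job.ID)}
			}
		}()
	}

	for _, j := range jobs {
		jobCh <- j
	}
	close(jobCh) // ends the workers' range loops
	wg.Wait()
	close(resCh)

	results := make([]Result, 0, len(jobs))
	for r := range resCh {
		results = append(results, r)
	}
	return results
}

func main() {
	results := runPool(3, []Job{{ID: 1}, {ID: 2}, {ID: 3}, {ID: 4}, {ID: 5}})
	fmt.Println(len(results), "results") // prints "5 results"
}
```

The trade-off is that this version processes a fixed batch and returns, whereas the Executor above is a long-running service that accepts tasks and a termination signal over channels.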
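The main intent behind this design philosophy is that developers should spend little or no time thinking about "the best way to do this in the language" when expressing an idea, an algorithm, or a piece of code, and that different developers should find each other's code easier to understand. The lack of generics and exceptions makes Go less than perfect and can feel restrictive in many situations, which is why the Go team is considering these features for "Go 2".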
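Whether a new language or tool gets adopted depends directly on the developer experience it offers. For Go, the toolchain is a major factor in adoption. Like the minimal language core, the tooling follows the same design philosophy: minimal, but sufficient. All Go tools are invoked through the go command and its subcommands, entirely from the command line. For example, go get fetches a package: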
    go get github.com/farkaskid/WebCrawler/executor
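You may have noticed that there is no equivalent of package.json for go get. That is because there isn't one. In Go you do not need to list all your dependencies in a single file; you can simply import a package by its URL directly in the source file: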
import "github.com/xlab/pocketsphinx-go/sphinx"
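When you then run go build, the go tool (not the runtime) fetches the required dependencies for you, just as go get would. (With Go modules in Go 1.16 and later, run go get or go mod tidy first, because build commands no longer add missing dependencies on their own.) The complete source code is shown below: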
    package main

    import (
        "bytes"
        "encoding/binary"
        "log"
        "os/exec"

        pulse "github.com/mesilliac/pulse-simple" // pulse-simple
        "github.com/xlab/pocketsphinx-go/sphinx"
    )

    var buffSize int

    func readInt16(buf []byte) (val int16) {
        binary.Read(bytes.NewBuffer(buf), binary.LittleEndian, &val)
        return
    }

    func createStream() *pulse.Stream {
        ss := pulse.SampleSpec{pulse.SAMPLE_S16LE, 16000, 1}
        buffSize = int(ss.UsecToBytes(1 * 1000000))
        stream, err := pulse.Capture("pulse-simple test", "capture test", &ss)
        if err != nil {
            log.Panicln(err)
        }
        return stream
    }

    func listen(decoder *sphinx.Decoder) {
        stream := createStream()
        defer stream.Free()
        defer decoder.Destroy()

        buf := make([]byte, buffSize)
        var bits []int16

        log.Println("Listening...")

        for {
            _, err := stream.Read(buf)
            if err != nil {
                log.Panicln(err)
            }

            for i := 0; i < buffSize; i += 2 {
                bits = append(bits, readInt16(buf[i:i+2]))
            }

            process(decoder, bits)
            bits = nil
        }
    }

    func process(dec *sphinx.Decoder, bits []int16) {
        if !dec.StartUtt() {
            panic("Decoder failed to start Utt")
        }

        dec.ProcessRaw(bits, false, false)
        dec.EndUtt()
        hyp, score := dec.Hypothesis()

        if score > -2500 {
            log.Println("Predicted:", hyp, score)
            handleAction(hyp)
        }
    }

    func executeCommand(commands ...string) {
        cmd := exec.Command(commands[0], commands[1:]...)
        cmd.Run()
    }

    func handleAction(hyp string) {
        switch hyp {
        case "SLEEP":
            executeCommand("loginctl", "lock-session")
        case "WAKE UP":
            executeCommand("loginctl", "unlock-session")
        case "POWEROFF":
            executeCommand("poweroff")
        }
    }

    func main() {
        cfg := sphinx.NewConfig(
            sphinx.HMMDirOption("/usr/local/share/pocketsphinx/model/en-us/en-us"),
            sphinx.DictFileOption("6129.dic"),
            sphinx.LMFileOption("6129.lm"),
            sphinx.LogFileOption("commander.log"),
        )

        dec, err := sphinx.NewDecoder(cfg)
        if err != nil {
            panic(err)
        }

        listen(dec)
    }
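As you can see, Go is simple and minimal, yet sufficient and elegant. It ships with direct tooling support for everything from unit testing to benchmarking with flame graphs. Admittedly, as with the limited feature set discussed earlier, Go has its shortcomings. For example, go get does not support versioning, so you are locked to whatever import URL appears in your source file. But Go is still evolving, and dependency management tools are emerging.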
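Protobuf, or Protocol Buffers, is a binary communication format developed by Google for serializing structured data. A format? Like JSON? Yes. Protobuf is already 10 years old and has been in use inside Google for quite a while.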
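So is Protobuf really that fast? The short answer is yes. According to Google Developers, Protobuf messages are 3 to 10 times smaller than the equivalent XML and 20 to 100 times faster to process. The trade-off is that, being a binary format, the serialized data is not human-readable.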
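Compared with other transfer formats, Protobuf takes a more planned approach. You first define a .proto file, which is similar to a schema but more powerful. In it you define the message structure: which fields are required, which are optional, and the data type of each field. The Protobuf compiler then generates data-access classes that you can use in your business logic to exchange data more conveniently.
Looking at a service's .proto file tells you clearly how it communicates and what features it exposes. A typical .proto file looks like this: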
    message Person {
      required string name = 1;
      required int32 id = 2;
      optional string email = 3;

      enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
      }

      message PhoneNumber {
        required string number = 1;
        optional PhoneType type = 2 [default = HOME];
      }

      repeated PhoneNumber phone = 4;
    }
Fun fact: Jon Skeet, the Stack Overflow legend, is one of the main contributors to the Protobuf project.
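Using REST as the communication paradigm for such services becomes less effective. REST APIs do make a service more expressive, but when that expressiveness is neither necessary nor intended by the designer, we need to weigh other paradigms against the relevant factors.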
    service HelloService {
      rpc SayHello (HelloRequest) returns (HelloResponse);
    }

    message HelloRequest {
      string greeting = 1;
    }

    message HelloResponse {
      string reply = 1;
    }
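You only need to define a .proto file for the service, describing the interface name, what the service expects, and what it returns as Protobuf messages. The Protobuf compiler then generates both client and server code. Clients can call this code directly, and servers can implement the generated API to fill in the business logic.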
