Ahmed Ashraf

Before we start, I'd like to point out that this article contains what I was able to understand about Go channels and how you can use them well. So if you find any mistakes, please leave a comment :).

What are Go Channels?

Go by Example says:

Channels are the pipes that connect concurrent goroutines. You can send values into channels from one goroutine and receive those values into another goroutine.

So a channel is like a simple pipe: you send into it from one side and receive from the other. Combined with goroutines, this lets you do work asynchronously, as I will show with the email queue later in this article.

How to use Channels in Go

package main

import (
    "fmt"
)


func main() {
    // Create the channel
    messages := make(chan string)

    // run a go routine to send the message
    go func() { messages <- "ping" }()
    
    // when you receive the message store it in msg variable
    msg := <-messages

    // print received message
    fmt.Println(msg)
}

  • Step 1: we create the channel using make
  • Step 2: we send the message from a goroutine
  • Step 3: we receive the message and store it in the msg variable
  • Step 4: we print the message

In step 2 we used a goroutine. Without it the code won't work: a send on an unbuffered channel blocks until a receiver is ready, so main would block on the send and never reach the receive. With every goroutine blocked, Go reports a deadlock:

messages := make(chan string)

messages <- "ping"

msg := <-messages

fmt.Println(msg)

Output

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /tmp/sandbox676902258/prog.go:11 +0x60

That happens because an unbuffered channel has nowhere to store the message: the send blocks until a listener is ready to receive it, and here the only goroutine is the one stuck sending. A simple solution is to use a buffer.

Channel Buffer

Go lets us give a channel a buffer with make(chan string, 1). The second parameter tells Go to create the channel with room for 1 string, so one send can complete before any listener is ready.

messages := make(chan string,1)

messages <- "ping"

msg := <-messages

fmt.Println(msg)

Now if we try to send 2 messages before any listener is receiving, we get a deadlock again: the buffer has no room for the second message, and there is no listener to hand it to directly.

messages <- "ping"
messages <- "pong"

msg := <-messages

fmt.Println(msg)
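To watch the buffer fill up without crashing the program, here is a small sketch. It assumes a helper called trySend (my own name, not something from the Go standard library) that uses select with a default case, so the send gives up instead of blocking when there is no room and no listener.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send without blocking
// and reports whether the channel accepted the value.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		// No free buffer slot and no receiver waiting.
		return false
	}
}

func main() {
	messages := make(chan string, 1)

	fmt.Println(trySend(messages, "ping"))    // true: the buffer had room
	fmt.Println(len(messages), cap(messages)) // 1 1: the buffer is now full
	fmt.Println(trySend(messages, "pong"))    // false: no room, no listener

	fmt.Println(<-messages) // ping
}
```

len and cap are built-in functions that report how many values are waiting in the buffer and how many it can hold.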

Best design to use channels for background workers

So how do we deal with channels? Do we send every message from its own goroutine? What about dropping channels altogether and just starting a goroutine for every job?

We are going to answer these questions in the next sections.

A simple queue worker can be designed like this: we create a Dispatcher, which starts X workers based on the requirements. These workers receive the messages and handle them, as we will see. By workers I mean channel listeners.

Create a Dispatcher

//JobQueue ... a buffered channel that we can send work requests on.
var JobQueue chan Queuable
        
//Queuable ... interface of Queuable Job
type Queuable interface {
    Handle() error
}

//Dispatcher ... worker dispatcher
type Dispatcher struct {
    maxWorkers int
    WorkerPool chan chan Queuable
    Workers    []Worker
}

The Dispatcher has 3 main properties:

  • maxWorkers: how many workers this dispatcher owns
  • WorkerPool: every idle worker registers its job channel in this pool, so the dispatcher can pull one each time it has a message to send
  • Workers: holds all the workers so we can talk to any one of them, for example to close a specific worker

Now let's create the dispatcher creator

//NewDispatcher ... creates new queue dispatcher
func NewDispatcher(maxWorkers int) *Dispatcher {
    // make job bucket
    if JobQueue == nil {
        JobQueue = make(chan Queuable, 10)
    }
    pool := make(chan chan Queuable, maxWorkers)
    return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

The dispatcher now needs a Run method to start its job

//Run ... starts work of dispatcher and creates the workers
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.WorkerPool)
        worker.Start()
        // register in dispatcher's workers
        d.Workers = append(d.Workers, worker)
    }

    go d.dispatch()
}

The end of the Run method starts the Dispatcher's dispatch loop

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // a job request has been received
            go func(job Queuable) {
                // try to obtain a worker job channel that is available.
                // this will block until a worker is idle
                jobChannel := <-d.WorkerPool

                // dispatch the job to the worker job channel
                jobChannel <- job
            }(job)
        }
    }
}     

dispatch runs an infinite loop. For every new message (job), it pulls one of the idle workers from the pool and sends the message to that worker to handle.

Create Worker

//Worker … simple worker that handles queueable tasks
type Worker struct {
    Name       string
    WorkerPool chan chan Queuable
    JobChannel chan Queuable
    quit       chan bool
}

  • Name: the worker's name, so we can see it in logs
  • WorkerPool: the dispatcher's pool, where the worker registers itself
  • JobChannel: the channel on which the worker receives messages (jobs) from the dispatcher
  • quit: a channel for stopping the worker, which helps with autoscaling
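The dispatcher's Run method calls NewWorker, which this article doesn't show. Here is a minimal sketch of what it could look like. The naming scheme and the workerCount counter are my assumptions, and the counter is only safe because Run creates workers one after another from a single goroutine.

```go
package main

import "fmt"

// Queuable and Worker repeat the article's definitions so this compiles on its own.
type Queuable interface {
	Handle() error
}

type Worker struct {
	Name       string
	WorkerPool chan chan Queuable
	JobChannel chan Queuable
	quit       chan bool
}

// workerCount is a hypothetical counter used only to build worker names.
// It is not safe for concurrent use.
var workerCount int

// NewWorker is an assumed constructor: it gives the worker its own
// unbuffered job channel and remembers the dispatcher's pool.
func NewWorker(pool chan chan Queuable) Worker {
	workerCount++
	return Worker{
		Name:       fmt.Sprintf("worker-%d", workerCount),
		WorkerPool: pool,
		JobChannel: make(chan Queuable),
		quit:       make(chan bool),
	}
}

func main() {
	pool := make(chan chan Queuable, 2)
	w := NewWorker(pool)
	fmt.Println(w.Name, w.JobChannel != nil)
}
```

Each worker gets its own JobChannel, and that channel is what the worker later puts into the pool to say "I'm free".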

So how does the worker run once the dispatcher sends messages to it?

//Start ... initiate worker to start listening for upcoming queueable jobs
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Handle(); err != nil {
                    fmt.Printf("Error in job: %s\n", err.Error())
                }
            }
        }
    }()
}

The Start method starts an infinite loop. First, the worker registers itself in the dispatcher's WorkerPool. Then it waits inside the select for a message. Once it receives a job, it calls the job's Handle() method to process it.
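The Worker struct has a quit channel, but the Start method above never reads from it. Here is one way it could be wired in, as a sketch only: Stop, signalJob and runAndStop are hypothetical names I made up for the demo. Note that a stopped worker may leave its JobChannel sitting in the pool, so a real implementation would need to deal with that stale registration.

```go
package main

import "fmt"

// Queuable and Worker repeat the article's definitions so this compiles on its own.
type Queuable interface {
	Handle() error
}

type Worker struct {
	Name       string
	WorkerPool chan chan Queuable
	JobChannel chan Queuable
	quit       chan bool
}

// Start is the article's loop plus one extra case that watches quit.
func (w Worker) Start() {
	go func() {
		for {
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				if err := job.Handle(); err != nil {
					fmt.Printf("Error in job: %s\n", err.Error())
				}
			case <-w.quit:
				// Stop was called: leave the loop and end the goroutine.
				return
			}
		}
	}()
}

// Stop is a hypothetical method. The send blocks until the worker is
// idle and picks up the signal, so a running job is allowed to finish.
func (w Worker) Stop() {
	w.quit <- true
}

// signalJob is a hypothetical job that reports when it has been handled.
type signalJob struct{ done chan struct{} }

func (j signalJob) Handle() error {
	close(j.done)
	return nil
}

// runAndStop starts one worker, gives it one job, then stops it.
// It returns how many registrations are left in the pool afterwards.
func runAndStop() int {
	pool := make(chan chan Queuable, 1)
	w := Worker{Name: "w1", WorkerPool: pool, JobChannel: make(chan Queuable), quit: make(chan bool)}
	w.Start()

	job := signalJob{done: make(chan struct{})}
	jobChannel := <-pool // wait for the worker to register
	jobChannel <- job
	<-job.done

	w.Stop()
	return len(pool)
}

func main() {
	fmt.Println("registrations left in pool:", runAndStop())
}
```

The program reports 1 registration left: the worker registered itself again after finishing its job and before it saw the quit signal.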

A simple scenario

  • The dispatcher starts with 2 workers.
  • The first worker registers itself and waits for messages.
  • The second worker registers itself and waits for messages.
  • The dispatcher receives a message and pulls the first worker from the pool, as it was the first to register.
  • Now the pool has only the second worker, because the first one has already been pulled to serve the first message.
  • A new message comes to the dispatcher. The dispatcher pulls the second worker from the pool to serve the message.
  • Now let's say the second worker finishes handling its job while the first one is still busy for some reason.
  • The second worker registers itself in the pool again.
  • The first worker finishes handling and registers itself in the pool again.
  • Now the pool has 2 workers again, and the next message is handled the same way as in step 4.
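To try the scenario above, here is a condensed, self-contained sketch of the dispatcher and worker from this article with 2 workers and 5 jobs. The NewWorker body, demoJob and processAll are my assumptions for the demo, and I trimmed the Worker and Dispatcher structs down to the fields the demo needs.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Queuable interface {
	Handle() error
}

var JobQueue chan Queuable

type Worker struct {
	WorkerPool chan chan Queuable
	JobChannel chan Queuable
}

// NewWorker is an assumed constructor (the article does not show it).
func NewWorker(pool chan chan Queuable) Worker {
	return Worker{WorkerPool: pool, JobChannel: make(chan Queuable)}
}

func (w Worker) Start() {
	go func() {
		for {
			w.WorkerPool <- w.JobChannel // register as idle
			job := <-w.JobChannel        // wait for a job
			if err := job.Handle(); err != nil {
				fmt.Printf("Error in job: %s\n", err)
			}
		}
	}()
}

type Dispatcher struct {
	maxWorkers int
	WorkerPool chan chan Queuable
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	if JobQueue == nil {
		JobQueue = make(chan Queuable, 10)
	}
	return &Dispatcher{maxWorkers: maxWorkers, WorkerPool: make(chan chan Queuable, maxWorkers)}
}

func (d *Dispatcher) Run() {
	for i := 0; i < d.maxWorkers; i++ {
		NewWorker(d.WorkerPool).Start()
	}
	go func() {
		for job := range JobQueue {
			go func(job Queuable) {
				jobChannel := <-d.WorkerPool // blocks until a worker is idle
				jobChannel <- job
			}(job)
		}
	}()
}

// demoJob is a hypothetical job that counts itself when handled.
type demoJob struct {
	wg    *sync.WaitGroup
	count *int64
}

func (j demoJob) Handle() error {
	defer j.wg.Done()
	atomic.AddInt64(j.count, 1)
	return nil
}

// processAll queues n jobs, waits for all of them, and returns how many
// were handled. A dispatcher must already be running.
func processAll(n int) int64 {
	var wg sync.WaitGroup
	var count int64
	wg.Add(n)
	for i := 0; i < n; i++ {
		JobQueue <- demoJob{wg: &wg, count: &count}
	}
	wg.Wait()
	return atomic.LoadInt64(&count)
}

func main() {
	NewDispatcher(2).Run()
	fmt.Println("handled:", processAll(5)) // handled: 5
}
```

The pool is a buffered channel, so idle workers come out in the same order they registered, which is why the first worker to register gets the first message.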

Implement the logic with an email service

Let's say in our application we want to send welcome emails to new users.


type Email struct {
    To      string `json:"to"`
    From    string `json:"from"`
    Subject string `json:"subject"`
    Content string `json:"content"`
}

func (e Email) Handle() error {
    r := rand.Intn(200)
    time.Sleep(time.Duration(r) * time.Millisecond)
    return nil
}

Let's assume the Handle method will call a third-party service like SendGrid to send the email. Here it just sleeps for a random time to simulate that call.

//EmailService ... email service
type EmailService struct {
    Queue chan queue.Queuable
}

//NewEmailService ... returns email service to send emails :D
func NewEmailService(q chan queue.Queuable) *EmailService {
    service := &EmailService{
        Queue: q,
    }

    return service
}

func (s EmailService) Send(e Email) {
    s.Queue <- e
}

The EmailService now has a Queuable channel and a Send method that puts the email on the queue.
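One thing to keep in mind: Send blocks when the queue's buffer is full and all workers are busy, which would make the caller wait. Here is a sketch of one possible way around it, a send with a timeout. SendWithTimeout, ErrQueueFull and demoTimeout are hypothetical names, and Queuable and Email are redefined locally in a shortened form so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type Queuable interface {
	Handle() error
}

type Email struct {
	To string
}

func (e Email) Handle() error { return nil }

type EmailService struct {
	Queue chan Queuable
}

// ErrQueueFull is a hypothetical error for a queue that stayed full.
var ErrQueueFull = errors.New("email queue is full")

// demoTimeout is an arbitrary value chosen for this example.
const demoTimeout = 50 * time.Millisecond

// SendWithTimeout is a hypothetical variant of Send: it gives up if the
// queue does not accept the email within d.
func (s EmailService) SendWithTimeout(e Email, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case s.Queue <- e:
		return nil
	case <-timer.C:
		return ErrQueueFull
	}
}

func main() {
	// A buffer of 1 and no workers, so the second send cannot succeed.
	s := EmailService{Queue: make(chan Queuable, 1)}

	fmt.Println(s.SendWithTimeout(Email{To: "a@example.com"}, demoTimeout)) // <nil>
	fmt.Println(s.SendWithTimeout(Email{To: "b@example.com"}, demoTimeout)) // email queue is full
}
```

With something like this, an HTTP handler could answer with an error when the queue is overloaded instead of hanging.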

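Now our main.go file can look something like this:

// Assumes the dispatcher code above lives in a package named queue,
// as the queue.Queuable type used by EmailService suggests.
var QueueDispatcher *queue.Dispatcher

// mailService is shared with the HTTP handler below.
var mailService *emails.EmailService

func main() {
    QueueDispatcher = queue.NewDispatcher(4)
    QueueDispatcher.Run()

    mailService = emails.NewEmailService(queue.JobQueue)

    r := gin.Default()
    r.GET("/email", sendEmailHandler)

    // Run starts the HTTP server (gin listens on :8080 by default).
    if err := r.Run(); err != nil {
        panic(err)
    }
}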

func sendEmailHandler(c *gin.Context) {
    emailTo := c.Query("to")
    emailFrom := c.Query("from")
    emailSubject := c.Query("subject")
    emailContent := c.Query("content")

    email := emails.Email{
        To:      emailTo,
        From:    emailFrom,
        Subject: emailSubject,
        Content: emailContent,
    }

    mailService.Send(email)

    c.String(200, "Email will be sent soon :)")
}

In the next chapter (within the next 2 days), we will use Grafana & Prometheus to monitor the workers and their processing time, and get some beautiful metrics for the application.

at 08 Sep 2019
