Notes on Buffered Channels

  • They are channels with capacity to hold one or more values
    buffChan := make(chan string, 10)

  • They don't require both the sender and receiver to be available at the same instant

  • They have different blocking rules

    • A receive will block only if there is no value in the channel's buffer to receive.
    • A send will block only if there is no free space in the buffer to hold the value being sent.

Here is an example.

import (
    "fmt"
    "math/rand"
    "os"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

// init configures the runtime before main runs: it seeds the shared
// math/rand generator from the wall clock and restricts the scheduler to a
// single logical processor.
func init() {
    // Seed the global math/rand source with the current Unix time in seconds.
    seed := time.Now().Unix()
    rand.Seed(seed)

    // Use exactly one logical processor. To use one per CPU instead, call
    // runtime.GOMAXPROCS(runtime.NumCPU()).
    runtime.GOMAXPROCS(1)
}

// Package-level state shared by main and the worker goroutines.
var (
    // wg lets main wait until every worker goroutine has called Done.
    wg sync.WaitGroup

    // NOTE(review): counter, shutdown and mtx are not referenced in the
    // visible part of this file — confirm whether they are still needed.
    counter  int64
    shutdown int64
    mtx      sync.Mutex
)

const (
    // NumOfWorkers is the number of worker goroutines main launches.
    NumOfWorkers = 4
    // AmountOfWork is both the capacity of the backlog channel and the
    // number of work items main queues on it.
    AmountOfWork = 10
)
// main starts NumOfWorkers worker goroutines that share one buffered channel,
// queues AmountOfWork items on it, closes the channel, and waits for every
// worker to return before reporting completion.
func main() {
    // Register all workers up front so wg.Wait cannot return before they finish.
    wg.Add(NumOfWorkers)

    // The buffer can hold every item, so the sends below never block.
    backLog := make(chan string, AmountOfWork)

    // Worker IDs are 1-based.
    for id := 1; id <= NumOfWorkers; id++ {
        go work(id, backLog)
    }

    // Add work to the backlog. The loop variable is deliberately not named
    // "work", which would shadow the work function.
    for item := 1; item <= AmountOfWork; item++ {
        backLog <- fmt.Sprintf("\"Work Item %d\"", item)
    }

    // Closing the channel lets workers drain the remaining items and then
    // observe that no more work is coming.
    close(backLog)
    wg.Wait()

    // Println terminates the line, matching the newline-terminated messages
    // printed by the workers.
    fmt.Println("Completed Work")
}
// work is the body of one worker goroutine. It receives items from backLog
// until the channel is closed and drained, sleeping a random 0-99 ms per item
// to simulate work, and calls wg.Done when it returns.
func work(workerId int, backLog chan string) {
    defer wg.Done()

    // The range loop ends once backLog is closed and every buffered item has
    // been received.
    for item := range backLog {
        fmt.Printf("Worker %d started work on item %s\n", workerId, item)

        // Simulate work.
        pause := time.Duration(rand.Int63n(100)) * time.Millisecond
        time.Sleep(pause)

        fmt.Printf("Worker %d finished work on item %s\n", workerId, item)
    }

    // Channel is closed and there is no more work.
    fmt.Printf("No more work\n")
}
0