Skip to content

Instantly share code, notes, and snippets.

@amlwwalker
Last active March 2, 2024 23:31
Show Gist options
  • Save amlwwalker/cb8d07d50e47239b6189e34927f356c5 to your computer and use it in GitHub Desktop.
Save amlwwalker/cb8d07d50e47239b6189e34927f356c5 to your computer and use it in GitHub Desktop.
Golang VAD and Google Speech To Text
package main
import (
"bytes"
"context"
"flag"
"io"
"path/filepath"
"fmt"
"log"
"os"
speech "cloud.google.com/go/speech/apiv1"
speechpb "google.golang.org/genproto/googleapis/cloud/speech/v1"
)
// audioManager bundles a Speech-to-Text client.
// NOTE(review): not referenced anywhere in this file; main builds its own
// client. speech.NewClient returns a *speech.Client, so storing it by value
// here would copy the client struct — confirm intent before using this type.
type audioManager struct{ client speech.Client }
func main() {
os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "/path/to/service-account.json")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s <AUDIOFILE>\n", filepath.Base(os.Args[0]))
fmt.Fprintf(os.Stderr, "<AUDIOFILE> must be a path to a local audio file. Audio file must be a 16-bit signed little-endian encoded with a sample rate of 16000.\n")
}
flag.Parse()
if len(flag.Args()) != 1 {
log.Fatal("Please pass path to your local audio file as a command line argument")
}
fmt.Println("file is ", flag.Arg(0))
audioFile := flag.Arg(0)
ctx := context.Background()
client, err := speech.NewClient(ctx)
if err != nil {
log.Fatal(err)
}
stream, err := client.StreamingRecognize(ctx)
if err != nil {
log.Fatal(err)
}
// Send the initial configuration message.
if err := stream.Send(&speechpb.StreamingRecognizeRequest{
StreamingRequest: &speechpb.StreamingRecognizeRequest_StreamingConfig{
StreamingConfig: &speechpb.StreamingRecognitionConfig{
Config: &speechpb.RecognitionConfig{
Encoding: speechpb.RecognitionConfig_LINEAR16,
SampleRateHertz: 8000,
LanguageCode: "en-GB",
AudioChannelCount: 2,
},
//SingleUtterance: true,
},
},
}); err != nil {
log.Fatal(err)
}
f, err := os.Open(audioFile)
if err != nil {
log.Fatal(err)
}
defer f.Close()
//now we need a channel for the silence detection engine
var rawDataChanel = make(chan []byte)
var vadControlChannel = make(chan DURATION)
// e.g 8kHz
// XkHz => (X * 10) * 2
const mode = 3
const sampleRateHertz = 8000
const sample10msSize = (sampleRateHertz / 1000) * 10 * 2
vadProcessor, err := NewVAD(sampleRateHertz, sample10msSize, mode)
if err != nil {
fmt.Println("could not start VAD ", err)
} else {
fmt.Printf("vadProcessor: %+v\r\n", vadProcessor)
}
//set the vad up for checking for silence
go vadProcessor.Worker(rawDataChanel, vadControlChannel)
//
////on another channel deal with the resulting silence
go func(incomingSilence chan DURATION) {
for {
select {
case s := <-incomingSilence:
fmt.Println("incoming VAD control message ", s.String())
if s == PAUSE {
//we have actuve data
fmt.Println("Pause experienced - processing active audio")
d := vadProcessor.ActiveAudio.Bytes()
vadProcessor.ActiveAudio.Reset()
sendStream(&stream, d)
}
}
}
}(vadControlChannel)
go func() {
//this is over the top but fine while reading a file in
buf := make([]byte, 1024)
for {
n, err := f.Read(buf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
// Nothing else to pipe, close the stream.
fmt.Println("closing stream due to ", err)
if err := stream.CloseSend(); err != nil {
log.Fatalf("Could not close stream: %v", err)
}
return
}
if err != nil {
log.Printf("Could not read from %v", err)
continue
}
//fmt.Println("length of n ", n)
if n == 0 {
fmt.Println("WARNING - NO BYTES LEFT!")
continue
}
//now send to stt
vadProcessor.PreProcess(rawDataChanel, buf[:n])
}
}()
//for{}
//
////wait indefinitely for something to translate
for {
resp, err := stream.Recv()
fmt.Println("receiving ", resp)
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Cannot stream results: %v", err)
}
if err := resp.Error; err != nil {
log.Fatalf("Could not recognize: %v", err)
}
for _, result := range resp.Results {
fmt.Printf("Result: %+v\n", result)
}
}
}
// streamCurrentData sends activeAudio to stream as AudioContent requests of
// at most 1024 bytes each, then half-closes the stream with CloseSend, so no
// further audio can be sent on that stream afterwards.
//
// The final request carries whatever is left over, even if shorter than 1024
// bytes. A failed Send is logged and the remaining chunks are still attempted;
// a failed CloseSend is fatal.
//
// NOTE(review): not called in this file; sendStream is used instead because
// it keeps the stream open between utterances.
func streamCurrentData(stream speechpb.Speech_StreamingRecognizeClient, activeAudio []byte) {
	buf := make([]byte, 1024)
	r := bytes.NewReader(activeAudio)
	for {
		n, err := io.ReadFull(r, buf)
		// ReadFull reports io.ErrUnexpectedEOF together with the final short
		// chunk, so send what was read before looking at err.
		if n > 0 {
			if err := stream.Send(&speechpb.StreamingRecognizeRequest{
				StreamingRequest: &speechpb.StreamingRecognizeRequest_AudioContent{
					AudioContent: buf[:n],
				},
			}); err != nil {
				log.Printf("Could not send audio: %v", err)
			}
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			// Nothing else to pipe, close the stream.
			fmt.Println("closing stream due to ", err)
			if err := stream.CloseSend(); err != nil {
				log.Fatalf("Could not close stream: %v", err)
			}
			return
		}
		if err != nil {
			// Defensive: retrying a failing read could loop forever.
			log.Printf("Could not read from %v", err)
			return
		}
	}
}
// sendStream forwards data to the Speech-to-Text stream as consecutive
// AudioContent requests of at most 1024 bytes. The last request carries the
// remainder, even when it is shorter than 1024 bytes, so no audio at the end
// of an utterance is lost. The stream is left open for further audio.
//
// A failed Send is logged and the remaining chunks are still attempted.
// stream is taken by pointer for compatibility with existing callers; it is
// only dereferenced, never reassigned.
func sendStream(stream *speechpb.Speech_StreamingRecognizeClient, data []byte) {
	fmt.Println("prcessing data of length ", len(data))
	const chunkSize = 1024
	frame := make([]byte, chunkSize)
	r := bytes.NewReader(data)
	for {
		n, err := io.ReadFull(r, frame)
		// ReadFull returns io.ErrUnexpectedEOF together with the final short
		// chunk, so send what was read before inspecting err.
		if n > 0 {
			if err := (*stream).Send(&speechpb.StreamingRecognizeRequest{
				StreamingRequest: &speechpb.StreamingRecognizeRequest_AudioContent{
					AudioContent: frame[:n],
				},
			}); err != nil {
				log.Printf("Could not send audio: %v", err)
			}
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return
		}
		if err != nil {
			log.Fatal("error here ", err)
		}
	}
}
package main
import (
"bytes"
"errors"
"fmt"
"github.com/maxhawkins/go-webrtcvad"
"io"
"log"
)
// DURATION labels the kind of quiet period the VAD worker reports on its
// control channel. The upper-case name is kept for existing callers.
type DURATION int

// ControlMessage pairs a DURATION code with its human-readable label.
type ControlMessage struct {
	Code DURATION
	data string
}

// The DURATION values the VAD worker can emit.
const (
	SILENCE DURATION = iota // the smallest unit of nothing detected
	PAUSE                   // a longer stretch of nothing detected, made up of silences
)

// ops is the label table consulted by DURATION.String.
var ops = map[DURATION]ControlMessage{
	SILENCE: {Code: SILENCE, data: "Silence"},
	PAUSE:   {Code: PAUSE, data: "Pause"},
}

// String returns the label registered in ops for d, or "???" when d has no
// entry, which makes DURATION a fmt.Stringer.
func (d DURATION) String() string {
	msg, ok := ops[d]
	if !ok {
		return "???"
	}
	return msg.data
}
// VadProcessor couples a WebRTC VAD instance with the buffering and
// quiet-period bookkeeping used to cut a conversation into voiced segments.
type VadProcessor struct {
	*webrtcvad.VAD // underlying detector; its methods are promoted

	// ActiveAudio accumulates the frames classified as voiced.
	ActiveAudio *bytes.Buffer

	framesize       int // bytes per frame that PreProcess emits
	sampleRateHertz int // sample rate handed to VAD.Process
	sample10msSize  int // bytes per 10ms frame; Worker skips frames of any other length

	silenceTimeout int // a silence is the smallest unit of nothing detected (presumably ms)
	pauseTimeout   int // a pause is a unit of nothing detected, made up of silences (presumably ms)

	currentSilence int // non-voiced bytes counted towards the next SILENCE
	currentPause   int // non-voiced bytes counted towards the next PAUSE
}
// PreProcess cuts someBytes into consecutive frames of v.framesize bytes and
// sends each one on rawAudio, blocking until the receiver takes it.
//
// Every frame is a freshly allocated slice. The receiver (Worker) keeps
// using a frame after the send completes (VAD.Process, ActiveAudio.Write).
// Reusing one buffer here would let the next read overwrite audio that is
// still being processed, which is a data race that corrupts frames.
//
// A trailing partial frame (len(someBytes) not a multiple of v.framesize) is
// dropped, because only whole frames are useful to the VAD. Callers should
// therefore pass chunks whose length is a multiple of v.framesize. A
// non-positive framesize sends nothing.
func (v *VadProcessor) PreProcess(rawAudio chan []byte, someBytes []byte) {
	if v.framesize <= 0 {
		return
	}
	r := bytes.NewReader(someBytes)
	for {
		frame := make([]byte, v.framesize)
		_, err := io.ReadFull(r, frame)
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			// Input exhausted (possibly leaving a partial frame behind).
			return
		}
		if err != nil {
			log.Fatal("error here ", err)
		}
		rawAudio <- frame
	}
}
// NewVAD builds a VadProcessor around a new WebRTC VAD.
//
// sampleRateHertz is the audio sample rate. sample10msSize is the number of
// bytes in 10ms of 16-bit audio at that rate, (rate/1000)*10*2, e.g. 160 at
// 8kHz. mode is the aggressiveness, an integer between 0 and 3; higher values
// cut out more background noise.
//
// The frame size used by PreProcess is set to sample10msSize, which is the
// only frame length Worker accepts. It was previously hard-coded to 160, so
// any rate other than 8kHz made Worker reject every frame.
//
// An error is returned if the VAD cannot be created, SetMode rejects mode,
// or the rate/frame-length pair fails ValidRateAndFrameLength. On error the
// returned VadProcessor is the zero value and must not be used.
func NewVAD(sampleRateHertz, sample10msSize, mode int) (VadProcessor, error) {
	vad, err := webrtcvad.New()
	if err != nil {
		return VadProcessor{}, err
	}
	if err := vad.SetMode(mode); err != nil {
		return VadProcessor{}, err
	}
	// NOTE(review): confirm whether go-webrtcvad expects frameLength in bytes
	// or in samples here; Process is fed sample10msSize-byte frames.
	if !vad.ValidRateAndFrameLength(sampleRateHertz, sample10msSize) {
		return VadProcessor{}, errors.New("invalid rate or frame length")
	}
	return VadProcessor{
		VAD:             vad,
		ActiveAudio:     new(bytes.Buffer),
		framesize:       sample10msSize,
		sampleRateHertz: sampleRateHertz,
		sample10msSize:  sample10msSize,
		silenceTimeout:  500,  // definition of a silence
		pauseTimeout:    3000, // length of silence before moving on
		// currentSilence and currentPause start at zero.
	}, nil
}
// Worker runs the voice-activity-detection loop. It receives frames from
// rawAudio, classifies each one with the VAD, and appends voiced frames to
// v.ActiveAudio. It reports accumulated quiet on vadControl as SILENCE or
// PAUSE messages (see sendAndResetSilence).
//
// Frames whose length differs from v.sample10msSize, or that the VAD fails
// to process, are logged and skipped. Worker returns when rawAudio is
// closed. Previously a closed channel made it spin forever logging empty
// frames.
//
// NOTE(review): v.ActiveAudio is written here without synchronization. In
// main, the goroutine handling PAUSE reads and resets it concurrently, so
// access needs a lock to be race-free.
func (v *VadProcessor) Worker(rawAudio chan []byte, vadControl chan DURATION) {
	fmt.Println("VAD working....")
	// Start counting quiet from zero.
	v.restartSilence()
	// TODO: accept a context so the worker can be cancelled without closing rawAudio.
	for frame := range rawAudio {
		frameBytes := len(frame)
		if frameBytes != v.sample10msSize {
			log.Printf("vad: invalid frame size (%v) (%v) \n", frameBytes, v.sample10msSize)
			continue
		}
		frameActive, err := v.VAD.Process(v.sampleRateHertz, frame)
		if err != nil {
			log.Printf("vad Process error: (%v) \n", err)
			continue
		}
		if frameActive {
			v.ActiveAudio.Write(frame)
		}
		v.saveResults(frameActive, frameBytes)
		v.sendAndResetSilence(frameActive, vadControl)
	}
}
// saveResults updates the quiet counters for one classified frame: a
// non-voiced frame adds its size to them, a voiced frame clears them.
func (v *VadProcessor) saveResults(frameActive bool, frameMs int) {
	if !frameActive {
		v.saveSilenceResult(frameMs)
		return
	}
	v.restartSilence()
}
// sendAndResetSilence emits control messages on vadControl once enough
// non-voiced audio has accumulated:
//
//   - SILENCE when currentSilence reaches silenceTimeout; currentSilence is then reset.
//   - PAUSE when currentPause reaches pauseTimeout; currentPause is then reset.
//
// Voiced frames emit nothing. Each send blocks until vadControl is received
// from.
//
// The counters hold bytes, so the timeouts are scaled by the stream's byte
// rate, sample10msSize/10 bytes per ms. At 8kHz that is 16, the factor that
// used to be hard-coded, so other sample rates now get the same timing.
// Thresholds are compared with >= so that a frame size that does not evenly
// divide the threshold still fires, rather than stepping past it forever. A
// non-positive timeout disables its message; with counters incremented before
// this check, the old == comparison never fired for a zero timeout either.
func (v *VadProcessor) sendAndResetSilence(frameActive bool, vadControl chan DURATION) {
	if frameActive {
		return
	}
	bytesPerMs := v.sample10msSize / 10
	if bytesPerMs <= 0 {
		// Misconfigured processor: keep the original 8kHz scaling.
		bytesPerMs = 16
	}
	if v.silenceTimeout > 0 && v.currentSilence >= v.silenceTimeout*bytesPerMs {
		v.currentSilence = 0
		vadControl <- SILENCE
	}
	if v.pauseTimeout > 0 && v.currentPause >= v.pauseTimeout*bytesPerMs {
		v.currentPause = 0
		vadControl <- PAUSE
	}
}
// saveSilenceResult credits one non-voiced frame's size to both the pause and
// the silence counters.
func (v *VadProcessor) saveSilenceResult(frameMs int) {
	v.currentPause += frameMs
	v.currentSilence += frameMs
}
// restartSilence zeroes both quiet-period counters.
func (v *VadProcessor) restartSilence() {
	v.currentSilence, v.currentPause = 0, 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment