This demo walks you through deploying a gRPC server that receives streaming audio from a gRPC client, processes that audio with Amazon Transcribe, and returns the result to the client.
Server Infrastructure
This demo will deploy a Fargate container that runs the gRPC server. The gRPC server is exposed through an Application Load Balancer secured with HTTPS using a certificate generated by AWS Certificate Manager.
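The Fargate service itself is defined in the CDK stack. As a rough sketch of what that can look like (the construct names, sizing, and image path here are assumptions, not the demo's exact code):

```typescript
import {
  ContainerImage,
  FargateService,
  FargateTaskDefinition,
  Protocol,
} from 'aws-cdk-lib/aws-ecs';

// Hypothetical task definition; the demo's actual CPU/memory values may differ.
const taskDefinition = new FargateTaskDefinition(this, 'gRPCServerTask', {
  cpu: 256,
  memoryLimitMiB: 512,
});

// Build the server image from the directory containing its Dockerfile (assumed path).
taskDefinition.addContainer('gRPCServerContainer', {
  image: ContainerImage.fromAsset('src/resources/gRPCServer'),
  portMappings: [{ containerPort: 50051, protocol: Protocol.TCP }],
});

this.fargateService = new FargateService(this, 'gRPCServerService', {
  cluster: props.cluster,
  taskDefinition,
});
```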
Certificate Manager
```typescript
this.hostedZone = HostedZone.fromLookup(this, 'HostedZone', {
  domainName: props.domainName,
});

this.certificate = new Certificate(this, 'Certificate', {
  domainName: `grpc.${props.domainName}`,
  validation: CertificateValidation.fromDns(this.hostedZone),
});
```
Using the provided domain name, the CDK will look up the associated HostedZone and create a Certificate that will be associated with the Application Load Balancer.
Application Load Balancer
```typescript
const grpcServerTargetGroup = new ApplicationTargetGroup(
  this,
  'gRPCServerTargetGroup',
  {
    vpc: props.vpc,
    port: 50051,
    protocol: ApplicationProtocol.HTTP,
    protocolVersion: ApplicationProtocolVersion.GRPC,
    targets: [this.fargateService],
    healthCheck: {
      healthyGrpcCodes: '12',
    },
  },
);

this.applicationLoadBalancer.addListener('fargateListener', {
  port: 50051,
  protocol: ApplicationProtocol.HTTPS,
  certificates: [ListenerCertificate.fromCertificateManager(props.certificate)],
  open: true,
  defaultTargetGroups: [grpcServerTargetGroup],
});

props.applicationLoadBalancerSecurityGroup.addIngressRule(
  Peer.anyIpv4(),
  Port.tcp(50051),
);
```
This will create an Application Load Balancer target group that uses `ApplicationProtocolVersion.GRPC` as the protocol version. Additionally, the `healthCheck` will look for gRPC codes in the response from the gRPC server. The Listener associated with the Application Load Balancer uses HTTPS and the previously created Certificate. The Listener uses port `50051` and forwards to port `50051` on the Fargate task. This port is also opened on the Application Load Balancer Security Group.
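The snippet above references `this.applicationLoadBalancer` and `props.applicationLoadBalancerSecurityGroup`, which are created elsewhere in the stack and wired through props. A minimal sketch of that setup (construct names here are assumptions):

```typescript
import { SecurityGroup } from 'aws-cdk-lib/aws-ec2';
import { ApplicationLoadBalancer } from 'aws-cdk-lib/aws-elasticloadbalancingv2';

// Security group for the load balancer; the ingress rule on 50051 is the
// addIngressRule call shown above.
const applicationLoadBalancerSecurityGroup = new SecurityGroup(
  this,
  'ApplicationLoadBalancerSecurityGroup',
  { vpc: props.vpc, allowAllOutbound: true },
);

// Internet-facing ALB so the gRPC client can reach it from anywhere.
this.applicationLoadBalancer = new ApplicationLoadBalancer(this, 'ApplicationLoadBalancer', {
  vpc: props.vpc,
  internetFacing: true,
  securityGroup: applicationLoadBalancerSecurityGroup,
});
```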
Route 53
```typescript
new ARecord(this, 'grpcARecord', {
  zone: props.hostedZone,
  recordName: 'grpc',
  target: RecordTarget.fromAlias(
    new LoadBalancerTarget(this.applicationLoadBalancer),
  ),
});
```
Finally, an A record will be created that points to the Application Load Balancer alias using the record name `grpc`. This will create a record `grpc.example.com` that can be used with the Application Load Balancer.
gRPC Server
Server initialization
```typescript
function main() {
  const server = new grpc.Server();
  server.addService(audio_proto.AudioStreamer.service, { streamAudio });
  server.bindAsync(
    '0.0.0.0:50051',
    grpc.ServerCredentials.createInsecure(),
    () => {
      server.start();
      console.log('gRPC server started on port 50051');
    },
  );
}

main();
```
The `main` function of the server code creates the server listening on port `50051` with plain HTTP. The Application Load Balancer provides the TLS security, so insecure credentials can be used here.
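The `audio_proto` object used in `main` is typically produced by loading the service definition with `@grpc/proto-loader`. A minimal sketch, assuming a proto file named `audio.proto` (the demo's actual file and package names may differ):

```typescript
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

// Load the service definition at runtime; 'audio.proto' is an assumed file name.
const packageDefinition = protoLoader.loadSync('audio.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
});

// The package name below is an assumption based on the AudioStreamer service.
const audio_proto: any = grpc.loadPackageDefinition(packageDefinition).audiostreamer;
```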
streamAudio Function
```typescript
// A bidirectional stream handler, so the call is typed as a ServerDuplexStream.
function streamAudio(call: grpc.ServerDuplexStream<any, any>) {
  console.log('Streaming audio data received from client.');
  const audioStream = new PassThrough();

  // Begin transcribing immediately; surface any errors back to the client.
  startTranscription(audioStream, call).catch((error) => {
    console.error('Transcription error:', error);
    call.write({ message: error.message });
  });

  // Pipe incoming gRPC messages into the PassThrough stream.
  call.on('data', (data: any) => {
    audioStream.write(data.audioData);
  });

  call.on('end', () => {
    console.log('Streaming completed.');
    audioStream.end();
  });

  audioStream.on('end', () => {
    console.log('Transcribing ended.');
  });
}
```
Now, when a stream arrives, the `streamAudio` function will begin processing it. In this example, that involves three major steps:

- Create the `audioStream` PassThrough stream
- Start the `startTranscription` function with the `audioStream` (a sketch of its scaffolding follows this list)
- Write data to the `audioStream` from the gRPC stream as it arrives
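The snippet in the next section runs inside `startTranscription`, where `stream` is the PassThrough and `client` is a Transcribe streaming client. A minimal sketch of that scaffolding (an assumption, not the demo's exact code):

```typescript
import { PassThrough } from 'stream';
import * as grpc from '@grpc/grpc-js';
import { TranscribeStreamingClient } from '@aws-sdk/client-transcribe-streaming';

// Shared Transcribe streaming client; the region comes from the environment.
const client = new TranscribeStreamingClient({});

// Hypothetical signature: `stream` is the PassThrough fed by streamAudio, and
// `call` is the gRPC stream used to write transcripts back to the client.
async function startTranscription(
  stream: PassThrough,
  call: grpc.ServerDuplexStream<any, any>,
): Promise<void> {
  // Body shown in the next section: wrap `stream` in an async generator,
  // send a StartStreamTranscriptionCommand, and forward results via call.write().
}
```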
Transcribe the stream
```typescript
const audioStream = async function* () {
  for await (const chunk of stream) {
    const buffer = Buffer.from(chunk);
    yield { AudioEvent: { AudioChunk: buffer } };
  }
};

const command = new StartStreamTranscriptionCommand({
  LanguageCode: LanguageCode.EN_US,
  MediaEncoding: MediaEncoding.PCM,
  MediaSampleRateHertz: 16000,
  AudioStream: audioStream(),
});

const response = await client.send(command);
```
This sends the stream to Transcribe for processing. Once a response comes back from Transcribe, we write it back to the gRPC client via the stream.
```typescript
call.write({
  message: JSON.stringify(
    event.TranscriptEvent.Transcript.Results[0].Alternatives[0].Transcript,
  ),
});
```
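That `call.write` happens while iterating the response's event stream. A sketch of the surrounding loop, assuming the `response` from `client.send(command)` above (guards are added here, and partial results are skipped for brevity, though the demo may forward them as they arrive):

```typescript
// Iterate Transcribe's event stream and forward each final transcript to the client.
if (response.TranscriptResultStream) {
  for await (const event of response.TranscriptResultStream) {
    const results = event.TranscriptEvent?.Transcript?.Results ?? [];
    for (const result of results) {
      // Skip partial hypotheses; only forward finalized transcripts.
      if (!result.IsPartial && result.Alternatives?.length) {
        call.write({ message: JSON.stringify(result.Alternatives[0].Transcript) });
      }
    }
  }
}
```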
gRPC Client
```typescript
async function main() {
  let target: string = 'localhost:50051';
  let credentials = grpc.credentials.createInsecure();
  if (TARGET) {
    target = `${TARGET}:50051`;
    credentials = grpc.credentials.createSsl();
  }
  console.log(`Using target: ${target}`);

  if (!WAV_FILE) {
    throw new Error('WAV_FILE required');
  }
  console.log(`Using source: ${WAV_FILE}`);

  const { audioStream } = await processFfmpeg(WAV_FILE);
  // ... the streaming logic shown below uses `audioStream` and `target`.
}
```
The main client function will determine the `TARGET` and `WAV_FILE` to be used. This client can be used either locally or with the deployed gRPC server. A `WAV_FILE` is required; it can be stored in the `client\src` directory and will be copied over during the Docker build. If the `WAV_FILE` is a stereo file with each channel containing a separate speaker, the channels will be mixed together when sent to the gRPC server, so the transcription will be the combined result of both channels. If you want a single channel, you can modify the `ffmpeg` processing to pass only one channel, or split the `WAV_FILE` before sending.
ffmpeg Processing
```typescript
async function processFfmpeg(fileName: string): Promise<{
  audioStream: PassThrough;
}> {
  console.log('Processing with ffmpeg');
  const audioStream = new PassThrough();
  ffmpeg(fileName)
    .native()
    .output(audioStream)
    .format('wav')
    .audioCodec('pcm_s16le')
    .audioFrequency(16000) // resample to 16 kHz to match MediaSampleRateHertz on the server
    .on('error', (error: { message: string }) => {
      console.log('Cannot process: ' + error.message);
    })
    .run();
  return { audioStream };
}
```
This demo uses `ffmpeg` to ensure the audio being streamed is sent at the correct rate and in the correct format.
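If you want the single-channel behavior mentioned earlier, one variation of `processFfmpeg` keeps only the left channel via ffmpeg's standard `pan` filter. This is a hypothetical tweak, not part of the demo (adjust `c0`/`c1` for the channel you want):

```typescript
// Hypothetical variation: extract one channel instead of mixing both down.
ffmpeg(fileName)
  .native()
  .audioFilters('pan=mono|c0=c0') // keep only the left channel
  .output(audioStream)
  .format('wav')
  .audioCodec('pcm_s16le')
  .run();
```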
Streaming
```typescript
const client = new audio_proto.AudioStreamer(target, credentials);
const chunkSize = 1024;
const call = client.streamAudio();

audioStream.on('data', (chunk) => {
  let offset = 0;
  while (offset < chunk.length) {
    const end = Math.min(offset + chunkSize, chunk.length);
    const chunkToSend = chunk.slice(offset, end);
    call.write({ audioData: chunkToSend });
    offset = end;
  }
});

audioStream.on('end', () => {
  console.log('Audio stream ended.');
  call.end();
});

call.on('data', (response: any) => {
  console.log(`${Date.now()}: ${response.message}`);
});

call.on('end', () => {
  console.log('gRPC call ended.');
  process.exit(0);
});

call.on('error', (err: any) => {
  console.error('Error:', err);
});
```
Once the `audioStream` has been created through `processFfmpeg`, data arriving on it will be written to the gRPC stream in chunks through the client. Responses generated on the server (the transcriptions) will be surfaced through the `call.on('data', () => {})` handler.
In Action
To test this demo, you can run it either locally or against the deployed Fargate container.
Deploying
```
yarn launch
```
This will deploy the CDK stack to your account. You will need a Route 53 hosted zone for your domain in order to use this. The A record that is created will be written to the `client\src\.env` file so that you can use the client with this gRPC server.
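For example, assuming `example.com` is your domain, the generated `.env` file would contain something like the following (the `TARGET` variable matches what the client reads; the exact format is an assumption):

```
TARGET=grpc.example.com
```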
Testing
In order to test locally, you will need to start the gRPC server. In the `src\resources\gRPCServer` directory, you can start the gRPC server Docker container:
```
docker-compose run --build grpc-server
```
You will need to have AWS credentials available inside this container in order to use Transcribe.
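One way to do that, assuming the credentials are already set in your shell environment, is to forward them on the command line (`docker-compose run -e VAR` with no value passes through the host's value):

```
docker-compose run --build \
  -e AWS_ACCESS_KEY_ID \
  -e AWS_SECRET_ACCESS_KEY \
  -e AWS_SESSION_TOKEN \
  -e AWS_REGION \
  grpc-server
```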
In the `client\src` directory, you can start the gRPC client Docker container:
```
docker-compose run --build -e TARGET= grpc-client
```
You can add the `WAV_FILE` environment variable on the command line as well:

```
docker-compose run --build -e TARGET= -e WAV_FILE=EXAMPLE.wav grpc-client
```
This will use localhost as the `TARGET`. If you want to test with the hosted gRPC server and have already deployed it, you can use:
```
docker-compose run --build grpc-client
```