In some cases, you might need to provide large portions of data for the consumer. Combine the ability to write a custom Golang GRPC service, Jobs, and Broadcast to stream data from PHP application.
Attention, this article an example implementation. Make sure to implement proper backoff strategy and timeout management before going to production. Make sure to read other GRPC articles before this section.
We can define the service as a singular endpoint with a streaming response. The client will connect the streaming
service and must stop consuming after the null message received. The consuming will initiate based on the provided id
.
syntax = "proto3";
package stream;
message Request {
string id = 1;
}
message Data {
int32 sequence = 1;
bytes data = 2;
}
service Streamer {
rpc Stream (Request) returns (stream Data) {
}
}
Create direction stream
and generate client and server SDK for Golang using:
$ mkdir stream
$ protoc -I proto/ proto/stream.proto --go_out=plugins=grpc:strea
The client/consumer application will be displaying all streamed content directly into stdout
. You can create it in a
separate directory. Copy the stream
directory and app.crt
to your client application.
$ go mod init client
The application will look as follows:
package main
import (
stream "client/stream"
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"log"
)
func main() {
creds, err := credentials.NewClientTLSFromFile("app.crt", "")
if err != nil {
panic(err)
}
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(creds))
if err != nil {
panic(err)
}
defer conn.Close()
str, err := stream.NewStreamerClient(conn).Stream(context.Background(), &stream.Request{Id: "request-id"})
if err != nil {
panic(err)
}
for {
data, err := str.Recv()
if err != nil {
panic(err)
}
if data.Sequence == 0 {
log.Println("Stream is over")
return
}
log.Printf("Sequence[%v]: %s\n", data.Sequence, string(data.Data))
}
}
The producer application contains Golang and PHP parts. The Golang will route message to background PHP process using spiral/jobs package and later read the produced response using unique broadcast topic.
The service will look as following:
package stream
import (
"encoding/json"
"github.com/spiral/broadcast"
"github.com/spiral/jobs/v2"
grpc "github.com/spiral/php-grpc"
grpc2 "google.golang.org/grpc"
)
const ID = "stream"
type Message struct {
SequenceID int `json:"sequenceID"`
Data string `json:"data"`
}
type Service struct {
queue *jobs.Service
pubsub *broadcast.Service
}
func (s *Service) Init(
g *grpc.Service,
q *jobs.Service,
p *broadcast.Service,
) (bool, error) {
s.queue = q
s.pubsub = p
return true, g.AddService(func(server *grpc2.Server) {
RegisterStreamerServer(server, s)
})
}
func (s *Service) Stream(r *Request, srv Streamer_StreamServer) error {
client := s.pubsub.NewClient()
defer client.Close()
// use request id as unique topic name
if err := client.Subscribe(r.Id); err != nil {
return err
}
// start the background producer
_, err := s.queue.Push(&jobs.Job{
Job: "app.job.Produce",
Payload: `{"requestID":"` + r.Id + `"}`,
Options: &jobs.Options{},
})
if err != nil {
return err
}
// forward data from topic to stream
for msgData := range client.Channel() {
msg := &Message{}
if err := json.Unmarshal(msgData.Payload, msg); err != nil {
return err
}
if err := srv.Send(&Data{
Sequence: int32(msg.SequenceID),
Data: []byte(msg.Data),
}); err != nil {
return err
}
}
return nil
}
Make sure to register service in main.go
:
rr.Container.Register(stream.ID, &stream.Service{})
The application will require jobs
and broadcast
services enabled (in both .rr
and application bootloaders).
You do not need any GRPC workers.
grpc:
listen: tcp://0.0.0.0:50051
tls.key: "app.key"
tls.cert: "app.crt"
jobs:
dispatch:
app-job-*.pipeline: "local"
pipelines:
local:
broker: "ephemeral"
consume: ["local"]
workers:
command: "php app.php"
pool.numWorkers: 2
The PHP Job will be located in app/src/Job/Produce.php
:
<?php
declare(strict_types=1);
namespace App\Job;
use Spiral\Broadcast\BroadcastInterface;
use Spiral\Broadcast\Message;
use Spiral\Jobs\JobHandler;
class Produce extends JobHandler
{
public function invoke(string $requestID, BroadcastInterface $broadcast)
{
dumprr("Streaming for: {$requestID}");
for ($i = 0; $i < 100; $i++) {
usleep(100000);
$broadcast->publish(new Message($requestID, ['sequenceID' => $i + 1, 'data' => "DATA $i"]));
}
// the stream is over
$broadcast->publish(new Message($requestID, ['sequenceID' => 0, 'data' => null]));
}
}
You can split the streaming into multiple smaller jobs.
Run the server and then the client to test the streaming.