Revision: Sun, 01 Oct 2023 21:52:08 GMT
v2.8 – outdated
This version of the documentation is outdated. Consider upgrading your project to Spiral Framework 3.8
Edit this page

GRPC - Streaming

In some cases, you might need to provide large portions of data for the consumer. Combine the ability to write a custom Golang GRPC service, Jobs, and Broadcast to stream data from PHP application.

Attention, this article an example implementation. Make sure to implement proper backoff strategy and timeout management before going to production. Make sure to read other GRPC articles before this section.

Service Definition

We can define the service as a singular endpoint with a streaming response. The client will connect the streaming service and must stop consuming after the null message received. The consuming will initiate based on the provided id.

syntax = "proto3";

package stream;

message Request {
    string id = 1;

message Data {
    int32 sequence = 1;
    bytes data = 2;

service Streamer {
    rpc Stream (Request) returns (stream Data) {

Create direction stream and generate client and server SDK for Golang using:

$ mkdir stream
$ protoc -I proto/ proto/stream.proto --go_out=plugins=grpc:strea


The client/consumer application will be displaying all streamed content directly into stdout. You can create it in a separate directory. Copy the stream directory and app.crt to your client application.

$ go mod init client

The application will look as follows:

package main

import (
	stream "client/stream"

func main() {
	creds, err := credentials.NewClientTLSFromFile("app.crt", "")
	if err != nil {

	conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(creds))
	if err != nil {
	defer conn.Close()

	str, err := stream.NewStreamerClient(conn).Stream(context.Background(), &stream.Request{Id: "request-id"})
	if err != nil {

	for {
		data, err := str.Recv()
		if err != nil {

		if data.Sequence == 0 {
			log.Println("Stream is over")

		log.Printf("Sequence[%v]: %s\n", data.Sequence, string(data.Data))


The producer application contains Golang and PHP parts. The Golang will route message to background PHP process using spiral/jobs package and later read the produced response using unique broadcast topic.

The service will look as following:

package stream

import (
	grpc ""
	grpc2 ""

const ID = "stream"

type Message struct {
	SequenceID int    `json:"sequenceID"`
	Data       string `json:"data"`

type Service struct {
	queue  *jobs.Service
	pubsub *broadcast.Service

func (s *Service) Init(
	g *grpc.Service,
	q *jobs.Service,
	p *broadcast.Service,
) (bool, error) {
	s.queue = q
	s.pubsub = p

	return true, g.AddService(func(server *grpc2.Server) {
		RegisterStreamerServer(server, s)

func (s *Service) Stream(r *Request, srv Streamer_StreamServer) error {
	client := s.pubsub.NewClient()
	defer client.Close()

	// use request id as unique topic name
	if err := client.Subscribe(r.Id); err != nil {
		return err

	// start the background producer
	_, err := s.queue.Push(&jobs.Job{
		Job:     "app.job.Produce",
		Payload: `{"requestID":"` + r.Id + `"}`,
		Options: &jobs.Options{},
	if err != nil {
		return err

	// forward data from topic to stream
	for msgData := range client.Channel() {
		msg := &Message{}
		if err := json.Unmarshal(msgData.Payload, msg); err != nil {
			return err

		if err := srv.Send(&Data{
			Sequence: int32(msg.SequenceID),
			Data:     []byte(msg.Data),
		}); err != nil {
			return err

	return nil

Make sure to register service in main.go:

rr.Container.Register(stream.ID, &stream.Service{})

The application will require jobs and broadcast services enabled (in both .rr and application bootloaders). You do not need any GRPC workers.

  listen: tcp://
  tls.key:  "app.key"
  tls.cert: "app.crt"

    app-job-*.pipeline: "local"
      broker: "ephemeral"
  consume: ["local"]
    command: "php app.php"
    pool.numWorkers: 2


The PHP Job will be located in app/src/Job/Produce.php:



namespace App\Job;

use Spiral\Broadcast\BroadcastInterface;
use Spiral\Broadcast\Message;
use Spiral\Jobs\JobHandler;

class Produce extends JobHandler
    public function invoke(string $requestID, BroadcastInterface $broadcast)
        dumprr("Streaming for: {$requestID}");

        for ($i = 0; $i < 100; $i++) {
            $broadcast->publish(new Message($requestID, ['sequenceID' => $i + 1, 'data' => "DATA $i"]));

        // the stream is over
        $broadcast->publish(new Message($requestID, ['sequenceID' => 0, 'data' => null]));

You can split the streaming into multiple smaller jobs.

Run the server and then the client to test the streaming.