| Crates.io | agntcy-protoc-srpc-plugin |
| lib.rs | agntcy-protoc-srpc-plugin |
| version | 0.1.0 |
| created_at | 2025-09-09 17:15:27.366434+00 |
| updated_at | 2025-09-09 17:15:27.366434+00 |
| description | A protoc plugin for generating SRPC (Slim RPC) code |
| homepage | |
| repository | |
| max_upload_size | |
| id | 1831281 |
| size | 27,809 |
The SRPC Compiler (protoc-srpc-plugin) is a protoc plugin that generates Python client stubs and server servicers for SRPC (Slim RPC) from Protocol Buffer service definitions. This plugin enables you to build high-performance RPC services using the SRPC framework.
protoc and buf build systemscargo install agntcy-protoc-srpc-plugin
This will install the protoc-srpc-plugin binary to your Cargo bin directory (usually ~/.cargo/bin).
git clone https://github.com/agntcy/slim.git
cd slim/data-plane/srpc-compiler
cargo build --release
data-plane/target/release/protoc-srpc-pluginCreate a file called example.proto:
syntax = "proto3";
package example_service;
service Test {
rpc ExampleUnaryUnary(ExampleRequest) returns (ExampleResponse);
rpc ExampleUnaryStream(ExampleRequest) returns (stream ExampleResponse);
rpc ExampleStreamUnary(stream ExampleRequest) returns (ExampleResponse);
rpc ExampleStreamStream(stream ExampleRequest) returns (stream ExampleResponse);
}
message ExampleRequest {
string example_string = 1;
int64 example_integer = 2;
}
message ExampleResponse {
string example_string = 1;
int64 example_integer = 2;
}
Make sure you have:
protoc (Protocol Buffer compiler) installedprotoc-srpc-plugin binary in your PATH or specify its full path# Generate both the protobuf Python files and SRPC files
protoc \
--python_out=. \
--pyi_out=. \
--plugin=~/.cargo/bin/protoc-srpc-plugin \
--srpc_out=. \
example.proto
This will generate:
example_pb2.py - Standard protobuf Python bindingsexample_pb2_srpc.py - SRPC client stubs and server servicersYou can specify a custom import for the types module. This allows to import the types from an external package.
For instance, if you don't want to generate the types and you want to import them from a2a.grpc.a2a_pb2`,, you can do:
protoc \
--plugin=~/.cargo/bin/protoc-srpc-plugin \
--srpc_out=types_import="from a2a.grpc import a2a_pb2 as a2a__pb2":. \
example.proto
buf CLI installedprotoc-srpc-plugin binary in your PATHCreate a buf.gen.yaml file in your project root:
version: v2
managed:
enabled: true
inputs:
- proto_file: example.proto
plugins:
- local: /path/to/protoc-srpc-plugin
out: .
- remote: buf.build/protocolbuffers/python
out: .
buf generate
Or generate from a specific file:
buf generate --path example.proto
For more complex setups with custom options:
version: v2
managed:
enabled: true
plugins:
- local: protoc-srpc-plugin
out: generated
opt:
- types_import=from .pb2_types import example_pb2 as pb2
strategy: all
- remote: buf.build/protocolbuffers/python
out: generated
strategy: all
For the example above, the generated example_pb2_srpc.py will contain:
class TestStub:
"""Client stub for Test."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A srpc.Channel.
"""
self.ExampleUnaryUnary = channel.unary_unary(...)
self.ExampleUnaryStream = channel.unary_stream(...)
# ... other methods
class TestServicer():
"""Server servicer for Test. Implement this class to provide your service logic."""
def ExampleUnaryUnary(self, request, context):
"""Method for ExampleUnaryUnary. Implement your service logic here."""
raise srpc_rpc.SRPCResponseError(
code=code__pb2.UNIMPLEMENTED, message="Method not implemented!"
)
# ... other methods
def add_TestServicer_to_server(servicer, server: srpc.Server):
# Registers the servicer with the SRPC server
pass
The plugin supports the following parameters:
types_import: Customize how protobuf types are imported
types_import="from my_package import types_pb2 as pb2"import asyncio
import logging
from collections.abc import AsyncGenerator
import srpc
from srpc.examples.simple.types.example_pb2 import ExampleRequest
from srpc.examples.simple.types.example_pb2_srpc import TestStub
logger = logging.getLogger(__name__)
async def amain() -> None:
channel = srpc.Channel(
local="agntcy/grpc/client",
slim={
"endpoint": "http://localhost:46357",
"tls": {
"insecure": True,
},
},
enable_opentelemetry=False,
shared_secret="my_shared_secret",
remote="agntcy/grpc/server",
)
# Stubs
stubs = TestStub(channel)
# Call method
try:
request = ExampleRequest(example_integer=1, example_string="hello")
response = await stubs.ExampleUnaryUnary(request, timeout=2)
logger.info(f"Response: {response}")
responses = stubs.ExampleUnaryStream(request, timeout=2)
async for resp in responses:
logger.info(f"Stream Response: {resp}")
async def stream_requests() -> AsyncGenerator[ExampleRequest]:
for i in range(10):
yield ExampleRequest(example_integer=i, example_string=f"Request {i}")
response = await stubs.ExampleStreamUnary(stream_requests(), timeout=2)
logger.info(f"Stream Unary Response: {response}")
except asyncio.TimeoutError:
logger.error("timeout while waiting for response")
import asyncio
import logging
from collections.abc import AsyncIterable
from srpc.context import Context
from srpc.examples.simple.types.example_pb2 import ExampleRequest, ExampleResponse
from srpc.examples.simple.types.example_pb2_srpc import (
TestServicer,
add_TestServicer_to_server,
)
from srpc.server import Server
logger = logging.getLogger(__name__)
class TestService(TestServicer):
async def ExampleUnaryUnary(
self, request: ExampleRequest, context: Context
) -> ExampleResponse:
logger.info(f"Received unary-unary request: {request}")
return ExampleResponse(example_integer=1, example_string="Hello, World!")
async def ExampleUnaryStream(
self, request: ExampleRequest, context: Context
) -> AsyncIterable[ExampleResponse]:
logger.info(f"Received unary-stream request: {request}")
# generate async responses stream
for i in range(5):
logger.info(f"Sending response {i}")
yield ExampleResponse(example_integer=i, example_string=f"Response {i}")
async def ExampleStreamUnary(
self, request_iterator: AsyncIterable[ExampleRequest], context: Context
) -> ExampleResponse:
logger.info(f"Received stream-unary request: {request_iterator}")
async for request in request_iterator:
logger.info(f"Received stream-unary request: {request}")
response = ExampleResponse(
example_integer=1, example_string="Stream Unary Response"
)
return response
async def ExampleStreamStream(
self, request_iterator: AsyncIterable[ExampleRequest], context: Context
) -> AsyncIterable[ExampleResponse]:
"""Missing associated documentation comment in .proto file."""
raise NotImplementedError("Method not implemented!")
def create_server(
local: str,
slim: dict,
enable_opentelemetry: bool = False,
shared_secret: str = "",
) -> Server:
"""
Create a new SRPC server instance.
"""
server = Server(
local=local,
slim=slim,
enable_opentelemetry=enable_opentelemetry,
shared_secret=shared_secret,
)
return server
async def amain() -> None:
server = create_server(
local="agntcy/grpc/server",
slim={
"endpoint": "http://localhost:46357",
"tls": {
"insecure": True,
},
},
enable_opentelemetry=False,
shared_secret="my_shared_secret",
)
# Create RPCs
add_TestServicer_to_server(
TestService(),
server,
)
await server.run()
If you get an error that the plugin is not found:
protoc-srpc-plugin is in your PATH--plugin=protoc-gen-srpc=/full/path/to/protoc-srpc-pluginIf you encounter Python import errors:
*_pb2.py files are in your Python pathtypes_import parameter to customize import pathsIf the plugin fails to build:
cargo clean && cargo build --releasePlease see the main repository's contributing guidelines at CONTRIBUTING.md.
This project is licensed under the Apache 2.0 License - see the LICENSE.md file for details.