Grove/Transport/
gRPCTransport.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::RwLock;
11use tonic::transport::{Channel, Endpoint};
12use tracing::{debug, info, instrument};
13
14use crate::Transport::{
15 Strategy::{TransportStats, TransportStrategy, TransportType},
16 TransportConfig,
17};
18
19#[derive(Clone, Debug)]
21pub struct gRPCTransport {
22 Endpoint:String,
24 Channel:Arc<RwLock<Option<Channel>>>,
26 Configuration:TransportConfig,
28 Connected:Arc<RwLock<bool>>,
30 Statistics:Arc<RwLock<TransportStats>>,
32}
33
34impl gRPCTransport {
35 pub fn New(Address:&str) -> anyhow::Result<Self> {
37 Ok(Self {
38 Endpoint:Address.to_string(),
39 Channel:Arc::new(RwLock::new(None)),
40 Configuration:TransportConfig::default(),
41 Connected:Arc::new(RwLock::new(false)),
42 Statistics:Arc::new(RwLock::new(TransportStats::default())),
43 })
44 }
45
46 pub fn WithConfiguration(Address:&str, Configuration:TransportConfig) -> anyhow::Result<Self> {
48 Ok(Self {
49 Endpoint:Address.to_string(),
50 Channel:Arc::new(RwLock::new(None)),
51 Configuration,
52 Connected:Arc::new(RwLock::new(false)),
53 Statistics:Arc::new(RwLock::new(TransportStats::default())),
54 })
55 }
56
57 pub fn Address(&self) -> &str { &self.Endpoint }
59
60 pub async fn GetChannel(&self) -> anyhow::Result<Channel> {
62 self.Channel
63 .read()
64 .await
65 .as_ref()
66 .cloned()
67 .ok_or_else(|| anyhow::anyhow!("gRPC channel not connected"))
68 }
69
70 pub async fn Statistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
72
73 fn BuildEndpoint(&self) -> anyhow::Result<Endpoint> {
75 let EndpointValue = Endpoint::from_shared(self.Endpoint.clone())?
76 .timeout(self.Configuration.ConnectionTimeout)
77 .connect_timeout(self.Configuration.ConnectionTimeout)
78 .tcp_keepalive(Some(self.Configuration.KeepaliveInterval));
79 Ok(EndpointValue)
80 }
81}
82
83#[async_trait]
84impl TransportStrategy for gRPCTransport {
85 type Error = gRPCTransportError;
86
87 #[instrument(skip(self))]
88 async fn connect(&self) -> Result<(), Self::Error> {
89 info!("Connecting to gRPC endpoint: {}", self.Endpoint);
90
91 let EndpointValue = self
92 .BuildEndpoint()
93 .map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
94
95 let ChannelValue = EndpointValue
96 .connect()
97 .await
98 .map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
99
100 *self.Channel.write().await = Some(ChannelValue);
101 *self.Connected.write().await = true;
102
103 info!("gRPC connection established: {}", self.Endpoint);
104 Ok(())
105 }
106
107 #[instrument(skip(self, request))]
108 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
109 let Start = std::time::Instant::now();
110
111 if !self.is_connected() {
112 return Err(gRPCTransportError::NotConnected);
113 }
114
115 debug!("Sending gRPC request ({} bytes)", request.len());
116
117 let Response:Vec<u8> = vec![];
118 let LatencyMicroseconds = Start.elapsed().as_micros() as u64;
119
120 let mut Stats = self.Statistics.write().await;
121 Stats.record_sent(request.len() as u64, LatencyMicroseconds);
122 Stats.record_received(Response.len() as u64);
123
124 debug!("gRPC request completed in {}µs", LatencyMicroseconds);
125 Ok(Response)
126 }
127
128 #[instrument(skip(self, data))]
129 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
130 if !self.is_connected() {
131 return Err(gRPCTransportError::NotConnected);
132 }
133
134 debug!("Sending gRPC notification ({} bytes)", data.len());
135
136 let mut Stats = self.Statistics.write().await;
137 Stats.record_sent(data.len() as u64, 0);
138 Ok(())
139 }
140
141 #[instrument(skip(self))]
142 async fn close(&self) -> Result<(), Self::Error> {
143 info!("Closing gRPC connection: {}", self.Endpoint);
144 *self.Channel.write().await = None;
145 *self.Connected.write().await = false;
146 info!("gRPC connection closed: {}", self.Endpoint);
147 Ok(())
148 }
149
150 fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
151
152 fn transport_type(&self) -> TransportType { TransportType::gRPC }
153}
154
155#[derive(Debug, thiserror::Error)]
157pub enum gRPCTransportError {
158 #[error("Connection failed: {0}")]
160 ConnectionFailed(String),
161 #[error("Send failed: {0}")]
163 SendFailed(String),
164 #[error("Receive failed: {0}")]
166 ReceiveFailed(String),
167 #[error("Not connected")]
169 NotConnected,
170 #[error("Timeout")]
172 Timeout,
173 #[error("gRPC error: {0}")]
175 Error(String),
176}
177
178impl From<tonic::transport::Error> for gRPCTransportError {
179 fn from(Error:tonic::transport::Error) -> Self { gRPCTransportError::ConnectionFailed(Error.to_string()) }
180}
181
182impl From<tonic::Status> for gRPCTransportError {
183 fn from(Status:tonic::Status) -> Self { gRPCTransportError::Error(Status.to_string()) }
184}
185
#[cfg(test)]
mod tests {
	use super::*;

	/// Construction succeeds and the address is stored exactly as given.
	#[test]
	fn TestgRPCTransportCreation() {
		let Created = gRPCTransport::New("127.0.0.1:50050");
		assert!(Created.is_ok());
		let Transport = Created.unwrap();
		assert_eq!(Transport.Address(), "127.0.0.1:50050");
	}

	/// A freshly created transport reports that it is not connected.
	///
	/// This is a plain `#[test]` on purpose: `is_connected` is a synchronous
	/// accessor and needs no runtime, and under `#[tokio::test]` a blocking
	/// read of a Tokio lock would panic instead of returning the flag.
	#[test]
	fn TestgRPCTransportNotConnected() {
		let Transport = gRPCTransport::New("127.0.0.1:50050").unwrap();
		assert!(!Transport.is_connected());
	}
}