1use std::{
7 collections::{HashMap, hash_map::DefaultHasher},
8 hash::{Hash, Hasher},
9 sync::Arc,
10 time::Duration,
11};
12
13use lazy_static::lazy_static;
14use log::{debug, error, info};
15use parking_lot::Mutex;
16use serde_json::{Value, from_slice, to_vec};
17use tokio::time::timeout;
18use tonic::transport::Channel;
19
20use super::{
21 Error::VineError,
22 Generated::{GenericNotification, GenericRequest, cocoon_service_client::CocoonServiceClient},
23};
24
/// Shorthand for the generated Cocoon gRPC service client bound to a tonic
/// transport [`Channel`].
type CocoonClient = CocoonServiceClient<Channel>;

lazy_static! {
	/// Process-wide registry of sidecar clients, keyed by the string
	/// identifier passed to `ConnectToSideCar`.
	///
	/// The senders in this file clone the client out of the map inside a
	/// short scope, so the mutex guard is dropped before the gRPC call is
	/// awaited.
	static ref SIDECAR_CLIENTS: Arc<Mutex<HashMap<String, CocoonClient>>> = Arc::new(Mutex::new(HashMap::new()));
}
30
31pub async fn ConnectToSideCar(SideCarIdentifier:String, Address:String) -> Result<(), VineError> {
33 info!("[VineClient] Connecting to sidecar '{}' at '{}'...", SideCarIdentifier, Address);
34
35 let endpoint = format!("http://{}", Address);
36
37 let channel = Channel::from_shared(endpoint)?.connect().await?;
38
39 let client = CocoonServiceClient::new(channel);
40
41 SIDECAR_CLIENTS.lock().insert(SideCarIdentifier.clone(), client);
42
43 info!("[VineClient] Successfully connected to sidecar '{}'.", SideCarIdentifier);
44
45 Ok(())
46}
47
48pub async fn SendNotification(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
50 let mut client = {
51 let guard = SIDECAR_CLIENTS.lock();
52
53 guard.get(&SideCarIdentifier).cloned()
54 };
55
56 if let Some(ref mut client) = client {
57 let request = GenericNotification { method:Method, parameter:to_vec(&Parameters)? };
58
59 client.send_mountain_notification(request).await?;
60
61 Ok(())
62 } else {
63 Err(VineError::ClientNotConnected(SideCarIdentifier))
64 }
65}
66
67pub async fn SendRequest(
69 SideCarIdentifier:&str,
70
71 Method:String,
72
73 Parameters:Value,
74
75 TimeoutMilliseconds:u64,
76) -> Result<Value, VineError> {
77 debug!(
78 "[VineClient] Sending request '{}' to sidecar '{}'...",
79 Method, SideCarIdentifier
80 );
81
82 let mut client = {
83 let guard = SIDECAR_CLIENTS.lock();
84
85 guard.get(SideCarIdentifier).cloned()
86 };
87
88 if let Some(ref mut client) = client {
89 let mut hasher = DefaultHasher::new();
90
91 uuid::Uuid::new_v4().hash(&mut hasher);
92
93 let RequestIdentifier = hasher.finish();
94
95 let request = GenericRequest {
96 request_identifier:RequestIdentifier,
97
98 method:Method.clone(),
99
100 parameter:to_vec(&Parameters)?,
101 };
102
103 let future = client.process_mountain_request(request);
104
105 match timeout(Duration::from_millis(TimeoutMilliseconds), future).await {
106 Ok(Ok(response)) => {
107 let response_data = response.into_inner();
108
109 if let Some(rpc_error) = response_data.error {
110 error!(
111 "[VineClient] Received RPC error from sidecar '{}': {}",
112 SideCarIdentifier, rpc_error.message
113 );
114
115 Err(VineError::RPCError(rpc_error.message))
116 } else {
117 let deserialized_value = from_slice(&response_data.result)?;
118
119 Ok(deserialized_value)
120 }
121 },
122
123 Ok(Err(status)) => {
124 error!(
125 "[VineClient] gRPC status error from sidecar '{}': {}",
126 SideCarIdentifier, status
127 );
128
129 Err(VineError::from(status))
130 },
131
132 Err(_) => {
133 error!("[VineClient] Request to sidecar '{}' timed out.", SideCarIdentifier);
134
135 Err(VineError::RequestTimeout {
136 SideCarIdentifier:SideCarIdentifier.to_string(),
137
138 MethodName:Method,
139
140 TimeoutMilliseconds,
141 })
142 },
143 }
144 } else {
145 Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()))
146 }
147}