fix (backend-rs): Redis streaming
This commit is contained in:
parent
f13df7e202
commit
2b0668dacd
|
@ -16,5 +16,5 @@ pub fn add_note_to_antenna(antenna_id: &str, note: ¬e::Model) -> Result<(), E
|
||||||
let stream = Stream::Antenna {
|
let stream = Stream::Antenna {
|
||||||
id: antenna_id.to_string(),
|
id: antenna_id.to_string(),
|
||||||
};
|
};
|
||||||
publish(&stream, Some("note"), Some(serde_json::to_value(note)?))
|
publish(&stream, Some("note"), Some(serde_json::to_string(note)?))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::config::server::CONFIG;
|
||||||
use crate::database::redis_conn;
|
use crate::database::redis_conn;
|
||||||
use redis::{Commands, RedisError};
|
use redis::{Commands, RedisError};
|
||||||
|
|
||||||
|
@ -35,33 +36,33 @@ pub enum Stream {
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error("Redis error: {0}")]
|
#[error("Redis error: {0}")]
|
||||||
RedisError(#[from] RedisError),
|
RedisError(#[from] RedisError),
|
||||||
#[error("Json serialization error: {0}")]
|
#[error("Json (de)serialization error: {0}")]
|
||||||
JsonError(#[from] serde_json::Error),
|
JsonError(#[from] serde_json::Error),
|
||||||
#[error("Value error: {0}")]
|
#[error("Value error: {0}")]
|
||||||
ValueError(String),
|
ValueError(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn publish(
|
pub fn publish(channel: &Stream, kind: Option<&str>, value: Option<String>) -> Result<(), Error> {
|
||||||
channel: &Stream,
|
|
||||||
kind: Option<&str>,
|
|
||||||
value: Option<serde_json::Value>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
#[derive(serde::Serialize)]
|
|
||||||
struct Message {
|
|
||||||
r#type: String,
|
|
||||||
body: Option<serde_json::Value>,
|
|
||||||
}
|
|
||||||
|
|
||||||
let message = if let Some(kind) = kind {
|
let message = if let Some(kind) = kind {
|
||||||
serde_json::to_value(Message {
|
format!(
|
||||||
r#type: kind.to_string(),
|
"{{ \"type\": \"{}\", \"body\": {} }}",
|
||||||
body: value,
|
kind,
|
||||||
})?
|
match value {
|
||||||
|
Some(v) => v,
|
||||||
|
None => "null".to_string(),
|
||||||
|
}
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
value.ok_or(Error::ValueError("Invalid streaming message".to_string()))?
|
value.ok_or(Error::ValueError("Invalid streaming message".to_string()))?
|
||||||
};
|
};
|
||||||
|
|
||||||
redis_conn()?.publish(channel.to_string(), message.to_string())?;
|
redis_conn()?.publish(
|
||||||
|
&CONFIG.host,
|
||||||
|
format!(
|
||||||
|
"{{ \"channel\": \"{}\", \"message\": {} }}",
|
||||||
|
channel, message,
|
||||||
|
),
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue