Compare commits
22 Commits
cd590b73fe
...
main
Author | SHA1 | Date | |
---|---|---|---|
e4ea970f28
|
|||
5a203d70ba
|
|||
a66517d24f
|
|||
121943afa7
|
|||
f5edc44717
|
|||
73ee8114ca
|
|||
3fee20f0cf
|
|||
86c6ec26f5
|
|||
e771639856
|
|||
e79dfe9fc5
|
|||
cdb0277c57
|
|||
40455f0365
|
|||
c8c9021b78
|
|||
5c009476d6
|
|||
cacbf4af45
|
|||
6dddbfb1e9
|
|||
24d824f759
|
|||
e951fcdf83
|
|||
d0ac11ea89
|
|||
b93f921bcb
|
|||
148d4e7e43
|
|||
50f0bcd3c7
|
951
Cargo.lock
generated
951
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,11 +1,12 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "matrix-feedbot"
|
name = "matrix-feedbot"
|
||||||
version = "0.1.0"
|
version = "0.2.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
license = "GPL-3.0-or-later"
|
license = "GPL-3.0-or-later"
|
||||||
authors = ["mirsal <mirsal@mirsal.fr>"]
|
authors = ["mirsal <mirsal@mirsal.fr>"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
rand = "0.8"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
|
@ -19,7 +19,7 @@ RUN cargo build --release
|
|||||||
|
|
||||||
FROM debian:bookworm-slim
|
FROM debian:bookworm-slim
|
||||||
|
|
||||||
RUN apt-get update && apt-get install -y --no-install-recommends libssl3 && apt-get clean
|
RUN apt-get update && apt-get install -y --no-install-recommends libssl3 ca-certificates && apt-get clean
|
||||||
COPY --from=build /matrix-feedbot/target/release/matrix-feedbot .
|
COPY --from=build /matrix-feedbot/target/release/matrix-feedbot .
|
||||||
|
|
||||||
CMD ["./matrix-feedbot"]
|
CMD ["./matrix-feedbot"]
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/**
|
/**
|
||||||
* matrix-feedbot v0.1.0
|
* matrix-feedbot v0.2.0
|
||||||
*
|
*
|
||||||
* Copyright (C) 2024 The 1312 Media Collective
|
* Copyright (C) 2024 The 1312 Media Collective
|
||||||
*
|
*
|
||||||
@ -18,8 +18,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::error::Error;
|
use std::{fs, io, fmt};
|
||||||
use std::fs;
|
|
||||||
|
|
||||||
#[derive(Deserialize, Clone, Debug)]
|
#[derive(Deserialize, Clone, Debug)]
|
||||||
pub struct FeedConfig {
|
pub struct FeedConfig {
|
||||||
@ -28,19 +27,73 @@ pub struct FeedConfig {
|
|||||||
pub delay: u64
|
pub delay: u64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug)]
|
||||||
|
#[serde(untagged)]
|
||||||
|
pub enum AuthConfig {
|
||||||
|
PasswordAuthConfig {
|
||||||
|
username: String,
|
||||||
|
password: String,
|
||||||
|
},
|
||||||
|
TokenAuthConfig {
|
||||||
|
user_id: String,
|
||||||
|
device_id: String,
|
||||||
|
access_token: String,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub default_room: String,
|
pub default_room: String,
|
||||||
pub feeds: Vec<FeedConfig>,
|
pub feeds: Vec<FeedConfig>,
|
||||||
pub username: String,
|
#[serde(flatten)]
|
||||||
pub password: String,
|
pub auth: AuthConfig,
|
||||||
pub homeserver_uri: String
|
pub homeserver_uri: String
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
pub fn load(config_file: &str) -> Result<Self, Box<dyn Error>> {
|
pub fn load(config_file: &str) -> Result<Self> {
|
||||||
let serialized_config = fs::read_to_string(config_file)?;
|
|
||||||
|
let serialized_config = fs::read_to_string(config_file).map_err(|e| {
|
||||||
|
match e.kind() {
|
||||||
|
io::ErrorKind::NotFound => Error::FileNotFoundError(config_file.into()),
|
||||||
|
io::ErrorKind::PermissionDenied => Error::PermissionDeniedError(),
|
||||||
|
_ => e.into(),
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
let config: Config = serde_yaml::from_str(&serialized_config)?;
|
let config: Config = serde_yaml::from_str(&serialized_config)?;
|
||||||
|
|
||||||
Ok(config)
|
Ok(config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
#[allow(dead_code)] // we use Debug, so inner fields are never read
|
||||||
|
pub enum Error {
|
||||||
|
FileNotFoundError(String),
|
||||||
|
PermissionDeniedError(),
|
||||||
|
InvalidFormatError(serde_yaml::Error),
|
||||||
|
IoError(io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<serde_yaml::Error> for Error {
|
||||||
|
fn from(e: serde_yaml::Error) -> Self {
|
||||||
|
Self::InvalidFormatError(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<io::Error> for Error {
|
||||||
|
fn from(e: io::Error) -> Self {
|
||||||
|
Self::IoError(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for Error {}
|
||||||
|
|
||||||
|
impl fmt::Display for Error {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
|
||||||
|
write!(fmt, "{self:?}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/**
|
/**
|
||||||
* matrix-feedbot v0.1.0
|
* matrix-feedbot v0.2.0
|
||||||
*
|
*
|
||||||
* Copyright (C) 2024 The 1312 Media Collective
|
* Copyright (C) 2024 The 1312 Media Collective
|
||||||
*
|
*
|
||||||
@ -46,7 +46,7 @@ pub async fn fetch_and_parse_feed(uri: &str) -> Result<Feed, Box<dyn Error>> {
|
|||||||
info!("Fetching feed at {}", uri);
|
info!("Fetching feed at {}", uri);
|
||||||
let mut headers = HeaderMap::new();
|
let mut headers = HeaderMap::new();
|
||||||
headers.insert(USER_AGENT,
|
headers.insert(USER_AGENT,
|
||||||
"matrix-feedbot/0.1.0 (compatible; Mozilla/5.0; +https://1312.media/)"
|
"matrix-feedbot/0.2.0 (compatible; Mozilla/5.0; +https://1312.media/)"
|
||||||
.parse().unwrap());
|
.parse().unwrap());
|
||||||
|
|
||||||
headers.insert(ACCEPT,
|
headers.insert(ACCEPT,
|
||||||
@ -56,7 +56,7 @@ pub async fn fetch_and_parse_feed(uri: &str) -> Result<Feed, Box<dyn Error>> {
|
|||||||
let http_client = reqwest::Client::builder().default_headers(headers).build()?;
|
let http_client = reqwest::Client::builder().default_headers(headers).build()?;
|
||||||
let response = http_client.get(uri).send().await?.text().await?;
|
let response = http_client.get(uri).send().await?.text().await?;
|
||||||
|
|
||||||
info!("Got response, parsing feed");
|
debug!("Got response, parsing feed from {}", uri);
|
||||||
let feed = parser::parse(response.as_bytes())?;
|
let feed = parser::parse(response.as_bytes())?;
|
||||||
|
|
||||||
let feed_title = match feed.title.clone() {
|
let feed_title = match feed.title.clone() {
|
||||||
@ -64,7 +64,7 @@ pub async fn fetch_and_parse_feed(uri: &str) -> Result<Feed, Box<dyn Error>> {
|
|||||||
None => String::from("Untitled")
|
None => String::from("Untitled")
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Got feed with title \"{}\"", feed_title);
|
debug!("Got feed with title \"{}\"", feed_title);
|
||||||
|
|
||||||
Ok(Feed {
|
Ok(Feed {
|
||||||
uri: String::from(uri),
|
uri: String::from(uri),
|
||||||
@ -73,7 +73,7 @@ pub async fn fetch_and_parse_feed(uri: &str) -> Result<Feed, Box<dyn Error>> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn format_entry(feed: Feed, entry: model::Entry) -> Result<Entry, Box<dyn Error>> {
|
pub fn format_entry(feed: Feed, entry: model::Entry) -> Entry {
|
||||||
debug!("Formatting entry {}", entry.id);
|
debug!("Formatting entry {}", entry.id);
|
||||||
|
|
||||||
let mut e = Entry {
|
let mut e = Entry {
|
||||||
@ -82,7 +82,11 @@ pub fn format_entry(feed: Feed, entry: model::Entry) -> Result<Entry, Box<dyn Er
|
|||||||
Some(t) => t.content,
|
Some(t) => t.content,
|
||||||
None => String::from("Untitled")
|
None => String::from("Untitled")
|
||||||
},
|
},
|
||||||
link: entry.links[0].href.clone(),
|
link: if entry.links.len() > 0 {
|
||||||
|
entry.links[0].href.clone()
|
||||||
|
} else {
|
||||||
|
String::from("#")
|
||||||
|
},
|
||||||
ts: match entry.updated {
|
ts: match entry.updated {
|
||||||
Some(d) => d,
|
Some(d) => d,
|
||||||
None => entry.published.unwrap_or(Utc::now())
|
None => entry.published.unwrap_or(Utc::now())
|
||||||
@ -108,5 +112,5 @@ pub fn format_entry(feed: Feed, entry: model::Entry) -> Result<Entry, Box<dyn Er
|
|||||||
content=e.content
|
content=e.content
|
||||||
));
|
));
|
||||||
|
|
||||||
Ok(e)
|
e
|
||||||
}
|
}
|
||||||
|
78
src/main.rs
78
src/main.rs
@ -1,5 +1,5 @@
|
|||||||
/**
|
/**
|
||||||
* matrix-feedbot v0.1.0
|
* matrix-feedbot v0.2.0
|
||||||
*
|
*
|
||||||
* Copyright (C) 2024 The 1312 Media Collective
|
* Copyright (C) 2024 The 1312 Media Collective
|
||||||
*
|
*
|
||||||
@ -24,8 +24,8 @@ mod state;
|
|||||||
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
time::{sleep, Duration},
|
time::{sleep, Duration},
|
||||||
|
sync::{Notify, broadcast},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
sync::broadcast
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@ -38,33 +38,71 @@ use crate::{
|
|||||||
state::FeedReaderStateDb
|
state::FeedReaderStateDb
|
||||||
};
|
};
|
||||||
|
|
||||||
use std::{ sync::Arc, cmp::max };
|
use std::{ sync::Arc, cmp::min, cmp::max };
|
||||||
use tracing::{ info, debug };
|
use tracing::{ info, debug, error };
|
||||||
use chrono::DateTime;
|
use chrono::DateTime;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
tracing_subscriber::fmt::init();
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
let config = Config::load("bots.yaml").expect("Failed to load config");
|
let config = Config::load("bots.yaml").unwrap_or_else(|e| {
|
||||||
|
panic!("Failed to load config: {e:?}")
|
||||||
|
});
|
||||||
let config = Arc::new(config);
|
let config = Arc::new(config);
|
||||||
|
|
||||||
// This message passing channel is used for sending messages to the matrix module,
|
let state_db = FeedReaderStateDb::new("state.yaml").await.unwrap_or_else(|e| {
|
||||||
// it holds a tuple with an HTML message and a list of rooms to post to
|
panic!("Failed to initialize feed reader state db: {e:?}")
|
||||||
let (bcast_tx, bcast_rx) = broadcast::channel(16);
|
});
|
||||||
|
|
||||||
let state_db = FeedReaderStateDb::new("state.yaml").await
|
let matrix_ready = Arc::new(Notify::new());
|
||||||
.expect("Failed to initialize feed reader state db");
|
|
||||||
|
// This channel is used for sending messages to the matrix module,
|
||||||
|
// it holds a tuple with an HTML message and a list of rooms to post to
|
||||||
|
let (bcast_tx, bcast_rx) = broadcast::channel::<(String, Vec<String>)>(1024);
|
||||||
|
|
||||||
let handles: Vec<JoinHandle<_>> = config.feeds.clone().into_iter().map(|feed_config| {
|
let handles: Vec<JoinHandle<_>> = config.feeds.clone().into_iter().map(|feed_config| {
|
||||||
let state_db = Arc::clone(&state_db);
|
let state_db = Arc::clone(&state_db);
|
||||||
let bcast_tx = bcast_tx.clone();
|
let bcast_tx = bcast_tx.clone();
|
||||||
|
let matrix_ready = Arc::clone(&matrix_ready);
|
||||||
|
let mut backoff: u64 = feed_config.delay;
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
||||||
|
debug!("Waiting until matrix is ready");
|
||||||
|
matrix_ready.notified().await;
|
||||||
|
|
||||||
|
debug!("Notified that matrix is ready, starting loop for {}", &feed_config.url);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let feed = fetch_and_parse_feed(&feed_config.url).await
|
let delay = rand::random::<u64>() % (feed_config.delay / 5);
|
||||||
.expect("Failed to parse feed");
|
|
||||||
|
debug!("Adding randomized delay: {}", delay);
|
||||||
|
sleep(Duration::from_secs(delay)).await;
|
||||||
|
|
||||||
|
let feed: Option<_> = match fetch_and_parse_feed(&feed_config.url).await {
|
||||||
|
Ok(f) => Some(f),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to parse feed {}: {:?}",
|
||||||
|
feed_config.url,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let feed = if feed.is_none() {
|
||||||
|
backoff = min(backoff * 2, 6 * 3600);
|
||||||
|
error!("Backing off for {} seconds", backoff);
|
||||||
|
sleep(Duration::from_secs(backoff)).await;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
if backoff != feed_config.delay {
|
||||||
|
debug!("Resetting exponential backoff timer");
|
||||||
|
backoff = feed_config.delay;
|
||||||
|
}
|
||||||
|
feed.unwrap()
|
||||||
|
};
|
||||||
|
|
||||||
let rooms = feed_config.rooms.clone();
|
let rooms = feed_config.rooms.clone();
|
||||||
|
|
||||||
@ -77,12 +115,12 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
for entry in feed.model.entries.iter().rev() {
|
for entry in feed.model.entries.iter().rev() {
|
||||||
// FIXME: nasty clone business going on here... use Arc instead?
|
// FIXME: nasty clone business going on here... use Arc instead?
|
||||||
let parsed = format_entry(feed.clone(), (*entry).clone()).unwrap();
|
let parsed = format_entry(feed.clone(), (*entry).clone());
|
||||||
debug!("parsed entry with title: {}, updated on {}", parsed.title, parsed.ts);
|
debug!("parsed entry with title: {}, updated on {}", parsed.title, parsed.ts);
|
||||||
|
|
||||||
if parsed.ts > state_ts {
|
if parsed.ts > state_ts {
|
||||||
info!("Entry {} has not been posted yet, sending to matrix", entry.id);
|
info!("Entry {} has not been posted yet, sending to matrix", entry.id);
|
||||||
let msg = parsed.formatted.unwrap();
|
let msg = parsed.formatted.unwrap(); // XXX
|
||||||
bcast_tx.send((msg, rooms.clone())).unwrap();
|
bcast_tx.send((msg, rooms.clone())).unwrap();
|
||||||
|
|
||||||
max_ts = max(max_ts, parsed.ts);
|
max_ts = max(max_ts, parsed.ts);
|
||||||
@ -91,21 +129,23 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
if state_ts != max_ts {
|
if state_ts != max_ts {
|
||||||
info!("updating state from {} to {}", state_ts, max_ts);
|
info!("updating state from {} to {}", state_ts, max_ts);
|
||||||
state_db.set(feed.uri.as_str(), max_ts).await;
|
state_db.set(feed.uri.as_str(), max_ts).await.unwrap_or_else(|e| {
|
||||||
|
error!("Failed to set state: {e:?}. continuing...");
|
||||||
|
});
|
||||||
debug!("State update complete");
|
debug!("State update complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Sleeping for {} seconds before refreshing this feed", feed_config.delay);
|
debug!("Sleeping for {} seconds before refreshing this feed", feed_config.delay);
|
||||||
sleep(Duration::from_secs(feed_config.delay)).await;
|
sleep(Duration::from_secs(feed_config.delay)).await;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}).collect();
|
}).collect();
|
||||||
|
|
||||||
login_and_sync::<(String, Vec<String>)>(
|
login_and_sync(
|
||||||
config.homeserver_uri.clone().into(),
|
config.homeserver_uri.clone().into(),
|
||||||
&config.username,
|
&config.auth,
|
||||||
&config.password,
|
|
||||||
&config.default_room,
|
&config.default_room,
|
||||||
|
matrix_ready,
|
||||||
bcast_rx
|
bcast_rx
|
||||||
).await?;
|
).await?;
|
||||||
|
|
||||||
|
128
src/matrix.rs
128
src/matrix.rs
@ -1,5 +1,5 @@
|
|||||||
/**
|
/**
|
||||||
* matrix-feedbot v0.1.0
|
* matrix-feedbot v0.2.0
|
||||||
*
|
*
|
||||||
* Copyright (C) 2024 The 1312 Media Collective
|
* Copyright (C) 2024 The 1312 Media Collective
|
||||||
*
|
*
|
||||||
@ -18,20 +18,34 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
use matrix_sdk::{
|
use matrix_sdk::{
|
||||||
|
matrix_auth::{
|
||||||
|
MatrixAuth,
|
||||||
|
MatrixSession,
|
||||||
|
MatrixSessionTokens
|
||||||
|
},
|
||||||
|
SessionMeta,
|
||||||
config::SyncSettings,
|
config::SyncSettings,
|
||||||
ruma::events::room::{
|
ruma::events::room::{
|
||||||
member::StrippedRoomMemberEvent,
|
member::StrippedRoomMemberEvent,
|
||||||
message::RoomMessageEventContent
|
message::RoomMessageEventContent
|
||||||
},
|
},
|
||||||
ruma::RoomId,
|
ruma::{RoomId, OwnedUserId, device_id},
|
||||||
Client, Room
|
Client, Room
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
time::{sleep, Duration},
|
time::{sleep, Duration},
|
||||||
sync::broadcast
|
sync::{broadcast, Notify},
|
||||||
|
task
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tracing::{error, info};
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
use crate::config::AuthConfig;
|
||||||
|
|
||||||
async fn on_stripped_state_member(
|
async fn on_stripped_state_member(
|
||||||
room_member: StrippedRoomMemberEvent,
|
room_member: StrippedRoomMemberEvent,
|
||||||
client: Client,
|
client: Client,
|
||||||
@ -42,44 +56,85 @@ async fn on_stripped_state_member(
|
|||||||
}
|
}
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
println!("Autojoining room {}", room.room_id());
|
info!("Autojoining room {}", room.room_id());
|
||||||
let mut delay = 2;
|
let mut delay = 2;
|
||||||
|
|
||||||
while let Err(err) = room.join().await {
|
while let Err(err) = room.join().await {
|
||||||
// retry autojoin due to synapse sending invites, before the
|
// retry autojoin due to synapse sending invites, before the
|
||||||
// invited user can join for more information see
|
// invited user can join for more information see
|
||||||
// https://github.com/matrix-org/synapse/issues/4345
|
// https://github.com/matrix-org/synapse/issues/4345
|
||||||
eprintln!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());
|
error!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());
|
||||||
|
|
||||||
sleep(Duration::from_secs(delay)).await;
|
sleep(Duration::from_secs(delay)).await;
|
||||||
delay *= 2;
|
delay *= 2;
|
||||||
|
|
||||||
if delay > 3600 {
|
if delay > 3600 {
|
||||||
eprintln!("Can't join room {} ({err:?})", room.room_id());
|
error!("Can't join room {} ({err:?})", room.room_id());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
println!("Successfully joined room {}", room.room_id());
|
info!("Successfully joined room {}", room.room_id());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn login_and_sync<T: Clone>(
|
async fn send_to_room(
|
||||||
|
msg: &String,
|
||||||
|
room: Room,
|
||||||
|
) -> Result<(), Box<dyn Error>> {
|
||||||
|
|
||||||
|
info!("Joining room {}", room.room_id());
|
||||||
|
match room.client().join_room_by_id(room.room_id()).await {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("Posting to room {}", room.room_id());
|
||||||
|
room.send(RoomMessageEventContent::text_html(msg.clone(), msg)).await?;
|
||||||
|
return Ok(());
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to join room {e:?}");
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn login(
|
||||||
|
auth_config: &AuthConfig,
|
||||||
|
matrix_auth: MatrixAuth
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
match auth_config {
|
||||||
|
AuthConfig::PasswordAuthConfig{username, password} => {
|
||||||
|
matrix_auth.login_username(username, password)
|
||||||
|
.initial_device_display_name("bender v0.2.0").await?;
|
||||||
|
},
|
||||||
|
AuthConfig::TokenAuthConfig{user_id, device_id, access_token} => {
|
||||||
|
matrix_auth.restore_session(
|
||||||
|
MatrixSession {
|
||||||
|
meta: SessionMeta {
|
||||||
|
user_id: <OwnedUserId>::try_from(user_id.as_str())?,
|
||||||
|
device_id: device_id!(device_id.as_str()).to_owned(),
|
||||||
|
},
|
||||||
|
tokens: MatrixSessionTokens {
|
||||||
|
access_token: access_token.to_owned(),
|
||||||
|
refresh_token: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
).await?;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn login_and_sync(
|
||||||
homeserver_url: String,
|
homeserver_url: String,
|
||||||
username: &str,
|
auth_config: &AuthConfig,
|
||||||
password: &str,
|
|
||||||
default_room_id: &str,
|
default_room_id: &str,
|
||||||
|
ready: Arc<Notify>,
|
||||||
mut rx: broadcast::Receiver<(String, Vec<String>)>
|
mut rx: broadcast::Receiver<(String, Vec<String>)>
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// We are not reading encrypted messages, so we don't care about session persistence
|
// We are not reading encrypted messages, so we don't care about session persistence
|
||||||
let client = Client::builder().homeserver_url(homeserver_url).build().await?;
|
let client = Client::builder().homeserver_url(homeserver_url).build().await?;
|
||||||
|
|
||||||
client
|
login(auth_config, client.matrix_auth()).await?;
|
||||||
.matrix_auth()
|
|
||||||
.login_username(username, password)
|
|
||||||
.initial_device_display_name("bender v0.1.0")
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
println!("logged in as {username}");
|
|
||||||
|
|
||||||
client.add_event_handler(on_stripped_state_member);
|
client.add_event_handler(on_stripped_state_member);
|
||||||
|
|
||||||
@ -94,21 +149,44 @@ pub async fn login_and_sync<T: Clone>(
|
|||||||
// we must pass that sync token to `sync`
|
// we must pass that sync token to `sync`
|
||||||
let settings = SyncSettings::default().token(sync_token);
|
let settings = SyncSettings::default().token(sync_token);
|
||||||
|
|
||||||
|
// make sure all waiters have had a chance to register.
|
||||||
|
// Ideally the notified futures would be created before spawning
|
||||||
|
// the feed reader tasks but that would require the Notified structs to
|
||||||
|
// live for 'static, which I dislike.
|
||||||
|
task::yield_now().await;
|
||||||
|
|
||||||
|
info!("Matrix is ready, notifying waiters");
|
||||||
|
ready.notify_waiters();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Ok(_) = client.sync(settings.clone()) => return Ok(()),
|
res = client.sync(settings.clone()) => match res {
|
||||||
Ok((msg, rooms)) = rx.recv() => {
|
Ok(_) => return Ok(()),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Sync error: {e:?}");
|
||||||
|
()
|
||||||
|
}
|
||||||
|
},
|
||||||
|
recv = rx.recv() => match recv {
|
||||||
|
Ok((msg, rooms)) => {
|
||||||
|
|
||||||
let msg = &msg.clone();
|
let msg = &msg.clone();
|
||||||
for r in rooms {
|
for r in rooms {
|
||||||
let room = client.join_room_by_id(<&RoomId>::try_from(r.as_str())
|
match client.get_room(<&RoomId>::try_from(r.as_str())
|
||||||
.expect("invalid room ID")
|
.expect(format!("Invalid Room ID: {}", r).as_str())) {
|
||||||
).await.expect(format!("Failed to join room {}", r).as_str());
|
|
||||||
|
|
||||||
room.send(RoomMessageEventContent::text_html(msg.clone(), msg)).await
|
Some(room) => match send_to_room(msg, room).await {
|
||||||
.expect("Failed to send matrix event");
|
Ok(_) => info!("Done sending to room {}", r),
|
||||||
}
|
Err(e) => error!("Cannot send to room {}: {e:?}", r),
|
||||||
|
},
|
||||||
|
|
||||||
|
None => error!("Room {} not found in matrix state store", r)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
panic!("Broadcast channel is lagging: {e:?}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
29
src/state.rs
29
src/state.rs
@ -1,5 +1,5 @@
|
|||||||
/**
|
/**
|
||||||
* matrix-feedbot v0.1.0
|
* matrix-feedbot v0.2.0
|
||||||
*
|
*
|
||||||
* Copyright (C) 2024 The 1312 Media Collective
|
* Copyright (C) 2024 The 1312 Media Collective
|
||||||
*
|
*
|
||||||
@ -19,13 +19,13 @@
|
|||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
error::Error,
|
error::Error,
|
||||||
sync::{Arc, Mutex}
|
sync::{Arc, Mutex, MutexGuard}
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
|
|
||||||
use chrono::{ DateTime, Utc };
|
use chrono::{ DateTime, Utc };
|
||||||
use tracing::{ info };
|
use tracing::{ warn, info, debug };
|
||||||
|
|
||||||
use serde_yaml::mapping::Mapping as FeedReaderState;
|
use serde_yaml::mapping::Mapping as FeedReaderState;
|
||||||
|
|
||||||
@ -45,28 +45,34 @@ impl FeedReaderStateDb {
|
|||||||
filename: String::from(filename)
|
filename: String::from(filename)
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(db.clone())
|
Ok(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn lock_state(&self) -> MutexGuard<FeedReaderState> {
|
||||||
|
self.state.lock().unwrap_or_else(|e| {
|
||||||
|
warn!("State db mutex has been poisoned, continuing...");
|
||||||
|
e.into_inner()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn set(
|
pub async fn set(
|
||||||
&self,
|
&self,
|
||||||
uri: &str,
|
uri: &str,
|
||||||
dt: DateTime<Utc>
|
dt: DateTime<Utc>
|
||||||
) -> () {
|
) -> Result<(), Box<dyn Error>> {
|
||||||
|
|
||||||
{
|
{
|
||||||
info!("Updating feed reader state");
|
debug!("Updating feed reader state");
|
||||||
self.state.lock().unwrap().insert(uri.into(), dt.timestamp().into());
|
self.lock_state().insert(uri.into(), dt.timestamp().into());
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Persisting feed reader state");
|
self.persist().await
|
||||||
self.persist().await.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(ret, level="debug")]
|
#[tracing::instrument(ret, level="debug")]
|
||||||
pub fn get(&self, uri: &str) -> Option<DateTime<Utc>> {
|
pub fn get(&self, uri: &str) -> Option<DateTime<Utc>> {
|
||||||
info!("Retrieving state for feed {}", uri);
|
debug!("Retrieving state for feed {}", uri);
|
||||||
match self.state.lock().unwrap().get(uri) {
|
match self.lock_state().get(uri) {
|
||||||
Some(t) => DateTime::from_timestamp((*t).as_i64().unwrap(), 0),
|
Some(t) => DateTime::from_timestamp((*t).as_i64().unwrap(), 0),
|
||||||
None => None
|
None => None
|
||||||
}
|
}
|
||||||
@ -79,6 +85,7 @@ impl FeedReaderStateDb {
|
|||||||
Ok(state)
|
Ok(state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(ret, level="debug")]
|
||||||
async fn persist(&self) -> Result<(), Box<dyn Error>> {
|
async fn persist(&self) -> Result<(), Box<dyn Error>> {
|
||||||
let serialized_state = serde_yaml::to_string(&self.state)?;
|
let serialized_state = serde_yaml::to_string(&self.state)?;
|
||||||
fs::write(self.filename.clone(), &serialized_state.as_bytes()).await?;
|
fs::write(self.filename.clone(), &serialized_state.as_bytes()).await?;
|
||||||
|
Reference in New Issue
Block a user