Compare commits

...

12 Commits

Author SHA1 Message Date
e4ea970f28 Release v0.2.0 2025-05-16 16:08:29 +00:00
5a203d70ba Add support for access token based authentication
* Extend the configuration format in order to allow access_token,
   user_id and device_id instead of username and password
 * Move the matrix login logic outside of login_and_sync for clarity
 * Add support for access token based session resuming instead of
   logging in every time (thus creating a new device each time the
   service starts up)
 * Delay the startup of feed reader loops until after the matrix module
   has had a chance to actually check authentication

This change is quite involved and there are a few caveats, namely an
intentional race condition between the feed reader loops and matrix
authentication, as well as significantly different behaviors depending
on which authentication scheme is being used: password based
authentication requires an API call while resuming a session using an
access token does not.
2025-05-16 15:21:49 +00:00
a66517d24f state: Remove useless clone() 2025-05-15 22:01:47 +00:00
121943afa7 state: Properly propagate errors when persisting state 2025-05-15 21:54:30 +00:00
f5edc44717 config: Make error handling a little bit nicer
* Implement the std::error::Error trait and add a Result type
 * Propagate generic io::Error and serde_yaml::Error using From
 * Move the Result and Error types to the bottom of the module
2025-05-15 21:27:57 +00:00
73ee8114ca matrix: login_and_sync does not need to be generic 2025-05-15 21:06:54 +00:00
3fee20f0cf main: Panic explicitely when failing to initialize the state db 2025-05-15 21:01:59 +00:00
86c6ec26f5 state: Send a warning trace whenever the state db lock is poisoned 2025-05-15 19:30:05 +00:00
e771639856 config: Rework error handling into explicit error types 2025-05-15 19:09:41 +00:00
e79dfe9fc5 Run cargo update and bump package version 2025-05-03 16:05:38 +00:00
cdb0277c57 main: Fix silly logic bug in the exponential backoff code 2025-05-03 16:03:01 +00:00
40455f0365 feedreader: format_entry does not need to return a Result 2025-05-03 15:28:18 +00:00
7 changed files with 718 additions and 445 deletions

950
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package]
name = "matrix-feedbot"
version = "0.1.4"
version = "0.2.0"
edition = "2021"
license = "GPL-3.0-or-later"
authors = ["mirsal <mirsal@mirsal.fr>"]

View File

@ -1,5 +1,5 @@
/**
* matrix-feedbot v0.1.0
* matrix-feedbot v0.2.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
@ -18,8 +18,7 @@
*/
use serde::Deserialize;
use std::error::Error;
use std::fs;
use std::{fs, io, fmt};
#[derive(Deserialize, Clone, Debug)]
pub struct FeedConfig {
@ -28,19 +27,73 @@ pub struct FeedConfig {
pub delay: u64
}
#[derive(Deserialize, Debug)]
#[serde(untagged)]
pub enum AuthConfig {
PasswordAuthConfig {
username: String,
password: String,
},
TokenAuthConfig {
user_id: String,
device_id: String,
access_token: String,
}
}
#[derive(Deserialize, Debug)]
pub struct Config {
pub default_room: String,
pub feeds: Vec<FeedConfig>,
pub username: String,
pub password: String,
#[serde(flatten)]
pub auth: AuthConfig,
pub homeserver_uri: String
}
impl Config {
pub fn load(config_file: &str) -> Result<Self, Box<dyn Error>> {
let serialized_config = fs::read_to_string(config_file)?;
pub fn load(config_file: &str) -> Result<Self> {
let serialized_config = fs::read_to_string(config_file).map_err(|e| {
match e.kind() {
io::ErrorKind::NotFound => Error::FileNotFoundError(config_file.into()),
io::ErrorKind::PermissionDenied => Error::PermissionDeniedError(),
_ => e.into(),
}
})?;
let config: Config = serde_yaml::from_str(&serialized_config)?;
Ok(config)
}
}
#[derive(Debug)]
#[allow(dead_code)] // we use Debug, so inner fields are never read
pub enum Error {
FileNotFoundError(String),
PermissionDeniedError(),
InvalidFormatError(serde_yaml::Error),
IoError(io::Error),
}
impl From<serde_yaml::Error> for Error {
fn from(e: serde_yaml::Error) -> Self {
Self::InvalidFormatError(e)
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Self::IoError(e)
}
}
impl std::error::Error for Error {}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
write!(fmt, "{self:?}")
}
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@ -1,5 +1,5 @@
/**
* matrix-feedbot v0.1.0
* matrix-feedbot v0.2.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
@ -46,7 +46,7 @@ pub async fn fetch_and_parse_feed(uri: &str) -> Result<Feed, Box<dyn Error>> {
info!("Fetching feed at {}", uri);
let mut headers = HeaderMap::new();
headers.insert(USER_AGENT,
"matrix-feedbot/0.1.0 (compatible; Mozilla/5.0; +https://1312.media/)"
"matrix-feedbot/0.2.0 (compatible; Mozilla/5.0; +https://1312.media/)"
.parse().unwrap());
headers.insert(ACCEPT,
@ -73,7 +73,7 @@ pub async fn fetch_and_parse_feed(uri: &str) -> Result<Feed, Box<dyn Error>> {
})
}
pub fn format_entry(feed: Feed, entry: model::Entry) -> Result<Entry, Box<dyn Error>> {
pub fn format_entry(feed: Feed, entry: model::Entry) -> Entry {
debug!("Formatting entry {}", entry.id);
let mut e = Entry {
@ -112,5 +112,5 @@ pub fn format_entry(feed: Feed, entry: model::Entry) -> Result<Entry, Box<dyn Er
content=e.content
));
Ok(e)
e
}

View File

@ -1,5 +1,5 @@
/**
* matrix-feedbot v0.1.0
* matrix-feedbot v0.2.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
@ -24,8 +24,8 @@ mod state;
use tokio::{
time::{sleep, Duration},
sync::{Notify, broadcast},
task::JoinHandle,
sync::broadcast
};
use crate::{
@ -38,7 +38,7 @@ use crate::{
state::FeedReaderStateDb
};
use std::{ sync::Arc, cmp::max };
use std::{ sync::Arc, cmp::min, cmp::max };
use tracing::{ info, debug, error };
use chrono::DateTime;
@ -46,23 +46,34 @@ use chrono::DateTime;
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let config = Config::load("bots.yaml").expect("Failed to load config");
let config = Config::load("bots.yaml").unwrap_or_else(|e| {
panic!("Failed to load config: {e:?}")
});
let config = Arc::new(config);
// This message passing channel is used for sending messages to the matrix module,
// it holds a tuple with an HTML message and a list of rooms to post to
let (bcast_tx, bcast_rx) = broadcast::channel(1024);
let state_db = FeedReaderStateDb::new("state.yaml").await.unwrap_or_else(|e| {
panic!("Failed to initialize feed reader state db: {e:?}")
});
let state_db = FeedReaderStateDb::new("state.yaml").await
.expect("Failed to initialize feed reader state db");
let matrix_ready = Arc::new(Notify::new());
// This channel is used for sending messages to the matrix module,
// it holds a tuple with an HTML message and a list of rooms to post to
let (bcast_tx, bcast_rx) = broadcast::channel::<(String, Vec<String>)>(1024);
let handles: Vec<JoinHandle<_>> = config.feeds.clone().into_iter().map(|feed_config| {
let state_db = Arc::clone(&state_db);
let bcast_tx = bcast_tx.clone();
let matrix_ready = Arc::clone(&matrix_ready);
let mut backoff: u64 = feed_config.delay;
tokio::spawn(async move {
debug!("Waiting until matrix is ready");
matrix_ready.notified().await;
debug!("Notified that matrix is ready, starting loop for {}", &feed_config.url);
loop {
let delay = rand::random::<u64>() % (feed_config.delay / 5);
@ -81,7 +92,7 @@ async fn main() -> anyhow::Result<()> {
};
let feed = if feed.is_none() {
backoff = max(backoff * 2, 6 * 3600);
backoff = min(backoff * 2, 6 * 3600);
error!("Backing off for {} seconds", backoff);
sleep(Duration::from_secs(backoff)).await;
continue;
@ -104,12 +115,12 @@ async fn main() -> anyhow::Result<()> {
for entry in feed.model.entries.iter().rev() {
// FIXME: nasty clone business going on here... use Arc instead?
let parsed = format_entry(feed.clone(), (*entry).clone()).unwrap();
let parsed = format_entry(feed.clone(), (*entry).clone());
debug!("parsed entry with title: {}, updated on {}", parsed.title, parsed.ts);
if parsed.ts > state_ts {
info!("Entry {} has not been posted yet, sending to matrix", entry.id);
let msg = parsed.formatted.unwrap();
let msg = parsed.formatted.unwrap(); // XXX
bcast_tx.send((msg, rooms.clone())).unwrap();
max_ts = max(max_ts, parsed.ts);
@ -118,7 +129,9 @@ async fn main() -> anyhow::Result<()> {
if state_ts != max_ts {
info!("updating state from {} to {}", state_ts, max_ts);
state_db.set(feed.uri.as_str(), max_ts).await;
state_db.set(feed.uri.as_str(), max_ts).await.unwrap_or_else(|e| {
error!("Failed to set state: {e:?}. continuing...");
});
debug!("State update complete");
}
@ -128,11 +141,11 @@ async fn main() -> anyhow::Result<()> {
})
}).collect();
login_and_sync::<(String, Vec<String>)>(
login_and_sync(
config.homeserver_uri.clone().into(),
&config.username,
&config.password,
&config.auth,
&config.default_room,
matrix_ready,
bcast_rx
).await?;

View File

@ -1,5 +1,5 @@
/**
* matrix-feedbot v0.1.0
* matrix-feedbot v0.2.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
@ -18,23 +18,34 @@
*/
use matrix_sdk::{
matrix_auth::{
MatrixAuth,
MatrixSession,
MatrixSessionTokens
},
SessionMeta,
config::SyncSettings,
ruma::events::room::{
member::StrippedRoomMemberEvent,
message::RoomMessageEventContent
},
ruma::RoomId,
ruma::{RoomId, OwnedUserId, device_id},
Client, Room
};
use tokio::{
time::{sleep, Duration},
sync::broadcast
sync::{broadcast, Notify},
task
};
use std::sync::Arc;
use tracing::{error, info};
use std::error::Error;
use crate::config::AuthConfig;
async fn on_stripped_state_member(
room_member: StrippedRoomMemberEvent,
client: Client,
@ -85,23 +96,45 @@ async fn send_to_room(
};
}
pub async fn login_and_sync<T: Clone>(
async fn login(
auth_config: &AuthConfig,
matrix_auth: MatrixAuth
) -> anyhow::Result<()> {
match auth_config {
AuthConfig::PasswordAuthConfig{username, password} => {
matrix_auth.login_username(username, password)
.initial_device_display_name("bender v0.2.0").await?;
},
AuthConfig::TokenAuthConfig{user_id, device_id, access_token} => {
matrix_auth.restore_session(
MatrixSession {
meta: SessionMeta {
user_id: <OwnedUserId>::try_from(user_id.as_str())?,
device_id: device_id!(device_id.as_str()).to_owned(),
},
tokens: MatrixSessionTokens {
access_token: access_token.to_owned(),
refresh_token: None,
}
}
).await?;
},
}
Ok(())
}
pub async fn login_and_sync(
homeserver_url: String,
username: &str,
password: &str,
auth_config: &AuthConfig,
default_room_id: &str,
ready: Arc<Notify>,
mut rx: broadcast::Receiver<(String, Vec<String>)>
) -> anyhow::Result<()> {
// We are not reading encrypted messages, so we don't care about session persistence
let client = Client::builder().homeserver_url(homeserver_url).build().await?;
client
.matrix_auth()
.login_username(username, password)
.initial_device_display_name("bender v0.1.4")
.await?;
info!("logged in as {username}");
login(auth_config, client.matrix_auth()).await?;
client.add_event_handler(on_stripped_state_member);
@ -116,6 +149,15 @@ pub async fn login_and_sync<T: Clone>(
// we must pass that sync token to `sync`
let settings = SyncSettings::default().token(sync_token);
// make sure all waiters have had a chance to register.
// Ideally the notified futures would be created before spawning
// the feed reader tasks but that would require the Notified structs to
// live for 'static, which I dislike.
task::yield_now().await;
info!("Matrix is ready, notifying waiters");
ready.notify_waiters();
loop {
tokio::select! {
res = client.sync(settings.clone()) => match res {

View File

@ -1,5 +1,5 @@
/**
* matrix-feedbot v0.1.0
* matrix-feedbot v0.2.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
@ -19,13 +19,13 @@
use std::{
error::Error,
sync::{Arc, Mutex}
sync::{Arc, Mutex, MutexGuard}
};
use tokio::fs;
use chrono::{ DateTime, Utc };
use tracing::{ info, debug };
use tracing::{ warn, info, debug };
use serde_yaml::mapping::Mapping as FeedReaderState;
@ -45,27 +45,34 @@ impl FeedReaderStateDb {
filename: String::from(filename)
});
Ok(db.clone())
Ok(db)
}
fn lock_state(&self) -> MutexGuard<FeedReaderState> {
self.state.lock().unwrap_or_else(|e| {
warn!("State db mutex has been poisoned, continuing...");
e.into_inner()
})
}
pub async fn set(
&self,
uri: &str,
dt: DateTime<Utc>
) -> () {
) -> Result<(), Box<dyn Error>> {
{
debug!("Updating feed reader state");
self.state.lock().unwrap().insert(uri.into(), dt.timestamp().into());
self.lock_state().insert(uri.into(), dt.timestamp().into());
}
self.persist().await.unwrap();
self.persist().await
}
#[tracing::instrument(ret, level="debug")]
pub fn get(&self, uri: &str) -> Option<DateTime<Utc>> {
debug!("Retrieving state for feed {}", uri);
match self.state.lock().unwrap().get(uri) {
match self.lock_state().get(uri) {
Some(t) => DateTime::from_timestamp((*t).as_i64().unwrap(), 0),
None => None
}