Files
matrix-feedbot/src/main.rs

151 lines
5.1 KiB
Rust

/**
* matrix-feedbot v0.1.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
mod config;
mod matrix;
mod feedreader;
mod state;
use tokio::{
time::{sleep, Duration},
task::JoinHandle,
sync::broadcast
};
use crate::{
config::Config,
matrix::login_and_sync,
feedreader::{
fetch_and_parse_feed,
format_entry
},
state::FeedReaderStateDb
};
use std::{ sync::Arc, cmp::min, cmp::max };
use tracing::{ info, debug, error };
use chrono::DateTime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let config = Config::load("bots.yaml").unwrap_or_else(|e| {
panic!("Failed to load config: {e:?}")
});
let config = Arc::new(config);
let state_db = FeedReaderStateDb::new("state.yaml").await.unwrap_or_else(|e| {
panic!("Failed to initialize feed reader state db: {e:?}")
});
// This message passing channel is used for sending messages to the matrix module,
// it holds a tuple with an HTML message and a list of rooms to post to
let (bcast_tx, bcast_rx) = broadcast::channel::<(String, Vec<String>)>(1024);
let handles: Vec<JoinHandle<_>> = config.feeds.clone().into_iter().map(|feed_config| {
let state_db = Arc::clone(&state_db);
let bcast_tx = bcast_tx.clone();
let mut backoff: u64 = feed_config.delay;
tokio::spawn(async move {
loop {
let delay = rand::random::<u64>() % (feed_config.delay / 5);
debug!("Adding randomized delay: {}", delay);
sleep(Duration::from_secs(delay)).await;
let feed: Option<_> = match fetch_and_parse_feed(&feed_config.url).await {
Ok(f) => Some(f),
Err(e) => {
error!("Failed to parse feed {}: {:?}",
feed_config.url,
e
);
None
}
};
let feed = if feed.is_none() {
backoff = min(backoff * 2, 6 * 3600);
error!("Backing off for {} seconds", backoff);
sleep(Duration::from_secs(backoff)).await;
continue;
} else {
if backoff != feed_config.delay {
debug!("Resetting exponential backoff timer");
backoff = feed_config.delay;
}
feed.unwrap()
};
let rooms = feed_config.rooms.clone();
debug!("Retrieving state ts for feed with uri {}", feed.uri);
let state_ts = state_db.get(feed.uri.as_str())
.unwrap_or(DateTime::from_timestamp(1, 0).unwrap());
debug!("db has state ts {} for this feed", state_ts);
let mut max_ts = state_ts.clone();
for entry in feed.model.entries.iter().rev() {
// FIXME: nasty clone business going on here... use Arc instead?
let parsed = format_entry(feed.clone(), (*entry).clone());
debug!("parsed entry with title: {}, updated on {}", parsed.title, parsed.ts);
if parsed.ts > state_ts {
info!("Entry {} has not been posted yet, sending to matrix", entry.id);
let msg = parsed.formatted.unwrap(); // XXX
bcast_tx.send((msg, rooms.clone())).unwrap();
max_ts = max(max_ts, parsed.ts);
}
}
if state_ts != max_ts {
info!("updating state from {} to {}", state_ts, max_ts);
state_db.set(feed.uri.as_str(), max_ts).await.unwrap_or_else(|e| {
error!("Failed to set state: {e:?}. continuing...");
});
debug!("State update complete");
}
debug!("Sleeping for {} seconds before refreshing this feed", feed_config.delay);
sleep(Duration::from_secs(feed_config.delay)).await;
}
})
}).collect();
login_and_sync(
config.homeserver_uri.clone().into(),
&config.username,
&config.password,
&config.default_room,
bcast_rx
).await?;
for h in handles {
h.abort();
h.await?;
}
Ok(())
}