/**
* matrix-feedbot v0.1.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
mod config;
mod matrix;
mod feedreader;
mod state;
use tokio::{
time::{sleep, Duration},
task::JoinHandle,
sync::broadcast
};
use crate::{
config::Config,
matrix::login_and_sync,
feedreader::{
fetch_and_parse_feed,
format_entry
},
state::FeedReaderStateDb
};
use std::{ sync::Arc, cmp::max };
use tracing::{ info, debug, error };
use chrono::DateTime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let config = Config::load("bots.yaml").expect("Failed to load config");
let config = Arc::new(config);
// This message passing channel is used for sending messages to the matrix module,
// it holds a tuple with an HTML message and a list of rooms to post to
let (bcast_tx, bcast_rx) = broadcast::channel(1024);
let state_db = FeedReaderStateDb::new("state.yaml").await
.expect("Failed to initialize feed reader state db");
let handles: Vec> = config.feeds.clone().into_iter().map(|feed_config| {
let state_db = Arc::clone(&state_db);
let bcast_tx = bcast_tx.clone();
tokio::spawn(async move {
loop {
let feed: Option<_> = match fetch_and_parse_feed(&feed_config.url).await {
Ok(f) => Some(f),
Err(e) => {
error!("Failed to parse feed {}: {:?}",
feed_config.url,
e
);
None
}
};
let feed = if feed.is_none() {
debug!("Sleeping for {} seconds before refreshing this feed", feed_config.delay);
sleep(Duration::from_secs(feed_config.delay)).await;
continue;
} else {
feed.unwrap()
};
let rooms = feed_config.rooms.clone();
debug!("Retrieving state ts for feed with uri {}", feed.uri);
let state_ts = state_db.get(feed.uri.as_str())
.unwrap_or(DateTime::from_timestamp(1, 0).unwrap());
debug!("db has state ts {} for this feed", state_ts);
let mut max_ts = state_ts.clone();
for entry in feed.model.entries.iter().rev() {
// FIXME: nasty clone business going on here... use Arc instead?
let parsed = format_entry(feed.clone(), (*entry).clone()).unwrap();
debug!("parsed entry with title: {}, updated on {}", parsed.title, parsed.ts);
if parsed.ts > state_ts {
info!("Entry {} has not been posted yet, sending to matrix", entry.id);
let msg = parsed.formatted.unwrap();
bcast_tx.send((msg, rooms.clone())).unwrap();
max_ts = max(max_ts, parsed.ts);
}
}
if state_ts != max_ts {
info!("updating state from {} to {}", state_ts, max_ts);
state_db.set(feed.uri.as_str(), max_ts).await;
debug!("State update complete");
}
debug!("Sleeping for {} seconds before refreshing this feed", feed_config.delay);
sleep(Duration::from_secs(feed_config.delay)).await;
}
})
}).collect();
login_and_sync::<(String, Vec)>(
config.homeserver_uri.clone().into(),
&config.username,
&config.password,
&config.default_room,
bcast_rx
).await?;
for h in handles {
h.abort();
h.await?;
}
Ok(())
}