Initial code dump: matrix feedbot, aka bender

This is a rewrite of our old feedbot in Rust, heavily inspired by
rek2's INN matrix bot and making use of some bits from matrix-rust-sdk.

This is an asynchronous tokio-based matrix client with a stateless feed
fetcher implementation based on reqwest. It uses feed_rs for parsing RSS
and Atom feeds. State persistence is achieved using a simple file-backed
datastore with serde_yaml as the serialization format.

Published under the GNU General Public License version 3 or later.
This commit is contained in:
2024-11-22 13:08:34 +00:00
commit cd590b73fe
10 changed files with 4457 additions and 0 deletions

46
src/config.rs Normal file
View File

@ -0,0 +1,46 @@
/**
* matrix-feedbot v0.1.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use serde::Deserialize;
use std::error::Error;
use std::fs;
/// Settings for a single feed, as read from the configuration file.
#[derive(Clone, Debug, Deserialize)]
pub struct FeedConfig {
    /// Address the feed is fetched from.
    pub url: String,
    /// Room IDs that new entries of this feed are posted to.
    pub rooms: Vec<String>,
    /// Pause between two refreshes of this feed, in seconds.
    pub delay: u64,
}
/// Top-level bot configuration, deserialized from a YAML document.
#[derive(Debug, Deserialize)]
pub struct Config {
    /// ID of the room the bot joins right after logging in.
    pub default_room: String,
    /// Feeds to poll, each with its own target rooms and refresh delay.
    pub feeds: Vec<FeedConfig>,
    /// Account name used to log in to the homeserver.
    pub username: String,
    /// Account password used to log in to the homeserver.
    // NOTE(review): the derived `Debug` impl prints this value in clear text;
    // avoid logging `Config` with `{:?}`.
    pub password: String,
    /// Base URL of the homeserver to connect to.
    pub homeserver_uri: String,
}
impl Config {
    /// Reads `config_file` from disk and deserializes it as YAML.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be read or when its contents
    /// do not deserialize into a [`Config`].
    pub fn load(config_file: &str) -> Result<Self, Box<dyn Error>> {
        let raw = fs::read_to_string(config_file)?;
        Ok(serde_yaml::from_str::<Config>(&raw)?)
    }
}

112
src/feedreader.rs Normal file
View File

@ -0,0 +1,112 @@
/**
* matrix-feedbot v0.1.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use reqwest::header::{ HeaderMap, ACCEPT, USER_AGENT };
use tracing::{ info, debug };
use feed_rs::{ model, parser };
use chrono::{ DateTime, Utc };
use std::error::Error;
/// A fetched feed together with the metadata the bot needs to post it.
#[derive(Debug, Clone)]
pub struct Feed {
    /// Address the feed was fetched from.
    pub uri: String,
    /// Feed title, or `"Untitled"` when the feed does not declare one.
    pub title: String,
    /// The complete parsed feed as produced by `feed_rs`.
    pub model: model::Feed,
}
/// A single feed entry prepared for posting.
#[derive(Debug, Clone)]
pub struct Entry {
    /// Timestamp used to decide whether the entry was already posted.
    pub ts: DateTime<Utc>,
    /// Entry title, or `"Untitled"` when the entry has none.
    pub title: String,
    /// Link the rendered title points to.
    pub link: String,
    /// Entry body (content, or summary as a fallback); may be empty.
    pub content: String,
    /// The feed this entry belongs to.
    pub feed: Feed,
    /// The raw parsed entry as produced by `feed_rs`.
    #[allow(dead_code)]
    pub model: model::Entry,
    /// HTML rendering of the entry, once it has been formatted.
    pub formatted: Option<String>,
}
/// Downloads the document at `uri` and parses it as an RSS/Atom feed.
///
/// A fresh HTTP client is built on every call, sending a bot `User-Agent`
/// and an `Accept` header that prefers feed media types.
///
/// # Errors
///
/// Returns an error when the client cannot be built, the request fails,
/// the server answers with a non-success status code, the body cannot be
/// read, or the body is not a feed `feed_rs` can parse.
pub async fn fetch_and_parse_feed(uri: &str) -> Result<Feed, Box<dyn Error>> {
    info!("Fetching feed at {}", uri);

    let mut headers = HeaderMap::new();
    // Both values are fixed ASCII literals, so parsing them cannot fail.
    headers.insert(
        USER_AGENT,
        "matrix-feedbot/0.1.0 (compatible; Mozilla/5.0; +https://1312.media/)"
            .parse()
            .expect("static User-Agent header value is valid"),
    );
    headers.insert(
        ACCEPT,
        "application/rss+xml, application/rdf+xml;q=0.8, application/atom+xml;q=0.6, application/xml;q=0.4, text/xml;q=0.4"
            .parse()
            .expect("static Accept header value is valid"),
    );

    let http_client = reqwest::Client::builder().default_headers(headers).build()?;
    let response = http_client
        .get(uri)
        .send()
        .await?
        // Report 4xx/5xx answers as HTTP errors instead of handing an error
        // page to the feed parser and getting a misleading parse failure.
        .error_for_status()?
        .text()
        .await?;
    info!("Got response, parsing feed");

    let feed = parser::parse(response.as_bytes())?;
    let feed_title = feed
        .title
        .as_ref()
        .map(|t| t.content.clone())
        .unwrap_or_else(|| String::from("Untitled"));
    info!("Got feed with title \"{}\"", feed_title);

    Ok(Feed {
        uri: String::from(uri),
        title: feed_title,
        model: feed,
    })
}
/// Converts a parsed feed entry into an [`Entry`] with its HTML rendering.
///
/// * title: the entry title, or `"Untitled"` when missing;
/// * link: the first link of the entry;
/// * timestamp: `updated`, else `published`, else the current time;
/// * content: the content body (empty when the content has no body), or
///   the summary when the entry has no content element at all.
///
/// # Errors
///
/// Returns an error when the entry carries no link at all.
pub fn format_entry(feed: Feed, entry: model::Entry) -> Result<Entry, Box<dyn Error>> {
    debug!("Formatting entry {}", entry.id);

    // Entries are not guaranteed to have a link; indexing `links[0]` would
    // panic on such an entry, so report it as an error instead.
    let link = entry
        .links
        .first()
        .map(|l| l.href.clone())
        .ok_or_else(|| format!("entry {} has no link", entry.id))?;

    let title = entry
        .title
        .as_ref()
        .map(|t| t.content.clone())
        .unwrap_or_else(|| String::from("Untitled"));

    let ts = entry
        .updated
        .or(entry.published)
        .unwrap_or_else(Utc::now);

    let content = match entry.content.as_ref() {
        Some(c) => c.body.clone().unwrap_or_default(),
        None => entry
            .summary
            .as_ref()
            .map(|s| s.content.clone())
            .unwrap_or_default(),
    };

    // NOTE(review): feed title, entry title, link and content are inserted
    // into the HTML as-is, without escaping — confirm that is acceptable for
    // the feeds being consumed.
    let formatted = format!(
        "<b>{feed_title}: <a href=\"{link}\">{title}</a> on {date}</b>{content}",
        feed_title = feed.title,
        link = link,
        title = title,
        date = ts.to_rfc2822(),
        content = content
    );

    Ok(Entry {
        ts,
        title,
        link,
        content,
        feed,
        model: entry,
        formatted: Some(formatted),
    })
}

118
src/main.rs Normal file
View File

@ -0,0 +1,118 @@
/**
* matrix-feedbot v0.1.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
mod config;
mod matrix;
mod feedreader;
mod state;
use tokio::{
time::{sleep, Duration},
task::JoinHandle,
sync::broadcast
};
use crate::{
config::Config,
matrix::login_and_sync,
feedreader::{
fetch_and_parse_feed,
format_entry
},
state::FeedReaderStateDb
};
use std::{ sync::Arc, cmp::max };
use tracing::{ info, debug };
use chrono::DateTime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let config = Config::load("bots.yaml").expect("Failed to load config");
let config = Arc::new(config);
// This message passing channel is used for sending messages to the matrix module,
// it holds a tuple with an HTML message and a list of rooms to post to
let (bcast_tx, bcast_rx) = broadcast::channel(16);
let state_db = FeedReaderStateDb::new("state.yaml").await
.expect("Failed to initialize feed reader state db");
let handles: Vec<JoinHandle<_>> = config.feeds.clone().into_iter().map(|feed_config| {
let state_db = Arc::clone(&state_db);
let bcast_tx = bcast_tx.clone();
tokio::spawn(async move {
loop {
let feed = fetch_and_parse_feed(&feed_config.url).await
.expect("Failed to parse feed");
let rooms = feed_config.rooms.clone();
debug!("Retrieving state ts for feed with uri {}", feed.uri);
let state_ts = state_db.get(feed.uri.as_str())
.unwrap_or(DateTime::from_timestamp(1, 0).unwrap());
debug!("db has state ts {} for this feed", state_ts);
let mut max_ts = state_ts.clone();
for entry in feed.model.entries.iter().rev() {
// FIXME: nasty clone business going on here... use Arc instead?
let parsed = format_entry(feed.clone(), (*entry).clone()).unwrap();
debug!("parsed entry with title: {}, updated on {}", parsed.title, parsed.ts);
if parsed.ts > state_ts {
info!("Entry {} has not been posted yet, sending to matrix", entry.id);
let msg = parsed.formatted.unwrap();
bcast_tx.send((msg, rooms.clone())).unwrap();
max_ts = max(max_ts, parsed.ts);
}
}
if state_ts != max_ts {
info!("updating state from {} to {}", state_ts, max_ts);
state_db.set(feed.uri.as_str(), max_ts).await;
debug!("State update complete");
}
info!("Sleeping for {} seconds before refreshing this feed", feed_config.delay);
sleep(Duration::from_secs(feed_config.delay)).await;
}
})
}).collect();
login_and_sync::<(String, Vec<String>)>(
config.homeserver_uri.clone().into(),
&config.username,
&config.password,
&config.default_room,
bcast_rx
).await?;
for h in handles {
h.abort();
h.await?;
}
Ok(())
}

115
src/matrix.rs Normal file
View File

@ -0,0 +1,115 @@
/**
* matrix-feedbot v0.1.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use matrix_sdk::{
config::SyncSettings,
ruma::events::room::{
member::StrippedRoomMemberEvent,
message::RoomMessageEventContent
},
ruma::RoomId,
Client, Room
};
use tokio::{
time::{sleep, Duration},
sync::broadcast
};
/// Event handler that auto-joins rooms the bot gets invited to.
///
/// Membership events whose state key is not the bot's own user ID are
/// ignored. The join itself runs in a spawned task and is retried with an
/// exponentially growing delay (starting at 2 seconds) until it succeeds or
/// the delay exceeds one hour.
async fn on_stripped_state_member(
    room_member: StrippedRoomMemberEvent,
    client: Client,
    room: Room,
) {
    // NOTE(review): panics if the client has no user ID, i.e. the handler
    // runs before login — it is registered after login in this file.
    if room_member.state_key != client.user_id().unwrap() {
        return;
    }

    tokio::spawn(async move {
        println!("Autojoining room {}", room.room_id());
        let mut delay = 2;
        let mut gave_up = false;

        while let Err(err) = room.join().await {
            // retry autojoin due to synapse sending invites, before the
            // invited user can join for more information see
            // https://github.com/matrix-org/synapse/issues/4345
            eprintln!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());

            sleep(Duration::from_secs(delay)).await;
            delay *= 2;

            if delay > 3600 {
                eprintln!("Can't join room {} ({err:?})", room.room_id());
                gave_up = true;
                break;
            }
        }

        // Only report success when the loop ended because the join worked,
        // not because we ran out of retries.
        if !gave_up {
            println!("Successfully joined room {}", room.room_id());
        }
    });
}
/// Logs in to the homeserver, joins the default room and then runs the sync
/// loop while posting every message received on `rx`.
///
/// Each item received on `rx` is a tuple of an HTML message and the list of
/// room IDs to post it to. The type parameter `T` is unused and only kept so
/// existing callers keep compiling.
///
/// # Errors
///
/// Returns an error when the client cannot be built, login fails, the
/// default room ID is invalid or cannot be joined, or syncing fails.
/// Problems with an individual target room (invalid ID, join or send
/// failure) are logged and skipped. Returns `Ok(())` when syncing ends
/// without error or when every sender of the channel has been dropped.
pub async fn login_and_sync<T: Clone>(
    homeserver_url: String,
    username: &str,
    password: &str,
    default_room_id: &str,
    mut rx: broadcast::Receiver<(String, Vec<String>)>
) -> anyhow::Result<()> {
    // We are not reading encrypted messages, so we don't care about session persistence
    let client = Client::builder().homeserver_url(homeserver_url).build().await?;
    client
        .matrix_auth()
        .login_username(username, password)
        .initial_device_display_name("bender v0.1.0")
        .await?;

    println!("logged in as {username}");

    client.add_event_handler(on_stripped_state_member);

    // The default room comes from user configuration, so a bad value is
    // reported as an error rather than a panic.
    let default_room = <&RoomId>::try_from(default_room_id)
        .map_err(|err| anyhow::anyhow!("Invalid default room ID: {err}"))?;
    client
        .join_room_by_id(default_room)
        .await
        .map_err(|err| anyhow::anyhow!("Failed to join room: {err}"))?;

    // make sure we already re-joined rooms before sending events
    let sync_token = client.sync_once(SyncSettings::default()).await?.next_batch;

    // since we called `sync_once` before we entered our sync loop
    // we must pass that sync token to `sync`
    let settings = SyncSettings::default().token(sync_token);

    // NOTE(review): every time a message is handled, the pending sync future
    // is dropped and a new one is started from the same initial token —
    // confirm this does not replay events.
    loop {
        tokio::select! {
            sync_result = client.sync(settings.clone()) => {
                // Surface sync failures to the caller instead of silently
                // leaving the bot without a running sync.
                sync_result?;
                return Ok(());
            }
            received = rx.recv() => {
                match received {
                    Ok((msg, rooms)) => {
                        for r in rooms {
                            let room_id = match <&RoomId>::try_from(r.as_str()) {
                                Ok(room_id) => room_id,
                                Err(err) => {
                                    eprintln!("Skipping invalid room ID {r}: {err}");
                                    continue;
                                }
                            };
                            let room = match client.join_room_by_id(room_id).await {
                                Ok(room) => room,
                                Err(err) => {
                                    eprintln!("Failed to join room {r}: {err:?}");
                                    continue;
                                }
                            };
                            // The same string is used as plain-text and HTML body.
                            let content = RoomMessageEventContent::text_html(msg.clone(), msg.clone());
                            if let Err(err) = room.send(content).await {
                                eprintln!("Failed to send matrix event to room {r}: {err:?}");
                            }
                        }
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        // The channel overflowed before we could drain it.
                        eprintln!("Message queue overflowed, {skipped} message(s) were dropped");
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // No sender is left, so nothing more will ever arrive.
                        return Ok(());
                    }
                }
            }
        }
    }
}

88
src/state.rs Normal file
View File

@ -0,0 +1,88 @@
/**
* matrix-feedbot v0.1.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use std::{
error::Error,
sync::{Arc, Mutex}
};
use tokio::fs;
use chrono::{ DateTime, Utc };
use tracing::{ info };
use serde_yaml::mapping::Mapping as FeedReaderState;
/// File-backed store mapping a feed URI to the timestamp of the newest
/// entry already posted for it.
#[derive(Debug)]
pub struct FeedReaderStateDb {
    /// In-memory copy of the state, guarded for shared access.
    state: Mutex<FeedReaderState>,
    /// Path of the YAML file the state is written to.
    filename: String,
}
impl FeedReaderStateDb {
pub async fn new(filename: &str)
-> Result<Arc<Self>, Box<dyn Error>> {
info!("loading {}", filename);
let db = Arc::new(FeedReaderStateDb {
state: Mutex::new(Self::load(filename).await?),
filename: String::from(filename)
});
Ok(db.clone())
}
pub async fn set(
&self,
uri: &str,
dt: DateTime<Utc>
) -> () {
{
info!("Updating feed reader state");
self.state.lock().unwrap().insert(uri.into(), dt.timestamp().into());
}
info!("Persisting feed reader state");
self.persist().await.unwrap();
}
#[tracing::instrument(ret, level="debug")]
pub fn get(&self, uri: &str) -> Option<DateTime<Utc>> {
info!("Retrieving state for feed {}", uri);
match self.state.lock().unwrap().get(uri) {
Some(t) => DateTime::from_timestamp((*t).as_i64().unwrap(), 0),
None => None
}
}
#[tracing::instrument(ret, level="debug")]
async fn load(state_file: &str) -> Result<FeedReaderState, Box<dyn Error>> {
let serialized_state = fs::read_to_string(state_file).await?;
let state: FeedReaderState = serde_yaml::from_str(&serialized_state)?;
Ok(state)
}
async fn persist(&self) -> Result<(), Box<dyn Error>> {
let serialized_state = serde_yaml::to_string(&self.state)?;
fs::write(self.filename.clone(), &serialized_state.as_bytes()).await?;
Ok(())
}
}