/**
* matrix-feedbot v0.1.0
*
* Copyright (C) 2024 The 1312 Media Collective
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
use matrix_sdk::{
config::SyncSettings,
ruma::events::room::{
member::StrippedRoomMemberEvent,
message::RoomMessageEventContent
},
ruma::RoomId,
Client, Room
};
use tokio::{
time::{sleep, Duration},
sync::broadcast
};
use tracing::{error, info};
use std::error::Error;
async fn on_stripped_state_member(
room_member: StrippedRoomMemberEvent,
client: Client,
room: Room,
) {
if room_member.state_key != client.user_id().unwrap() {
return;
}
tokio::spawn(async move {
info!("Autojoining room {}", room.room_id());
let mut delay = 2;
while let Err(err) = room.join().await {
// retry autojoin due to synapse sending invites, before the
// invited user can join for more information see
// https://github.com/matrix-org/synapse/issues/4345
error!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());
sleep(Duration::from_secs(delay)).await;
delay *= 2;
if delay > 3600 {
error!("Can't join room {} ({err:?})", room.room_id());
break;
}
}
info!("Successfully joined room {}", room.room_id());
});
}
async fn send_to_room(
msg: &String,
room: Room,
) -> Result<(), Box> {
info!("Joining room {}", room.room_id());
match room.client().join_room_by_id(room.room_id()).await {
Ok(_) => {
info!("Posting to room {}", room.room_id());
room.send(RoomMessageEventContent::text_html(msg.clone(), msg)).await?;
return Ok(());
},
Err(e) => {
error!("Failed to join room {e:?}");
return Err(e.into());
}
};
}
pub async fn login_and_sync(
homeserver_url: String,
username: &str,
password: &str,
default_room_id: &str,
mut rx: broadcast::Receiver<(String, Vec)>
) -> anyhow::Result<()> {
// We are not reading encrypted messages, so we don't care about session persistence
let client = Client::builder().homeserver_url(homeserver_url).build().await?;
client
.matrix_auth()
.login_username(username, password)
.initial_device_display_name("bender v0.1.5")
.await?;
info!("logged in as {username}");
client.add_event_handler(on_stripped_state_member);
#[allow(unused)]
let default_room = client
.join_room_by_id(<&RoomId>::try_from(default_room_id).expect("Invalid default room ID"))
.await.expect("Failed to join room");
// make sure we already re-joined rooms before sending events
let sync_token = client.sync_once(SyncSettings::default()).await?.next_batch;
// since we called `sync_once` before we entered our sync loop
// we must pass that sync token to `sync`
let settings = SyncSettings::default().token(sync_token);
loop {
tokio::select! {
res = client.sync(settings.clone()) => match res {
Ok(_) => return Ok(()),
Err(e) => {
error!("Sync error: {e:?}");
()
}
},
recv = rx.recv() => match recv {
Ok((msg, rooms)) => {
let msg = &msg.clone();
for r in rooms {
match client.get_room(<&RoomId>::try_from(r.as_str())
.expect(format!("Invalid Room ID: {}", r).as_str())) {
Some(room) => match send_to_room(msg, room).await {
Ok(_) => info!("Done sending to room {}", r),
Err(e) => error!("Cannot send to room {}: {e:?}", r),
},
None => error!("Room {} not found in matrix state store", r)
};
}
},
Err(e) => {
panic!("Broadcast channel is lagging: {e:?}");
}
}
}
}
}