* Extend the configuration format in order to allow access_token, user_id and device_id instead of username and password
* Move the matrix login logic outside of login_and_sync for clarity
* Add support for access token based session resuming instead of logging in every time (thus creating a new device each time the service starts up)
* Delay the startup of feed reader loops until after the matrix module has had a chance to actually check authentication

This change is quite involved and there are a few caveats, namely an intentional race condition between the feed reader loops and matrix authentication, as well as significantly different behaviors depending on which authentication scheme is being used: password based authentication requires an API call while resuming a session using an access token does not.
194 lines
6.1 KiB
Rust
194 lines
6.1 KiB
Rust
/**
|
|
* matrix-feedbot v0.1.0
|
|
*
|
|
* Copyright (C) 2024 The 1312 Media Collective
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
use matrix_sdk::{
|
|
matrix_auth::{
|
|
MatrixAuth,
|
|
MatrixSession,
|
|
MatrixSessionTokens
|
|
},
|
|
SessionMeta,
|
|
config::SyncSettings,
|
|
ruma::events::room::{
|
|
member::StrippedRoomMemberEvent,
|
|
message::RoomMessageEventContent
|
|
},
|
|
ruma::{RoomId, OwnedUserId, device_id},
|
|
Client, Room
|
|
};
|
|
|
|
use tokio::{
|
|
time::{sleep, Duration},
|
|
sync::{broadcast, Notify},
|
|
task
|
|
};
|
|
|
|
use std::sync::Arc;
|
|
|
|
use tracing::{error, info};
|
|
use std::error::Error;
|
|
|
|
use crate::config::AuthConfig;
|
|
|
|
async fn on_stripped_state_member(
|
|
room_member: StrippedRoomMemberEvent,
|
|
client: Client,
|
|
room: Room,
|
|
) {
|
|
if room_member.state_key != client.user_id().unwrap() {
|
|
return;
|
|
}
|
|
|
|
tokio::spawn(async move {
|
|
info!("Autojoining room {}", room.room_id());
|
|
let mut delay = 2;
|
|
|
|
while let Err(err) = room.join().await {
|
|
// retry autojoin due to synapse sending invites, before the
|
|
// invited user can join for more information see
|
|
// https://github.com/matrix-org/synapse/issues/4345
|
|
error!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());
|
|
|
|
sleep(Duration::from_secs(delay)).await;
|
|
delay *= 2;
|
|
|
|
if delay > 3600 {
|
|
error!("Can't join room {} ({err:?})", room.room_id());
|
|
break;
|
|
}
|
|
}
|
|
info!("Successfully joined room {}", room.room_id());
|
|
});
|
|
}
|
|
|
|
async fn send_to_room(
|
|
msg: &String,
|
|
room: Room,
|
|
) -> Result<(), Box<dyn Error>> {
|
|
|
|
info!("Joining room {}", room.room_id());
|
|
match room.client().join_room_by_id(room.room_id()).await {
|
|
Ok(_) => {
|
|
info!("Posting to room {}", room.room_id());
|
|
room.send(RoomMessageEventContent::text_html(msg.clone(), msg)).await?;
|
|
return Ok(());
|
|
},
|
|
Err(e) => {
|
|
error!("Failed to join room {e:?}");
|
|
return Err(e.into());
|
|
}
|
|
};
|
|
}
|
|
|
|
async fn login(
|
|
auth_config: &AuthConfig,
|
|
matrix_auth: MatrixAuth
|
|
) -> anyhow::Result<()> {
|
|
match auth_config {
|
|
AuthConfig::PasswordAuthConfig{username, password} => {
|
|
matrix_auth.login_username(username, password)
|
|
.initial_device_display_name("bender v0.1.5").await?;
|
|
},
|
|
AuthConfig::TokenAuthConfig{user_id, device_id, access_token} => {
|
|
matrix_auth.restore_session(
|
|
MatrixSession {
|
|
meta: SessionMeta {
|
|
user_id: <OwnedUserId>::try_from(user_id.as_str())?,
|
|
device_id: device_id!(device_id.as_str()).to_owned(),
|
|
},
|
|
tokens: MatrixSessionTokens {
|
|
access_token: access_token.to_owned(),
|
|
refresh_token: None,
|
|
}
|
|
}
|
|
).await?;
|
|
},
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn login_and_sync(
|
|
homeserver_url: String,
|
|
auth_config: &AuthConfig,
|
|
default_room_id: &str,
|
|
ready: Arc<Notify>,
|
|
mut rx: broadcast::Receiver<(String, Vec<String>)>
|
|
) -> anyhow::Result<()> {
|
|
// We are not reading encrypted messages, so we don't care about session persistence
|
|
let client = Client::builder().homeserver_url(homeserver_url).build().await?;
|
|
|
|
login(auth_config, client.matrix_auth()).await?;
|
|
|
|
client.add_event_handler(on_stripped_state_member);
|
|
|
|
#[allow(unused)]
|
|
let default_room = client
|
|
.join_room_by_id(<&RoomId>::try_from(default_room_id).expect("Invalid default room ID"))
|
|
.await.expect("Failed to join room");
|
|
|
|
// make sure we already re-joined rooms before sending events
|
|
let sync_token = client.sync_once(SyncSettings::default()).await?.next_batch;
|
|
// since we called `sync_once` before we entered our sync loop
|
|
// we must pass that sync token to `sync`
|
|
let settings = SyncSettings::default().token(sync_token);
|
|
|
|
// make sure all waiters have had a chance to register.
|
|
// Ideally the notified futures would be created before spawning
|
|
// the feed reader tasks but that would require the Notified structs to
|
|
// live for 'static, which I dislike.
|
|
task::yield_now().await;
|
|
|
|
info!("Matrix is ready, notifying waiters");
|
|
ready.notify_waiters();
|
|
|
|
loop {
|
|
tokio::select! {
|
|
res = client.sync(settings.clone()) => match res {
|
|
Ok(_) => return Ok(()),
|
|
Err(e) => {
|
|
error!("Sync error: {e:?}");
|
|
()
|
|
}
|
|
},
|
|
recv = rx.recv() => match recv {
|
|
Ok((msg, rooms)) => {
|
|
|
|
let msg = &msg.clone();
|
|
for r in rooms {
|
|
match client.get_room(<&RoomId>::try_from(r.as_str())
|
|
.expect(format!("Invalid Room ID: {}", r).as_str())) {
|
|
|
|
Some(room) => match send_to_room(msg, room).await {
|
|
Ok(_) => info!("Done sending to room {}", r),
|
|
Err(e) => error!("Cannot send to room {}: {e:?}", r),
|
|
},
|
|
|
|
None => error!("Room {} not found in matrix state store", r)
|
|
};
|
|
}
|
|
},
|
|
Err(e) => {
|
|
panic!("Broadcast channel is lagging: {e:?}");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|