diff options
-rw-r--r-- | Cargo.lock | 3 | ||||
-rw-r--r-- | Cargo.toml | 15 | ||||
-rw-r--r-- | src/main.rs | 67 | ||||
-rw-r--r-- | src/websocket.rs | 91 |
4 files changed, 105 insertions, 71 deletions
@@ -1174,9 +1174,10 @@ dependencies = [ [[package]] name = "rocket_test" -version = "0.3.0" +version = "0.3.1" dependencies = [ "chrono", + "rand_chacha", "rocket", "serde", "serde_json", @@ -1,14 +1,15 @@ [package] name = "rocket_test" -version = "0.3.0" +version = "0.3.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = "1.0.130" -serde_json = {version = "1.0.93", features = ["std"]} -chrono = "0.4.23" -sqlite = "0.30.4" -rocket = {version = "0.5.0-rc.2", features = ["json"]} -tungstenite = "0.18.0"
\ No newline at end of file +serde = "1.0.130" # Serialization +serde_json = {version = "1.0.93", features = ["std"]} # Json serialization +chrono = "0.4.23" # Time and date +sqlite = "0.30.4" # Database +rocket = {version = "0.5.0-rc.2", features = ["json"]} # Web endpoints +tungstenite = "0.18.0" # Websockets +rand_chacha = "0.3.1" # Session keys
\ No newline at end of file diff --git a/src/main.rs b/src/main.rs index fd76ec7..f2e9c56 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,12 @@ #[macro_use] extern crate rocket; mod database; +mod websocket; use rocket::serde::json::Json; use rocket::State; use std::sync::{Mutex, Arc}; use database::types::*; -use tungstenite::accept; struct SharedDB { @@ -159,70 +159,12 @@ fn rocket() -> _ { let websocketdb = Arc::clone(&database); let rocketdb = Arc::clone(&database); + // Websockets std::thread::spawn(move || { - let listen = std::net::TcpListener::bind("127.0.0.1:8001").unwrap(); - - loop { - for connection in listen.incoming() { - - let wsconnectiondb = Arc::clone(&websocketdb); - std::thread::spawn(move || { - let mut websocket = accept(connection.unwrap()).unwrap(); - let mut newest = 0; - - loop { - // TX - let mut db = wsconnectiondb.lock().unwrap(); - let local_newest = match db.get_message_newest_id() { - Some(n) => n.into(), - None => 0 - }; - drop(db); - - if local_newest > newest { - let m = serde_json::to_string(&WebSocketMessage::NewMessage).unwrap(); - websocket.write_message(tungstenite::Message::text(m)).unwrap(); - newest = local_newest; - } - - std::thread::sleep(std::time::Duration::from_millis(100)); - - // RX - // let _ = match websocket.read_message() { - // Ok(n) => n, - // Err(_) => {println!("closed"); break} - // }; - - // if msg.is_binary() { - // if msg.len() > 0 { - // match msg.into_data()[0..=1].into() { - // BinaryMessage::GetMessageNewestId => { - // let db = wsconnectiondb.lock().unwrap(); - // let m = db.get_message_newest_id(); - // drop(db); - - // let m = serde_json::to_string(&m).unwrap(); - // websocket.write_message(tungstenite::Message::text(m)).unwrap(); - // }, - // // Return error of unsupported data - // _ => (), - // } - // } else { - // // Return error of empty data - // websocket.write_message(msg).unwrap(); - // } - - // } else if msg.is_text() { - // println!("{}", msg.into_text().unwrap()); - // } - - } - - }); - } - } + websocket::websocket(websocketdb); }); + // Rocket rocket::build() .mount("/api", routes![ get_message, @@ -238,7 +180,6 @@ fn rocket() -> _ { get_message_list, set_user_data, set_user_status, - ]) .mount("/", routes![api_index]) .manage(SharedDB{sdb: rocketdb}) diff --git a/src/websocket.rs b/src/websocket.rs new file mode 100644 index 0000000..19038ff --- /dev/null +++ b/src/websocket.rs @@ -0,0 +1,91 @@ +use tungstenite::accept; +use std::net; +use super::*; + +// Setup a new listener for websockets +// Iterate through them and create a new worker for each +pub fn websocket(db: Arc<Mutex<database::Database>>) { + let listen = std::net::TcpListener::bind("127.0.0.1:8001").unwrap(); + + loop { + for connection in listen.incoming() { + + let wsconnectiondb = Arc::clone(&db); + std::thread::spawn(move || { + worker(wsconnectiondb, connection.unwrap()) + }); + } + } +} + +// Accept the connection and send a message to the client +// if a new message has been recieved by the server +fn worker(db: Arc<Mutex<database::Database>>, connection: net::TcpStream) { + let mut websocket = accept(connection).unwrap(); + let mut newest = 0; + + // Main worker loop + loop { + // TX + + // Get the newest message id + let mut db = db.lock().unwrap(); + let local_newest = match db.get_message_newest_id() { + Some(n) => n.into(), + None => 0 + }; + drop(db); + + // If there is a newer message then send a message to the client + // and update the newest message + if local_newest > newest { + let m = serde_json::to_string(&WebSocketMessage::NewMessage).unwrap(); + + // Close the client connection if we cannot send a message + match websocket.write_message(tungstenite::Message::text(m)) { + Ok(_) => (), + Err(_) => break, + }; + newest = local_newest; + } + + // Sleep for 100ms to prevent cpu usage + std::thread::sleep(std::time::Duration::from_millis(100)); + + // RX + // let _ = match websocket.read_message() { + // Ok(n) => n, + // Err(_) => {println!("closed"); break} + // }; + + // // If the message is binary and contains a two byte header + // // then convert that header into a function call + // // and the rest of the message into json + // if msg.is_binary() { + // if msg.len() > 1 { + // match msg.into_data()[0..=1].into() { + // BinaryMessage::GetMessageNewestId => { + // let db = wsconnectiondb.lock().unwrap(); + // let m = db.get_message_newest_id(); + // drop(db); + + // let m = serde_json::to_string(&m).unwrap(); + // websocket.write_message(tungstenite::Message::text(m)).unwrap(); + // }, + // // Return error of unsupported data + // _ => (), + // } + + // // Return error of empty data + // } else { + // websocket.write_message(msg).unwrap(); + // } + + // // Otherwise if the message is text then print the text + // } else if msg.is_text() { + // println!("{}", msg.into_text().unwrap()); + // } + + } + +}
\ No newline at end of file |