Чат в терминале на Rust
Всем привет! Эта статья — туториал по написанию небольшого чат сервиса (серверное и клиентское приложения) на Rust, используя функционал TCP сокетов из стандартной библиотеки Rust. Сам чат для пользователя будет выглядеть, как приложение в терминале. Полный код приложений есть в гитхабе.
Демонстрация работы чата
Начало
Много объяснений будет записано в качестве комментариев к коду.
У нас будет 2 приложения: сервер, который будет принимать сообщения и раздавать их всем пользователям, подключённым к чату, и клиент, который будет показывать юзеру сообщения полученные от сервера и отправлять серверу сообщения от юзера. Создать шаблон для этих приложений можно через cargo new
. После этого нашим приложениям надо прописать базовые состояния и типы, которые они будут использовать на протяжении всей свой работы.
Для сервера начнём со структуры Settings. Она будет парсить и сохранять аргументы пользователя при запуске программы. Для парсинга будет использоваться clap.
Код структуры Settings на сервере (Файл server/src/settings.rs)
// Файл server/src/settings.rs
// Импортирование нужного трейта из clap
use clap::Parser;
// Объявление того, что будет парсится в качестве аргументов.
// С помощью derive макроса можно навесить на структуру макрос импортированного
// трейта в качестве атрибута этой структуры. В нашем случае таким образом
// будет сгенерирован нужный impl c функционалом Parser'а для структуры Args.
#[derive(Parser)]
pub struct Args {
// Макрос arg так же импортируется из clap автоматически
// и позволяет объявить поле аргументом и задать ему нужные свойства.
// short означает, что аргумент можно будет вписать сокращённо
// вот так "-p 8080". long, что можно использовать
// полное название "--port 8080". А help это просто вспомогательный
// текст, который будет показываться при запуске приложения с --help
#[arg(short, long, help = "Port that the server will serve")]
pub port: u16,
}
// Трейт Debug позволяет удобно выводить структуру через print в консоль, а
// Clone добавляет функционал для клонирования инстансов структуры.
// В derive мы опять передаём макросы этих трейтов чтобы они сгенерировали нам
// impl'ы с реализациями Debug и Clone, чтобы вручную не писать это.
#[derive(Debug, Clone)]
pub struct Settings {
pub port: u16,
}
// Внутри impl прописываются методы структуры
impl Settings {
pub fn new() -> Settings {
// используем метод от трейта Parser
let args = Args::parse();
// Создаём инстанс структуры Settings и возвращаем его
Settings {
port: args.port,
}
}
}
Добавим создание объекта Settings в наш main.rs. После этого при запуске приложения будут запрашиваться аргументы, а нашем случае — порт сервера.
Код server/src/main.rs
// Файл server/src/main.rs
// anyhow это небольшая библиотека, которая добавляет enum Result,
// почти аналогичный Result'у из std, с единственным отличием, что этот
// может принимать любую ошибку
use anyhow::Result;
// Импортирование нашей новой структуры
use settings::Settings;
// Обязательное указание модуля, иначе файл виден не будет
mod settings;
fn main() -> Result<()> {
// Создание инстанса нстроек
let settings = Settings::new();
// возвращение Result::Ok() значения
Ok(())
}
Следующее, что нужно сделать это структуру состояния (State) нашего серверного приложения. Так как сервер, будет работать сразу в несколько потоков, то и состояние должно поддерживать многопоточность. Для этого внутри структуры данные завёрнуты в Arc и Mutex, подробнее в коде.
Код структуры State на сервере (Файл server/src/state.rs)
// Файл server/src/state.rs
use std::{
// Arc (Atomic Reference Counter) это smart pointer, который реализует
// множественное владение переменной. То-есть, грубо говоря, данные на
// которые указывает Arc не исчезнут пока есть
// хотя бы один клон этого Arc'а. По сути, то же самое делает и
// Rc (Reference Counter), но Rc не поддерживает многопоточность.
sync::Arc,
collections::HashMap
};
// Это аналог Mutex'а из стандартной библиотеки, но работающий намного быстрее.
// Сам Mutex это структура, которая блокируется для доступа из других потоков,
// если в одном из них она уже используется. И соответственно после использования
// она становится доступна для других потоков. Это нужно для того чтобы не было
// рассинхрона данных между потоками.
use parking_lot::{Mutex, MutexGuard};
use crate::settings::Settings;
// Каждый юзер после подключения будет записываться в стейт,
// структура UserData описывает, что будет хранить в себе запись
// о подключенном юзере.
#[derive(Debug, Clone)]
pub struct UserData {
// Ip адрес подключённого пользователя + его сокета
pub address: String,
}
#[derive(Debug, Clone)]
pub struct StateData {
// Настройки приложения, которые мы описали ранее
pub settings: Settings,
// HashMap'а хранящая данные о подключённых юзерах, где ключ это никнейм,
// значение это UserData
pub users: HashMap,
}
// Arc, как я писал выше реализует множественно владение данными, но он не
// позволяет эти данные менять. Для этого чтобы это было возможно и безопасно мы
// дополнительно оборачиваем StateData в Mutex.
pub struct State(Arc>);
impl State {
pub fn new(settings: Settings) -> State {
State(
Arc::new(Mutex::new(StateData {
settings,
users: HashMap::new()
}))
)
}
// Метод для упрощения доступа к данным. Он блокирует Mutex для работы с
// данными только в текущем потоке. И возвращает MutexGuard. Пока MutexGuard
// жив другие потоки не смогут заблокировать данные для себя.
pub fn get(&self) -> MutexGuard {
self.0.lock()
}
}
// Реализация трейта Clone для State. Просто повесить макрос трейта Clone
// через derive не получится, потому что копировать нужно внутренний Arc.
// Поэтому необходимые для Clone методы реализуем вручную.
impl Clone for State {
fn clone(&self) -> Self {
State(Arc::clone(&self.0))
}
fn clone_from(&mut self, source: &Self) {
*self = source.clone();
}
}
Теперь так же перенесём State в нашу main функцию.
Обновлённый код функции main для сервера (Файл server/src/main.rs)
// Файл server/src/main.rs
use anyhow::Result;
use settings::Settings;
use state::State; // +
mod settings;
mod state; // +
fn main() -> Result<()> {
let settings = Settings::new();
let state = State::new(settings)); // +
Ok(())
}
Для серверного приложения состояние и базовые параметры готовы, тоже самое нужно прописать для клиента.
Код структуры Settings для клиента (Файл client/src/settings.rs)
// Файл client/src/settings.rs
use clap::Parser;
#[derive(Parser)]
pub struct Args {
// Адрес сервера с портом, к которому будет производится подключение
#[arg(short, long, help = "Server address")]
pub address: String,
}
#[derive(Debug, Clone)]
pub struct Settings {
pub server_address: String,
}
impl Settings {
pub fn new() -> Settings {
let args = Args::parse();
Settings {
server_address: args.address
}
}
}
State для клиента немного отличается, но суть та же. Структура, чтобы хранить состояние приложения, с возможностью раздачи его на несколько потоков.
Код структуры State для клиента (Файл client/src/state.rs)
// Файл client/src/state.rs
use std::{
sync::{
// mpsc нужно для передачи сообщений по каналу между несколькими потоками.
// В нашем случае будут два потока (главный и созданный), один из которых
// будет передавать второму сигналы по каналу mpsc.
mpsc::{
Sender,
Receiver,
self
},
Arc
},
io::{
self,
// BufReader будем использовать для чтения данных с tcp сокета.
// Он работает по такому принципу: делает редкие, но объемные read
// запросы по файл дескриптору и далее мы можем удобно, что он прочитал.
// Для чтения строк из tcp сокета это очень хорошо подходит.
BufReader,
// Два трейта. Один для чтения из BufReader'а, другой для записи в файл
// (в нашем случае в сокет).
BufRead,
Write
}
};
use parking_lot::Mutex;
pub struct State {
// Ник, который юзер введёт при запуске приложения
pub username: String,
// Принимающая часть канала mpsc. В качестве типа передаваемых данных
// указан unit (пустой tuple), так как нам нужен будет сам факт наличия
// нового сообщения, его внутренности интересовать не будут.
// Указывается как Option, потому что в будет передана другому потоку и
// после этого доступна не будет и тут будет храниться None.
pub chat_reload_receiver: Option>,
// Часть канала mpsc, которая отправляет информацию принимающему потоку.
pub chat_reload_sender: Sender<()>,
// В user_input'е будет лежать текущий ввод пользователя. Пример:
// юзер пишет "привет", но не отправляет его в чат. "приве" лежит
// в user_input'е. Обычно такая реализация не требуется, но у нас часто будет
// полностью перерисовываться чат, и при этом будет пропадать дефолтный
// ввод юзера. Поэтому чтобы это ввод не исчезал, приходится хранить его
// отдельно. Подробнее об этом будет позже, когда перейдём к месту
// реализации ввода сообщения.
pub user_input: Arc>,
// Массив, полученных с сервера сообщений.
pub messages: Arc>>
}
impl State {
pub fn new() -> io::Result {
// Создание mpsc канала. Так как функция вернёт tuple, его можно
// сразу разбить на две переменные
let (sx, rx) = mpsc::channel::<()>();
let user_input = Arc::new(Mutex::new(String::new()));
let messages = Arc::new(Mutex::new(Vec::::new()));
let mut instance = State {
username: String::new(),
chat_reload_receiver: Some(rx),
chat_reload_sender: sx,
user_input,
messages,
};
// Вызов метода для получения username'а
instance.read_username()?;
Ok(instance)
}
// Метод, который запрашивает у user'а ввод его ника и
// записывает полученные данные в state.
fn read_username(&mut self) -> io::Result<()> {
// Для некоторых манипуляций с терминалом, будем использовать termion.
// Библиотека позволяет "стирать" все из терминала, красить текст,
// менять режим у stdout'а (об этом позже) и тд.
// В данном случае нам нужно очистить терминал.
println!("{}", termion::clear::All);
print!("Username: ");
// Макрос print! добавляет в буфер текст, но не выполняет flush
// и из-за этого после простого выполнения print! в консоли вы
// ничего не увидите. Чтобы это исправить нужно вызвать flush вручную.
std::io::stdout().flush()?;
let mut username = String::new();
// Чтение строки из stdin и запись содержимого в username
// через передачу мутабельной ссылки на username в read_line.
io::stdin().read_line(&mut username)?;
// Обрезаем с начала и конца ненужные символы
// (пробелы, перенос строки и тд) и записываем в наш объект State.
self.username = username.trim().to_owned();
// Снова всё очищаем.
println!("{}", termion::clear::All);
Ok(())
}
}
Код функции main для клиента (Файл client/src/main.rs)
// Файл client/src/main.rs
use std::io;
use crate::{
settings::Settings,
state::State
};
mod settings;
mod state;
fn main() -> io::Result<()> {
let settings = Settings::new();
let state = State::new()?;
Ok(())
}
Прописывание общих типов для клиента и сервера
Теперь когда у нас готовы базовые вещи, можно начинать делать логику.
И так, у нас сервер и клиент будут передавать друг другу сообщения в одном и том же формате. Эти сообщения называются «сигналы». Сам формат сигналов похож на формат передаваемых данных в http, только очень сильно упрощен.
В начале сигнала идут хедеры (список ниже). Хедеры разделяются символами »\r\n».
/*
Кто отправляет Поле Значение
USER USERNAME Строка
SERVER AUTH_STATUS "ACCEPTED", "DECLINED"
USER+SERVER WITH_MESSAGE Нет
USER+SERVER SIGNAL_TYPE "CONNECTION", "NEW_MESSAGE"
SERVER SERVER_MESSAGE Нет
*/
/*
Определения хедеров
USERNAME Имя пользователя, от которого пришло сообщение
AUTH_STATUS Статус авторизации
WITH_MESSAGE В сигнале есть сообщение
SIGNAL_TYPE Тип сигнала: запрос на авторизацию или сообщение
SERVER_MESSAGE Серверное сообщение
*/
Потом в случае если в хедерах сигнала есть «WITH_MESSAGE», то после хедеров идет ещё один разделитель »\r\n» и начинается сообщение, которое заканчивается символами »\r\n\r\n».
Нам нужно уметь парсить сигналы и легко формировать свои. Для этого нужно прописать ряд типов, которые будут иметь вспомогательные методы, которыми мы будем пользоваться.
Перед этим создадим нашу кастомную ошибку, которую мы будем отдавать при возникновении проблем с парсингом.
Код кастомной ошибки парсинга (начало файла с типами
// Файл /src/types.rs
use std::{
// Импорт утилит для форматирования и вывода строк
fmt,
// Импорт трейта Error (все ошибки как правило должны его имлементить)
error::Error
};
// Трейт Debug обязателен для Error, поэтому навешиваем
// макрос Debug на нашу структуру.
#[derive(Debug)]
pub struct ParseSignalDataError;
// impl Error для структуры. Внутри при желании можно не
// реализовывать методы, потому что все они реализованы по умолчанию.
impl Error for ParseSignalDataError {}
// Error так же требует реализации трейта fmt::Display
impl fmt::Display for ParseSignalDataError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "invalid signal data")
}
}
Можно начинать постепенно реализовывать нужные типы. Первым из них будет простенькие enum SignalType. Он описывает 2 варианта возможных типов сигнала: Connection (подключение, отправляется клиентом серверу), NewMessage (новое сообщение, клиент и сервер отправляют их друг другу).
Код enum«а SignalType (продолжение файла с типами
// Файл /src/types.rs
// ...
#[derive(Debug, Clone, Copy)]
pub enum SignalType {
Connection,
NewMessage,
}
// Трейт FromStr идет из стандартной библиотеки и добавляет функцию
// для создания нужного типа из строки.
impl FromStr for SignalType {
type Err = ParseSignalDataError;
fn from_str(s: &str) -> Result {
match s {
"CONNECTION" => Ok(SignalType::Connection),
"NEW_MESSAGE" => Ok(SignalType::NewMessage),
_ => Err(ParseSignalDataError)
}
}
}
// Трейт ToString также идёт из стандартной библиотеки и добавляет функцию
// для создания уже строки из типа.
impl ToString for SignalType {
fn to_string(&self) -> String {
match self {
// .to_owned() делает из заимствованного типа (ссылки), владеющий.
// В данном случае делает из &str String. Подробно об этом
// останавливаться не буду. Лучше отдельно почитать статьи про
// ownership модель в Rust.
SignalType::Connection => "CONNECTION".to_owned(),
SignalType::NewMessage => "NEW_MESSAGE".to_owned(),
}
}
}
Далее идёт ещё один простой enum AuthStatus. Он содержит значения, которые возвращает сервер в ответ на попытку авторизации юзером. ACCEPTED — авторизация прошла успешно, DENIED — авторизация отклонена. После DENIED соединение сбрасывается.
Код enum«а AuthStatus (продолжение файла с типами
// Файл /src/types.rs
// ...
#[derive(Debug, Clone, Copy)]
pub enum AuthStatus {
ACCEPTED,
DENIED
}
impl FromStr for AuthStatus {
type Err = ParseSignalDataError;
fn from_str(s: &str) -> Result {
match s {
"ACCEPTED" => Ok(AuthStatus::ACCEPTED),
"DENIED" => Ok(AuthStatus::DENIED),
_ => Err(ParseSignalDataError)
}
}
}
impl ToString for AuthStatus {
fn to_string(&self) -> String {
match self {
AuthStatus::ACCEPTED => "ACCEPTED".to_owned(),
AuthStatus::DENIED => "DENIED".to_owned()
}
}
}
Теперь нужно реализовать enum SignalHeader. Это уже тип поинтереснее, он содержит хедеры сигнала и значения, которые они передают. Подробнее про то, какие есть хедеры и какие значения имеют я писал выше, поэтому на этом не буду заострять особо внимание.
Код enum«а SignalHeader (продолжение файла с типами
// Файл /src/types.rs
// ...
pub enum SignalHeader {
Username(String),
AuthStatus(AuthStatus),
SignalType(SignalType),
WithMessage,
ServerMessage
}
impl FromStr for SignalHeader {
type Err = ParseSignalDataError;
fn from_str(s: &str) -> Result {
let (header, value) = s.split_once(':').unwrap_or((s, s));
match header {
"USERNAME" => Ok(SignalHeader::Username(value.trim().to_owned())),
"AUTH_STATUS" => {
match AuthStatus::from_str(value.trim()) {
Ok(v) => return Ok(SignalHeader::AuthStatus(v)),
Err(_) => Err(ParseSignalDataError)
}
},
"SIGNAL_TYPE" => {
match SignalType::from_str(value.trim()) {
Ok(v) => return Ok(SignalHeader::SignalType(v)),
Err(_) => Err(ParseSignalDataError)
}
}
"WITH_MESSAGE" => Ok(SignalHeader::WithMessage),
"SERVER_MESSAGE" => Ok(SignalHeader::ServerMessage),
_ => Err(ParseSignalDataError)
}
}
}
impl ToString for SignalHeader {
fn to_string(&self) -> String {
match self {
SignalHeader::Username(v) => format!("USERNAME: {v}\r\n"),
SignalHeader::AuthStatus(v) => format!("AUTH_STATUS: {}\r\n", v.to_string()),
SignalHeader::SignalType(v) => format!("SIGNAL_TYPE: {}\r\n", v.to_string()),
SignalHeader::WithMessage => "WITH_MESSAGE\r\n".to_owned(),
SignalHeader::ServerMessage => "SERVER_MESSAGE\r\n".to_owned()
}
}
}
Осталось создать самую важную для типов структуры — структуру, которая будет формировать и парсить сигнал. Она самая объёмная в плане кода в файле types.rs. Хотя сам код конечно тривиальный.
Код структуры SignalData (конец файла с типами
// Файл /src/types.rs
// ...
#[derive(Debug, Clone)]
pub struct SignalData {
pub username: Option,
pub password: Option,
pub key: Option,
pub auth_status: Option,
pub signal_type: Option,
pub with_message: bool,
pub message: Option,
pub server_message: bool
}
impl SignalData {
// Метод создания нового инстанса SignalData. В него передаётся
// массив хедеров, который будет иметь сигнал и сообщение.
// Так как сообщения может не быть, то типом сообщения является
// enum Option. Этот enum идет с растом из коробки и имеет два значения
// Some(v) и None.
pub fn new(headers: Vec, message: Option<&str>) -> SignalData {
let mut data = SignalData {
username: None,
password: None,
key: None,
auth_status: None,
signal_type: None,
with_message: false,
message: None,
server_message: false
};
for header in headers {
match header {
SignalHeader::Username(v) => {
data.username = Some(v);
},
SignalHeader::AuthStatus(v) => {
data.auth_status = Some(v);
},
SignalHeader::SignalType(v) => {
data.signal_type = Some(v);
},
SignalHeader::WithMessage => {
data.with_message = true;
data.message = Some(message.unwrap_or("").to_owned());
},
SignalHeader::ServerMessage => {
data.server_message = true;
}
}
}
data
}
}
impl FromStr for SignalData {
type Err = ParseSignalDataError;
// Метод для парсинга данных сигнала.
fn from_str(s: &str) -> Result {
let mut data = SignalData {
username: None,
password: None,
key: None,
auth_status: None,
signal_type: None,
with_message: false,
message: None,
server_message: false,
};
let splitted = s.split("\r\n");
for string in splitted {
let header = match SignalHeader::from_str(string) {
Ok(v) => v,
Err(_) => continue
};
match header {
SignalHeader::Username(v) => {
data.username = Some(v);
},
SignalHeader::AuthStatus(v) => {
data.auth_status = Some(v);
},
SignalHeader::SignalType(v) => {
data.signal_type = Some(v);
}
SignalHeader::WithMessage => {
data.with_message = true;
},
SignalHeader::ServerMessage => {
data.server_message = true;
}
}
}
if data.with_message {
let splitted = s.split_once("\r\n\r\n");
if let Some(v) = splitted {
if v.1.ends_with("\r\n\r\n") {
let string = v.1.to_owned();
data.message = Some(string[..string.len() - 4].to_owned());
}
else {
data.message = Some(v.1.to_owned());
}
}
else {
return Err(ParseSignalDataError);
}
}
if let None = data.signal_type {
return Err(ParseSignalDataError)
}
Ok(data)
}
}
impl ToString for SignalData {
// Метод для преобразования объекта сигнала в строку, для
// отправки клиенту/серверу.
fn to_string(&self) -> String {
let mut res_str = String::new();
if let Some(v) = &self.username {
res_str.push_str(&SignalHeader::Username(v.to_owned()).to_string());
}
if let Some(v) = &self.auth_status {
res_str.push_str(&SignalHeader::AuthStatus(v.clone()).to_string());
}
if let Some(v) = &self.signal_type {
res_str.push_str(&SignalHeader::SignalType(v.clone()).to_string());
}
if self.server_message {
res_str.push_str(&SignalHeader::ServerMessage.to_string());
}
if self.with_message {
if let Some(v) = &self.message {
res_str.push_str(&SignalHeader::WithMessage.to_string());
res_str.push_str("\r\n");
res_str.push_str(&v);
}
}
res_str.push_str("\r\n\r\n");
res_str
}
}
Принятие запросов от клиентов сервером
Для обмена сигналами клиенту и серверу нужно установить соединение. Для этого мы будем использовать TCP сокеты. Сервер при запуске будет открывать один listening (прослушивающий) сокет (сокет, который слушает запросы на соединение), а клиент будет запрашивать подключение к этому сокету.
Важно знать, что соединение клиента с сервером, после того, как прослушивающий сокет принял запрос, устанавливается не между прослушивающим сокетом сервера и подключающимся сокетом клиента, а между сокетом клиента и ещё одним новым созданным сервером сокетом, с которым и будет общаться клиент.
Вообщем, нам нужно сначала, чтобы сервер создавал при запуске прослушивающий сокет. Для этого создадим отдельную пустую структуру Service, в которой у нас будет единственный метод, отвечающий за запуск нашего сервиса. Метод будет принимать в себя аргументом state, который мы уже формируем при запуске. Оттуда он возьмёт для себя всё необходимое (пока что это только порт, который привяжется к прослушивающему сокету).
Код структуры Service для сервера (Файл server/src/service.rs)
// Файл server/src/service.rs
use std::{net::TcpListener, thread, sync::Arc};
use anyhow::Result;
use parking_lot::Mutex;
use crate::state::State;
pub struct Service;
impl Service {
pub fn run(state: State) -> Result<()> {
// Метод bind у TcpListener создаст tcp сокет, привяжет к нему
// адрес, который мы передали, и поставит его в режим прослушивания.
let listener = TcpListener::bind(format!("0.0.0.0:{}", state.get().settings.port))?;
// Метод incoming возвращает итератор, который на каждую
// следующую итерацию вызывает у tcpListener'а метод .accept().
// Этот метод блокирует текущий поток и ждёт пока не появится
// соединение, которое он сможет принять. Поэтому цикл
// не умрет, пока жив tcpListener.
for con in listener.incoming() {
// Для оптимизации работы с подключениями, каждое из них мы будем
// обрабатывать в отдельном потоке. Важно помнить, что переменные,
// которые используются в потоке, перемещаются в него и обратно не
// возвращаются)) По крайней мере в нашем случае.
thread::spawn(move || -> Result<()> {
// Пока что нам с самим соединением делать нечего, так что
// давайте просто выведем адрес подключенного сокета клиента.
// "?" нужен тут чтобы достать из enum'а Result успешный результат.
// Так как con это enum Result, а он содержит
// значения Ok(v) и Err(e), то "?" вернёт нам значение из Ok,
// то-есть TcpStream. В случае если con это Err, то
// ошибка передастся вверх функции родителю.
println!("{:?}", con?.peer_addr());
Ok(())
});
}
Ok(())
}
}
Осталось вызвать метод run у сервиса в функции main.
Обновленный код функции main для сервера (Файл server/src/main.rs)
// Файл server/src/main.rs
use anyhow::Result;
use service::Service;
use settings::Settings;
use state::State;
mod settings;
mod state;
mod service;
fn main() -> Result<()> {
let settings = Settings::new();
let state = State::new(settings);
Service::run(state)?; // +
Ok(())
}
Теперь наконец-то можно запустить сервер и увидеть что-то интересное. Впишите в терминале команду cargo run — --port 8080 (--port это аргумент, который мы определяли в структуре Settings). Дальше попробуйте отправить на этот порт запрос через curl (команда curl 127.0.0.1:8080) и вы должны увидеть в выводе сервера нечто подобное.
Вывод сервера
Он будет выводить вам адреса TCP сокетов клиентов. Так как у любого TCP сокета должен быть какой-то порт, в том числе у того, который производить запрос на подключение, система сама автоматически выдаст ему свободный порт. Поэтому в выводе вы будете видеть локалхост в связке со случайными портами.
Подключение клиента к серверу
Подключение к серверу на клиенте будет завернуто в структуру Connection. В ней будет храниться сам TcpStream (сокет подключённый к серверу) и BufReader, с помощью которого будем читать данные из сокета. Про него я немного рассказал выше в описание кода структуры State для клиента.
Connection будет выполнять 2 функции: подключение к серверу с отправлением всех нужных данных (юзернейма и типа сигнала) и чтение данных из сокета. Для этого будут реализованы методы new и read_signal соответственно.
Код структуры Connection для клиента (Файл client/src/connection.rs)
// Файл client/src/connection.rs
use std::{
net::TcpStream,
io::{
self,
Write,
Error,
ErrorKind,
BufRead,
BufReader
},
};
use crate::types::{
SignalType,
SignalHeader,
SignalData,
AuthStatus
};
pub struct Connection {
pub stream: TcpStream,
// инстанс BufReader'а нужен только для внутреннего пользования,
// поэтому делать его публичным нет смысла
reader: io::BufReader
}
impl Connection {
pub fn new(address: &str, username: &str) -> io::Result {
// Формирование сигнала. Передаётся тип сигнала и имя пользователя,
// который подключается.
let signal = SignalData::new(
vec![
SignalHeader::SignalType(SignalType::Connection),
SignalHeader::Username(username.to_owned())
],
None
);
// Метод connect у TcpStream создаст TCP сокет, и
// попытается подключиться к сокету, адрес которого
// был передан аргументом.
let mut connection = TcpStream::connect(address)?;
// Метод write_all запишет байты в сокет и они будут
// переданы сокету на другом конце, то-есть серверу.
connection.write_all(signal.to_string().as_bytes())?;
// Создаём инстанс BufReader'а, который будет читать данные с сокета.
let reader = BufReader::new(connection.try_clone()?);
// Создаём инстанс Connection
let mut instance = Connection {
stream: connection,
reader
};
// С сервера должен прийти AuthStatus. Ждём его и когда он приходит
// в зависимости от статуса, либо отдаём ошибку, либо отдаём
// инстанс Connection.
let data_from_socket = instance.read_signal()?;
if data_from_socket.contains(&AuthStatus::DENIED.to_string()) {
return Err(Error::new(ErrorKind::ConnectionAborted, "Access denied"));
}
return Ok(instance)
}
pub fn read_signal(&mut self) -> io::Result {
let mut res_line = String::new();
// Индикатор того, что хедеры были прочитаны
let mut headers_read = false;
loop {
let mut buf_line = String::new();
// Ридер читает по одной строчке и записывает в буферную переменную.
// Если находит ошибку, то всё вылетает. Если он читает 0 байт, то
// значит, что соединение потеряно (специфика чтения сокетов).
match self.reader.read_line(&mut buf_line) {
Err(e) => panic!("Got an error: {}", e),
Ok(0) => return Err(Error::new(ErrorKind::BrokenPipe, "Connection closed")),
Ok(_) => (),
};
res_line.push_str(&buf_line);
// Если он натыкается на "\r\n\r\n", то значит хедеры закончились
// и если нет хедера WITH_MESSAGE, то сигнал можно считать прочитанным
// и возвращать его. Если есть, то цикл уходить на второй круг
// и читает сообщение.
if res_line.ends_with("\r\n\r\n"){
if !res_line.contains(&SignalHeader::WithMessage.to_string()) || headers_read {
break;
}
headers_read = true;
}
}
Ok(res_line)
}
}
// Реализация трейта Clone для Connection. Просто навесить
// макрос для Clone не получится, потому что нужно чуть иначе клонировать
// TcpStream и BufReader.
impl Clone for Connection {
fn clone(&self) -> Self {
Connection {
stream: self.stream.try_clone().unwrap(),
reader: BufReader::new(self.stream.try_clone().unwrap())
}
}
}
Основная часть логики работы клиента будет описываться в структуре Service. В нашем серверном приложении тоже есть такая структура, но там на неё ложиться ответственность только за запуск приложения. В принципе на данном этапе в клиенте нам тоже требуется только описать запуск.
Начало файла client/src/service.rs
// Файл client/src/service.rs
use std::io;
use crate::{
settings::Settings,
state::State,
connection::Connection
};
pub struct Service {
pub connection: Connection,
pub settings: Settings,
pub state: State,
}
impl Service {
pub fn run(settings: Settings, state: State) -> io::Result<()> {
let connection = Connection::new(
&settings.server_address.to_owned(),
&state.username
)?;
let mut instance = Service {
connection,
settings,
state
};
Ok(())
}
}
Теперь добавим вызов нашего нового метода в main функцию клиента. Пока при запуске приложения он просто подключится к серверу и сразу закроется. Более никаких изменений в main«е клиента не будет.
Код main функции клиента (Файл client/src/main.rs)
// Файл client/src/main.rs
use std::io;
use service::Service;
use crate::{
settings::Settings,
state::State
};
mod settings;
mod types;
mod connection;
mod state;
mod service;
fn main() -> io::Result<()> {
let settings = Settings::new();
let state = State::new()?;
Service::run(settings, state)?;
Ok(())
}
Пул сообщений на сервере (MessagesPool)
Наш сервер будет работать с подключениями в отдельных потоках. Для каждого пользователя будет создано 2 отдельных потока: на принятие сообщений от него и на отправку сообщений ему от других пользователей. Новые сообщения необходимо отправлять всем пользователям чата, то-есть потоки, пользователей, которые отвечают за отправку им новых сообщений должны получить каким-то образом сигнал о том, что в чате появилось новое сообщение и его надо отправить юзеру.
Для решения этой проблемы я решил создавать на время жизни процесса сервера структуру, которая будет хранить в себе списком 256 последних сообщений. Потоки отправляющие сообщения юзеру будут с определённым интервалом читать сообщения из этой структуры, начиная с последнего прочитанного. А в свою очередь потоки принимающие сообщения от пользователей будут в конец списка добавлять новые сообщения.
Опишем тип самого сообщения, которое будет храниться в пуле.
Начало файла server/src/messages_pool.rs
// Файл server/src/messages_pool.rs
use std::{collections::{HashMap, VecDeque}, iter};
#[derive(Debug, Clone)]
pub struct PoolMessage {
// Уникальный идентификатор сообщения
pub id: String,
pub username: String,
pub message: String,
// Сообщениям, отправленным сервером ставится true. Это сообщения
// о входе и выходе пользователя.
pub from_server: bool,
}
impl PoolMessage {
fn new() -> PoolMessage {
PoolMessage {
id: String::new(),
username: String::new(),
message: String::new(),
from_server: false,
}
}
}
Далее приступим к описанию самого пула. Простыми словами пул — это просто массив с ограниченной длинной (256 сообщений). Каждое новое сообщения будет добавляться в конец, а если сообщений в пуле 256, то первое сообщение будет удалено для того чтобы в конце освободилось место для нового.
Продолжение файла server/src/messages_pool.rs
// Файл server/src/messages_pool.rs
//...
pub struct MessagesPool {
// Вектор с сообщениям. Используется VecDeque, а не Vec
// для того чтобы можно было удалять и добавлять сообщения
// и в конец и в начало вектора.
pool: VecDeque,
// Для упрощения вычислений индексы сообщений в векторе хранятся
// в отдельной таблице.
indexes: HashMap,
// Кол-во сообщений в пуле.
length: u16,
}
impl MessagesPool {
pub fn new() -> MessagesPool {
// Создём вектор на 256 элементов. Метод repeat_with создас бесконечный
// итератор, который будет повторять одни и теже данные. Далее методом
// take можно преобразовать этот итератор в итератор поменьше. И
// collect'ом собираем итератор в коллекцию (в VecDeque в нашем случае).
let arr: VecDeque = iter::repeat_with(|| PoolMessage::new())
.take(256)
.collect();
MessagesPool {
pool: arr,
indexes: HashMap::new(),
length: 0
}
}
// Метод push добавляет новое сообщение в пул, обновляет
// внутренние индексы и увеличивает кол-во сообщений. Если
// кол-во сообщений в пуле больше 256, то сообщение на нулевом индексе
// удаляется и в конец записывается новое.
pub fn push(&mut self, v: PoolMessage) {
if self.length == 256 {
// обновляем пул
self.pool.pop_front();
self.pool.push_back(v);
// обновляем индексы
let mut new_indexes: HashMap = HashMap::new();
// enumerate - метод итератора, позволяющий преобразовать итератор
// в итератор, который вместе с текущим значением, отдаёт
// и текущую итерацию.
for (index, message) in self.pool.iter().enumerate() {
new_indexes.insert(message.id.clone(), index as u8);
}
self.indexes = new_indexes;
}
else {
// "as u8" преобразует u16 в u8. Мы не можем поставить полю
// length u8, потому что длина может быть равна 256, а оно
// не входит в u8.
let index = self.length as u8;
self.pool[index as usize] = v.clone();
self.length += 1;
self.indexes.insert(v.id.clone(), index);
}
}
// Возвращает сообщения, начиная с определённого id
// и id последнего сообщения.
fn read_from(&self, id: &str) -> (Vec, Option) {
let found_index = self.indexes.get(id);
// Проверяет найдено сообщение с таким id в индексах или нет.
// Если нет, значит в один момент появилось много новых сообщений и их
// не успели отправить пользователю. В таком случае читаем
// сообщения с начала списка.
match found_index {
Some(v) => {
let index: u16 = v.to_owned() as u16 + 1;
let sliced_pool = &Vec::from(self.pool.clone())[index.into()..self.length.into()];
let sliced_pool_last = {
if sliced_pool.len() == 0 {
None
}
else {
Some(sliced_pool.last().unwrap().clone().id)
}
};
return (sliced_pool.clone().into(), sliced_pool_last)
},
None => {
let last_el = self.last();
let index = match last_el {
Some(v) => Some(v.id.clone()),
None => None
};
let sliced_pool = &Vec::from(self.pool.clone())[..self.length.into()];
return (sliced_pool.into(), index)
}
}
}
// Метод проверяющий пул на наличие новых сообщений и возвращает их.
pub fn has_new(&self, id: &str) -> Option<(Vec, Option)> {
let last_el = self.last();
match last_el {
Some(_) => Some(self.read_from(id)),
None => None,
}
}
// Возвращает последнее сообщение в соответствии с текущей длиной пула.
fn last(&self) -> Option {
let last_index = {
if self.length > 0 {
self.length - 1
} else {
self.length
}
};
let last_el = &self.pool[last_index.into()];
if last_el.id == "".to_owned() {
None
} else {
Some(last_el.to_owned())
}
}
}
Чтение сигналов от пользователя на сервере
Чтение сигналов от клиента на сервере будет аналогично чтению сигналов от сервера на клиенте, которое мы писали для структуры Connection. Единственное отличие, что привязываться метод чтения сигналов будет не к какой-то структуре, у которой внутри лежит BufReader
Trait StreamReader (Файл server/src/reader.rs)
// Файл server/src/reader.rs
use std::{io::{BufReader, self, BufRead, Error, ErrorKind}, self, net::TcpStream};
use crate::types::SignalHeader;
pub trait StreamReader {
fn read_signal(&mut self) -> io::Result;
}
impl StreamReader for BufReader {
fn read_signal(&mut self) -> io::Result {
let mut res_line = String::new();
let mut headers_read = false;
loop {
let mut buf_line = String::new();
match self.read_line(&mut buf_line) {
Err(_) => return Err(Error::new(ErrorKind::ConnectionAborted, "boom boom")),
Ok(0) => return Err(Error::new(ErrorKind::BrokenPipe, "boom boom")),
Ok(m) => m,
};
res_line.push_str(&buf_line);
if res_line.ends_with("\r\n\r\n"){
if !res_line.contains(&SignalHeader::WithMessage.to_string()) || headers_read {
break;
}
headers_read = true;
}
}
Ok(res_line)
}
}
Менеджер подключений (основная серверная логика)
Вся серверная логика будет существовать в рамках структуры Manager. Эта структура будет создаваться отдельно для каждого подключения и заниматься взаимодействием с ним.
Так как логика сама достаточно громоздкая, мы её поделим на два основных типа: работа с сокетом и работа с данными. Разделение это будет производиться за счёт разных trait«ов. То-есть по сути все методы будут привязаны к структуре Manager, но сигнатуры этих методов и реализации будут лежать в отдельных от Manager«а файлах.
Структура Manager (Файл server/src/managers/manager.rs)
// Файл server/src/managers/manager.rs
use std::{
net::TcpStream,
io::BufReader,
sync::Arc
};
use parking_lot::Mutex;
use anyhow::Result;
use crate::{state::State, messages_pool::MessagesPool};
pub struct Manager {
// Подключеённый сокет с клиентом
pub stream: TcpStream,
// Ридер, которые будет читать данные с сокета
pub reader: BufReader,
// Состояние приложения
pub state: State,
// Общий пул сообщений
pub messages_pool: Arc>,
// Последнее прочитанное из пула сообщение
pub last_read_message_id: String,
// Имя присоединённого юзера
pub connected_user_username: Option,
// Адрес присоединённого юзера
pub connected_peer_addr: String
}
impl Manager {
pub fn new(stream: TcpStream, state: State, messages_pool: Arc>) -> Result<()> {
let mut manager = Manager {
stream: stream.try_clone()?,
reader: BufReader::new(stream.try_clone()?),
state,
messages_pool,
last_read_message_id: String::new(),
connected_user_username: None,
connected_peer_addr: stream.try_clone()?.peer_addr()?.to_string()
};
Ok(())
}
}
Так как мы создали структуру Manager в отдельной папке, нам нужно определить эту папку как модуль, чтобы иметь возможность взаимодействовать с ней. Для этого в этой папке создайте файл mod.rs и выведите в публичный доступ структуру Manager.
Файл server/src/mangers/mod.rs
// Файл server/src/mangers/mod.rs
mod manager;
pub use manager::Manager;
Теперь нужно добавить модуль managers в список модулей в файле main.rs.
Обновление в файле server/src/main.rs
// Файл server/src/main.rs
// ...
mod managers;
// ...
После того как структура Manager и папка для неё были созданы, можно начинать писать два основных trait«а, которые и будут содержать весь функционал менеджера, а именно: StreamManager (взаимодействие с сокетом) и DataManger (взаимодействие с данными).
Начнём с трейта DataManager, так как большая часть его методов потом будет использоваться в StreamManager«е. Итак, он будет содержать следующий функционал: определение стоит или не стоит пускать юзера на сервер, удаление юзера из списка авторизованных и вывод сообщения о его выходе из чата, отправка юзеру новых сообщений и обработка полученных от него.
Код DataManager (Файл server/src/managers/data_manager.rs)
// Файл server/src/managers/data_manager.rs
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread;
use std::time::Duration;
use std::str::FromStr;
use anyhow::Result;
use parking_lot::Mutex;
// Библиотека для генерации уникальных идентификаторов.
use uuid::Uuid;
use crate::messages_pool::{PoolMessage, MessagesPool};
use crate::state::UserData;
use crate::types::{
AuthStatus,
SignalData,
SignalHeader,
AuthConnectionError,
IncomingMessageError,
SignalType
};
use super::manager::Manager;
// !! Будет описан ниже !!
use super::stream_manager::StreamManager;
pub trait DataManager {
// Отправляет клиенту сигнал о неудачной авторизации.
fn deny_auth(&mut self) -> Result<()>;
// Пытается авторизовать пользоватяля
// (проверяет есть ли юзер с таким ником на сервере или нет).
fn auth(&mut self, signal: String) -> Result<()>;
// Удаляет юзера из списка подключенных к серверу и
// отправялет сообщение о его выходе.
fn remove_user(&mut self, username: String) -> Result<()>;
// В цикле просматривает пул сообщений и отправляет новые пользователю.
fn process_messages_pool(&mut self, receiver: Receiver<()>) -> Result<()>;
// Проверяет сигнал нового сообщения и добавляет его в пул сообщений.
fn process_incoming_message(messages_pool: Arc>, signal: String) -> Result<()>;
}
impl DataManager for Manager {
fn deny_auth(&mut self) -> Result<()> {
let response = SignalData::new(
vec![SignalHeader::AuthStatus(AuthStatus::DENIED)],
None
);
// !! Метод описан в StreamManager ниже !!
self.send_data(&response.to_string())?;
Ok(())
}
fn auth(&mut self, signal: String) -> Result<()> {
let data = SignalData::from_str(&signal)?;
match data.signal_type.unwrap() {
SignalType::Connection => {
if let None = data.username {
return Err(AuthConnectionError.into());
}
let mut state = self.state.get();
if state.users.contains_key(&data.username.clone().unwrap()) {
return Err(AuthConnectionError.into())
}
state.users.insert(data.username.clone().unwrap().to_owned(), UserData {
address: self.stream.peer_addr()?.to_string(),
});
self.messages_pool.lock().push(PoolMessage {
id: Uuid::new_v4().to_string(),
username: String::new(),
message: format!("{} joined the chat!", data.username.clone().unwrap()),
from_server: true
});
}
_ => return Err(AuthConnectionError.into()),
}
self.connected_user_username = Some(data.username.unwrap());
let response = SignalData::new(
vec![SignalHeader::AuthStatus(AuthStatus::ACCEPTED)],
None
);
// !! Метод описан в StreamManager ниже !!
self.send_data(&response.to_string())?;
Ok(())
}
fn remove_user(&mut self, username: String) -> Result<()> {
let mut state = self.state.get();
if state.users.contains_key(&username) {
state.users.remove(&username);
self.messages_pool.lock().push(PoolMessage {
id: Uuid::new_v4().to_string(),
username: String::new(),
message: format!("{username} left the chat!"),
from_server: true
});
}
Ok(())
}
fn process_messages_pool(&mut self, receiver: Receiver<()>) -> Result<()> {
loop {
// Проверяем наличие сообщения от потока, слушающего
// сообщения от пользователя. Если сообщение есть,
// значит соединение потеряно и можно обрывать цикл. Подробнее
// об этом в описании трейта StreamManager.
if let Ok(()) = receiver.try_recv() {
break;
};
// Так как self.messages_pool это Arc, его данные не получиться
// получить просто прописав self.messages_pool.clone().lock(),