[Из песочницы] Введение в CDRS, Cassandra driver полностью написанный на Rust
CDRS (Cassandra driver written in Rust) — это мой собственный open source проект, который я решился разрабатывать после того, как обнаружил, что в плане драйверов для Cassandra в Rust экосистеме образовался дефицит.
Конечно, я не скажу, что их совсем нет. Они есть, но одна часть это заброшенные в зачаточном состоянии Hello World пакеты, а вторая часть это, наверное, единственный binding к драйверу от DataStax, написанному на С++.
Что касается CDRS, то средствами Rust он полностью имплементирует спецификацию 4-й версии протокола.
cargo.toml
Чтобы включить драйвер в свой проект, как обычно, необходимо следующее.
Во-первых, добавить CDRS в секцию dependencies
вашего cargo.toml
файла:
[dependencies]
cdrs = "1.0.0-beta.1"
Это позволит использовать TCP соединение без шифрования.
Если вы намерены создавать SSL-шифрованное соединение со свое базой данных, то CDRS должен быть включен с фичей «ssl»:
[dependencies]
openssl = "0.9.6"
[dependencies.cdrs]
version = "1.0.0-beta.1"
features = ["ssl"]
Во-вторых, добавить его в lib.rs
extern crate CDRS
Установка соединения
TCP соединение
Для установки не шифрованного соединения вам понадобятся следующие модули
use cdrs::client::CDRS;
use cdrs::authenticators::{NoneAuthenticator, PasswordAuthenticator};
use cdrs::transport::TransportPlain;
Если так случилось, что ваш кластер не требует авторизации паролем, то соединение может быть установлено следующим образом:
let authenticator = NoneAuthenticator;
let addr = "127.0.0.1:9042";
let tcp_transport = TransportPlain::new(addr).unwrap();
// pass authenticator and transport into CDRS' constructor
let client = CDRS::new(tcp_transport, authenticator);
use cdrs::compression;
// start session without compression
let mut session = try!(client.start(compression::None));
Для установки соединения, требующего авторизации паролем, вместо NoneAuthenticator
нужно использовать PasswordAuthenticator
:
let authenticator = PasswordAuthenticator::new("user", "pass");
TLS соединение
Установление TLS соединение во многом похоже на процесс, описанный в предыдущем шаге, за исключением того, что вам понадобится PEM сертификат для создания SSL транспорта.
use cdrs::client::CDRS;
use cdrs::authenticators::PasswordAuthenticator;
use cdrs::transport::TransportTls;
use openssl::ssl::{SslConnectorBuilder, SslMethod};
use std::path::Path;
let authenticator = PasswordAuthenticator::new("user", "pass");
let addr = "127.0.0.1:9042";
// here needs to be a path of your SSL certificate
let path = Path::new("./node0.cer.pem");
let mut ssl_connector_builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
ssl_connector_builder.builder_mut().set_ca_file(path).unwrap();
let connector = ssl_connector_builder.build();
let ssl_transport = TransportTls::new(addr, &connector).unwrap();
// pass authenticator and SSL transport into CDRS' constructor
let client = CDRS::new(ssl_transport, authenticator);
Connection pool
Для более простого управления существующими соединениям CDRS содержит ConnectionManager
, который по своей сути есть адаптор для r2d2.
use cdrs::connection_manager::ConnectionManager;
//...
let config = r2d2::Config::builder()
.pool_size(3)
.build();
let transport = TransportPlain::new(ADDR).unwrap();
let authenticator = PasswordAuthenticator::new(USER, PASS);
let manager = ConnectionManager::new(transport, authenticator, Compression::None);
let pool = r2d2::Pool::new(config, manager).unwrap();
for _ in 0..20 {
let pool = pool.clone();
thread::spawn(move || {
let conn = pool.get().unwrap();
// use the connection
// it will be returned to the pool when it falls out of scope.
});
}
Сжатие — lz4 и snappy
Чтобы использовать lz4
и snappy
сжатие, достаточно передать в конструктор сессии желаемый декодер:
// session without compression
let mut session_res = client.start(compression::None);
// session with lz4 compression
let mut session_res = client.start(compression::Lz4);
// session with snappy compression
let mut session_res = client.start(compression::Snappy);
Далее CDRS самостоятельно сообщит кластеру, что он готов принимать информацию в сжатом виде с выбранным декодером. Дальнейшая распаковка будет проходить автоматически и не требует каких-либо дальнейших действий от разработчика.
Выполнение запросов
Выполнение запросов к Cassandra серверу проходит исключительно в рамках существующей сессии, после выбора методов авторизации, сжатия, а также типа транспорта.
Для выполнения того или иного запроса необходимо создать объект Query
, который с первого взгляда может показаться несколько избыточным для простых запросов, посколько содержит множество параметров, которые, вероятно, используются не так часто.
По этой причине был создан builder
, который упрощает процесс конфигурирования запроса. Например, для простого 'USE my_namespace;
' достаточно просто
let create_query: Query = QueryBuilder::new("USE my_namespace;").finalize();
let with_tracing = false;
let with_warnings = false;
let switched = session.query(create_query, with_tracing, with_warnings).is_ok();
Создание новой таблицы
Чтобы создать новую таблицу в Cassandra кластере, как и раньше, необходимо вначале сконфигурировать Query
и после этого выполнить запрос:
use std::default::Default;
use cdrs::query::{Query, QueryBuilder};
use cdrs::consistency::Consistency;
let mut create_query: Query = QueryBuilder::new("CREATE TABLE keyspace.authors (
id int,
name text,
messages list,
PRIMARY KEY (id)
);")
.consistency(Consistency::One)
.finalize();
let with_tracing = false;
let with_warnings = false;
let table_created = session.query(create_query, with_tracing, with_warnings).is_ok();
Что касается самого CQL запроса создания новой таблицы, то за более полной информацией лучше обратиться к специализированным ресурсам, например DataStax.
SELECT запрос и маппинг результатов
Предположим, что в нашей базе данных существует таблица авторов, при чем каждый автор имеет список своих сообщений. Пусть эти сообщения хранятся внутри list-колонки. В терминах Rust автор должен иметь следующий вид:
struct Author {
pub name: String,
pub messages: Vec
}
Сам запрос может быть выполнен через Session::query
метод, как это было сделано в случае создания таблицы. Естественно, CQL должен быть в данном случае чем-то вроде 'SELECT * FROM keyspace.authors;
'. Если таблица содержит данные о каких-то авторах, мы можем попытаться отобразить полученные данные в коллекцию Rust структур, типа 'Vec
'
//...
use cdrs::error::{Result as CResult};
let res_body = parsed.get_body();
let rows = res_body.into_rows().unwrap();
let messages: Vec = rows
.iter()
.map(|row| {
let name: String = row.get_by_name("name").unwrap();
let messages: Vec = row
// unwrap Option>, where T implements AsRust
.get_by_name("messages").unwrap().unwrap()
.as_rust().unwrap();
return Author {
author: name,
text: messages
};
})
.collect();
Во время отображения результатов следует обратить внимание на следующие трейты:
IntoRustByName. Говоря простым языком, этот трейт применяется по отношению к сложным типам Cassandra таким, как row (которая, строго говоря не является отдельным типом, определенным в спецификации, но по своему внутреннему устройству может рассматриваться, как что-то близкое к User Defined Type) и UDT. Грубо говоря,
get_by_name
пытается отыскать «свойство» по его имени, и если находит, то возвращает результат преобразования этого свойства к Rust типу или к CDRS типам, таким какList
, 'Map',UDT
. Сами же эти типы есть отображение соответствующих типов данных определенных в спецификации.- AsRust. Этот трейт предназначен для конечного отображения в Rust типы. Полный список имплиментаторов можно увидеть в приведенной ссылке.
Prepare & Execute
Иногда бывает удобным вначале единожды подготовить сложный запрос, а после этого выполнить его несколько раз с различными данными в разное время. Для этого прекрасно подходит Prepare & Execute.
// prepare just once
let insert_table_cql = " insert into user_keyspace.users (user_name, password, gender, session_token, state) values (?, ?, ?, ?, ?)";
let prepared = session.prepare(insert_table_cql.to_string(), true, true)
.unwrap()
.get_body()
.into_prepared()
.unwrap();
// execute later and possible few times with different values
let v: Vec = vec![Value::new_normal(String::from("john").into_bytes()),
Value::new_normal(String::from("pwd").into_bytes()),
Value::new_normal(String::from("male").into_bytes()),
Value::new_normal(String::from("09000").into_bytes()),
Value::new_normal(String::from("FL").into_bytes())];
let execution_params = QueryParamsBuilder::new(Consistency::One).values(v).finalize();
// without tracing and warnings
let executed = session.execute(prepared.id, execution_params, false, false);
Также имеет смысл комбинировать Prepare & Batch для выполнения сразу нескольких подготовленных запросов. Простейший пример Batch также можно найти в примерах.
Cassandra events
Кроме всего вышеописанного, CDRS предоставляет возможность подписаться и следить за событиями, которые публикует сервер.
let (mut listener, stream) = session.listen_for(vec![SimpleServerEvent::SchemaChange]).unwrap();
thread::spawn(move || listener.start(&Compression::None).unwrap());
let topology_changes = stream
// inspects all events in a stream
.inspect(|event| println!("inspect event {:?}", event))
// filter by event's type: topology changes
.filter(|event| event == &SimpleServerEvent::TopologyChange)
// filter by event's specific information: new node was added
.filter(|event| {
match event {
&ServerEvent::TopologyChange(ref event) => {
event.change_type == TopologyChangeType::NewNode
},
_ => false
}
});
println!("Start listen for server events");
for change in topology_changes {
println!("server event {:?}", change);
}
Чтобы найти полный список событий лучше всего обратиться в саму спецификацию, а также к документации драйвера.
В будущем есть планы использовать события для «умного» load balancing.
Полезные ссылки
- CDRS репозиторий, примеры, документация
- Спецификация 4-й версии протокола
- DataStax CQL, конфигурация кластера.