Last month I wrote a blog post about the LMDB Cache database and my wish to use it in Fractal. To summarize, LMDB is a memory-mapped key-value database that persists its data to the filesystem. I want to use it in the Fractal desktop application to replace the current state storage system (we're using simple JSON files). As a side effect, we can use this storage system to share data between threads: currently we're using a big struct, AppOp, shared as Arc<Mutex<AppOp>>, and this causes some problems because every thread needs to share, lock and update the state there.
The main goal is to define an app data model with smaller structs and store them using LMDB; then we can access the same data by querying LMDB, and update the app state by storing to LMDB.
With this change we don't need to share these structs; we only need to query LMDB to get the data and then work with it, which should simplify our code. The other main benefit is that this state lives in the filesystem by default, so when we reopen the app after closing it, we'll be in the same state.
Take a look at the gtk TODO example app to see how to use mdl with signals in a real gtk app.
What is mdl
mdl is a data model library to share app state between threads and processes and to persist the data in the filesystem. It implements a simple way to store struct instances in an LMDB database, and in other stores such as a BTreeMap.
I started playing with the LMDB Rust binding and writing some simple tests. After that, I decided to write a small abstraction to hide the LMDB internals and provide simple data storage, and for that I created the mdl crate.
The idea is to be able to define your app model as simple Rust structs. LMDB is a key-value database, so every struct instance needs a unique key to be stored in the cache.
The keys are stored in order in the cache, so we can use some techniques to store related objects and to retrieve all objects of a kind; we only need to build the keys correctly, following a scheme. For example, for Fractal we can store rooms, members and messages like this:
- rooms with key "room:roomid", to store all the room information, title, topic, icon, unread msgs, etc.
- members with key "member:roomid:userid", to store all member information.
- messages with key "msg:roomid:msgid" to store room messages.
Following this key assignment, we can iterate over all rooms by querying all objects whose key starts with "room", and in the same way we can get all members and all messages of a room.
This has some drawbacks: we can't directly query a message by id if we don't know the roomid. If we need that kind of query, we need to think about another key assignment, or maybe duplicate data. Key-value stores are simple databases, so we don't have the power of relational databases.
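The key scheme above can be tried out without LMDB at all. The following is a minimal sketch that uses a plain std BTreeMap as a stand-in for any ordered key-value store; the helper names with_prefix and sample_db are made up for this illustration and are not part of mdl.

```rust
use std::collections::BTreeMap;

/// Return every (key, value) pair whose key starts with `prefix`.
/// A BTreeMap keeps its keys sorted, so all matching keys are contiguous:
/// we start at `prefix` and stop at the first key that no longer matches.
/// (Hypothetical helper for this sketch, not an mdl function.)
fn with_prefix<'a>(
    db: &'a BTreeMap<String, String>,
    prefix: &'a str,
) -> impl Iterator<Item = (&'a String, &'a String)> + 'a {
    db.range(prefix.to_string()..)
        .take_while(move |(k, _)| k.starts_with(prefix))
}

/// Build a tiny store that follows the "kind:roomid:id" key scheme.
fn sample_db() -> BTreeMap<String, String> {
    let mut db = BTreeMap::new();
    db.insert("room:r1".to_string(), "Fractal".to_string());
    db.insert("room:r2".to_string(), "GNOME".to_string());
    db.insert("member:r1:alice".to_string(), "Alice".to_string());
    db.insert("msg:r1:m1".to_string(), "hello".to_string());
    db.insert("msg:r1:m2".to_string(), "world".to_string());
    db.insert("msg:r2:m1".to_string(), "other room".to_string());
    db
}
```

With this layout, with_prefix(&db, "room:") walks every room and with_prefix(&db, "msg:r1:") walks the messages of one room. Keeping the trailing ":" in the prefix matters: "msg:r1" alone would also match a room whose id is "r10".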
Internals
LMDB is fast and efficient because it's memory-mapped, so using this cache won't add a lot of overhead. To make it simple to use I had to add some overhead of my own, though, so mdl is easy by default and can be tuned to be really fast.
This crate has three main modules with traits to implement:
- model: This contains the Model trait, which every struct that we want to make cacheable should implement.
- store: This contains the Store trait that's implemented by all the cache systems.
- signal: This contains the Signaler trait and two structs that allow us to emit/subscribe to "key" signals.
And two more modules that implement the current two cache systems:
- cache: LMDB cache that implements the Store trait.
- bcache: BTreeMap cache that implements the Store trait. This is a good example of another cache system that can be used; it doesn't persist to the filesystem.
So we have two main concepts here, the Store and the Model. The model is the plain data and the store is the container of data. We'll be able to add models to the store or query the store to get stored models. We store our models as key-value pairs where the key is a String and the value is a Vec<u8>, so every model should be serializable.
This serialization is the biggest overhead added. We need it because we have to be able to store the data in the LMDB database. Every request creates a copy of the object in the database, so we're not sharing the same data. This could be tuned to use pointers to the real data, but that would need unsafe code, and I think the performance gained isn't worth the complexity it would add.
By default, the Model trait has two methods, tob and fromb, to serialize and deserialize using bincode, so any struct that implements the Model trait and doesn't reimplement these two methods should implement Serialize and Deserialize from serde.
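To make the Model/Store split more concrete, here is a rough std-only sketch of the idea. It is not mdl's real code: the trait definitions, the MemStore type, the push/pull method names and the hand-rolled byte encoding (used here instead of serde and bincode so the snippet has no dependencies) are all assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// A container of raw bytes addressed by string keys (simplified, hypothetical).
trait Store {
    fn push(&mut self, key: &str, value: Vec<u8>);
    fn pull(&self, key: &str) -> Option<Vec<u8>>;
}

/// In-memory store, playing the role of the BTreeMap-based cache.
struct MemStore {
    data: BTreeMap<String, Vec<u8>>,
}

impl MemStore {
    fn new() -> Self {
        MemStore { data: BTreeMap::new() }
    }
}

impl Store for MemStore {
    fn push(&mut self, key: &str, value: Vec<u8>) {
        self.data.insert(key.to_string(), value);
    }
    fn pull(&self, key: &str) -> Option<Vec<u8>> {
        self.data.get(key).cloned()
    }
}

/// Plain data that knows its key and how to turn itself into bytes and back.
trait Model: Sized {
    fn key(&self) -> String;
    fn tob(&self) -> Vec<u8>;
    fn fromb(data: &[u8]) -> Option<Self>;

    /// Serialize and write a copy of `self` into the store.
    fn store<S: Store>(&self, store: &mut S) {
        store.push(&self.key(), self.tob());
    }
    /// Read the bytes back and build a new, independent instance.
    fn get<S: Store>(store: &S, key: &str) -> Option<Self> {
        store.pull(key).and_then(|bytes| Self::fromb(&bytes))
    }
}

#[derive(Debug, PartialEq)]
struct A {
    p1: String,
    p2: u32,
}

impl Model for A {
    fn key(&self) -> String {
        format!("{}:{}", self.p1, self.p2)
    }
    // Ad-hoc encoding for this sketch: 4 little-endian bytes of p2, then p1 as UTF-8.
    fn tob(&self) -> Vec<u8> {
        let mut out = self.p2.to_le_bytes().to_vec();
        out.extend_from_slice(self.p1.as_bytes());
        out
    }
    fn fromb(data: &[u8]) -> Option<Self> {
        if data.len() < 4 {
            return None;
        }
        let (num, text) = data.split_at(4);
        let p2 = u32::from_le_bytes([num[0], num[1], num[2], num[3]]);
        let p1 = String::from_utf8(text.to_vec()).ok()?;
        Some(A { p1, p2 })
    }
}
```

The point of the sketch is the shape: the store only ever sees a String key and a Vec<u8>, and every read builds a fresh copy from bytes, which is where the serialization overhead described above comes from.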
The signal system is an addition that lets us register callbacks for key modifications in the store, so we can do something when an object is added, modified or deleted. The signaler is optional and has to be used explicitly.
How to use it
First of all, you should define your data model, the struct that you want to be able to store in the database:
#[derive(Serialize, Deserialize, Debug)]
struct A {
    pub p1: String,
    pub p2: u32,
}
In this example we define a struct called A with two attributes: p1, a String, and p2, a u32. We derive Serialize and Deserialize because we're using the default fromb and tob from the Model trait.
Then we need to implement the Model trait:
impl Model for A {
    fn key(&self) -> String {
        format!("{}:{}", self.p1, self.p2)
    }
}
We only reimplement the key method to build a key for every instance of A. In this case our key will be the String followed by the number, so for example if we have something like let a = A { p1: "myk".to_string(), p2: 42 }; the key will be "myk:42".
Then, to use this we need a Store. In this example we'll use the LMDB store, which is the struct Cache:
// initializing the cache. This str will be the fs persistence path
let db = "/tmp/mydb.lmdb";
let cache = Cache::new(db).unwrap();
We pass the filesystem path where we want to persist the cache as the first argument; in this example we'll persist to "/tmp/mydb.lmdb". When we run the program for the first time, a directory will be created there. The next time, that cache will be used with the information from the previous execution.
Then, with this cache object we can instantiate an A object and store it in the cache:
// create a new *object* and store it in the cache
let a = A { p1: "hello".to_string(), p2: 42 };
let r = a.store(&cache);
assert!(r.is_ok());
The store method will serialize the object and store a copy of it in the cache.
After the store, we can query for this object from another process, using the same LMDB path, or from the same process using the cache:
// querying the cache by key and getting a new *instance*
let a1: A = A::get(&cache, "hello:42").unwrap();
assert_eq!(a1.p1, a.p1);
assert_eq!(a1.p2, a.p2);
We'll get a copy of the original one.
This is the full example:
extern crate mdl;
#[macro_use]
extern crate serde_derive;

use mdl::Cache;
use mdl::Model;
use mdl::Continue;

#[derive(Serialize, Deserialize, Debug)]
struct A {
    pub p1: String,
    pub p2: u32,
}

impl Model for A {
    fn key(&self) -> String {
        format!("{}:{}", self.p1, self.p2)
    }
}

fn main() {
    // initializing the cache. This str will be the fs persistence path
    let db = "/tmp/mydb.lmdb";
    let cache = Cache::new(db).unwrap();

    // create a new *object* and store it in the cache
    let a = A { p1: "hello".to_string(), p2: 42 };
    let r = a.store(&cache);
    assert!(r.is_ok());

    // querying the cache by key and getting a new *instance*
    let a1: A = A::get(&cache, "hello:42").unwrap();
    assert_eq!(a1.p1, a.p1);
    assert_eq!(a1.p2, a.p2);
}
Iterations
When we store objects with the same key prefix, we can iterate over all of them, even if we don't know the full key of each object.
Currently there are two ways to iterate over all objects with the same prefix in a Store:
- all
This is the simplest way: calling the all method we receive a Vec<T>, so we have all the objects in a vector.
let hellows: Vec<A> = A::all(&cache, "hello").unwrap();
for h in hellows {
    println!("hellow: {}", h.p2);
}
This has a small problem: if we have a lot of objects, it will use a lot of memory for the vector, and we'll be iterating over all the objects twice. To solve these problems, the iter method was created.
- iter
The iter method provides a way to call a closure for every object with this prefix in its key. This closure should return a Continue(bool) that indicates whether we should continue iterating or stop the iteration here.
A::iter(&cache, "hello", |h| {
    println!("hellow: {}", h.p2);
    Continue(true)
}).unwrap();
Using Continue we can avoid iterating over all the objects, for example if we're searching for one particular object.
We're still copying every object, but the iter method is better than all: if we don't copy or move the object out of the closure, each copy only lives in the closure scope, so we use less memory, and we also only iterate once. If we use all, we iterate over all objects with that prefix to build the vector, so iterating over that vector again costs more than the iter version.
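The difference between the two approaches can be sketched with std types only. The code below is an illustration over a BTreeMap<String, u32>, not mdl's implementation: the free functions all and iter, the local Continue type and the sample helper are stand-ins defined just for this example.

```rust
use std::collections::BTreeMap;

/// Local stand-in for the idea behind mdl's Continue(bool).
struct Continue(bool);

/// "all" style: collect every value under `prefix` into a vector first.
fn all(db: &BTreeMap<String, u32>, prefix: &str) -> Vec<u32> {
    db.range(prefix.to_string()..)
        .take_while(|(k, _)| k.starts_with(prefix))
        .map(|(_, v)| *v)
        .collect()
}

/// "iter" style: hand each value to the closure as it is found and stop
/// as soon as the closure returns Continue(false).
/// Returns how many values were visited.
fn iter<F>(db: &BTreeMap<String, u32>, prefix: &str, mut f: F) -> usize
where
    F: FnMut(u32) -> Continue,
{
    let mut visited = 0;
    for (k, v) in db.range(prefix.to_string()..) {
        if !k.starts_with(prefix) {
            break;
        }
        visited += 1;
        let Continue(go_on) = f(*v);
        if !go_on {
            break;
        }
    }
    visited
}

/// Five values under "hello:" plus one unrelated key.
fn sample() -> BTreeMap<String, u32> {
    let mut db = BTreeMap::new();
    for n in 1..=5u32 {
        db.insert(format!("hello:{}", n), n);
    }
    db.insert("other:1".to_string(), 99);
    db
}
```

In this sketch, all always walks the five "hello:" entries and allocates a vector for them, while an iter closure that returns Continue(false) on the second value only ever touches two entries and keeps nothing around.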
Signal system
As I said before, the signal system provides a way to register callbacks for key modifications. The signal system is independent of the Model and Store and can be used on its own:
extern crate mdl;

use mdl::Signaler;
use mdl::SignalerAsync;
use mdl::SigType;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let sig = SignalerAsync::new();
    sig.signal_loop();
    let counter = Arc::new(Mutex::new(0));

    // one thread to receive signals
    let sig1 = sig.clone();
    let c1 = counter.clone();
    let t1: thread::JoinHandle<_> =
        thread::spawn(move || {
            let _ = sig1.subscribe("signal", Box::new(move |_sig| {
                *c1.lock().unwrap() += 1;
            }));
        });
    // waiting for the thread to finish
    t1.join().unwrap();

    // one thread to emit signals
    let sig2 = sig.clone();
    let t2: thread::JoinHandle<_> =
        thread::spawn(move || {
            sig2.emit(SigType::Update, "signal").unwrap();
            sig2.emit(SigType::Update, "signal:2").unwrap();
            sig2.emit(SigType::Update, "signal:2:3").unwrap();
        });
    // waiting for the thread to finish
    t2.join().unwrap();

    let ten_millis = time::Duration::from_millis(10);
    thread::sleep(ten_millis);
    assert_eq!(*counter.lock().unwrap(), 3);
}
In this example we're creating a SignalerAsync that can emit signals, and we can subscribe a callback to any signal. The sig.signal_loop(); call starts the signal loop thread, which waits for signals and calls any subscribed callback when a signal comes.
let _ = sig1.subscribe("signal", Box::new(move |_sig| {
    *c1.lock().unwrap() += 1;
}));
We subscribe a callback to the signaler. The signaler can be cloned and the list of callbacks will be the same: if you emit a signal in one clone and subscribe in another clone, that signal will trigger the callback.
Then we're emitting some signals:
sig2.emit(SigType::Update, "signal").unwrap();
sig2.emit(SigType::Update, "signal:2").unwrap();
sig2.emit(SigType::Update, "signal:2:3").unwrap();
All three of these signals will trigger the previous callback, because the subscription works as a "signal starts with" match. This allows us to subscribe to all new room message insertions, if we follow the previously described keys, by subscribing to "msg:roomid"; and if we want a callback to be called only when one message is updated, we can subscribe to "msg:roomid:msgid", and this callback won't be triggered for other messages.
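The "starts with" matching rule is easy to picture with a small std-only sketch. PrefixSignaler below is a hypothetical, single-threaded toy written for this post; it is not how mdl's signalers are implemented, and its emit takes only a name and returns the number of callbacks it ran, which mdl's emit does not do.

```rust
/// Toy dispatcher: a list of (prefix, callback) pairs.
/// Hypothetical type for illustration only, not part of mdl.
struct PrefixSignaler {
    subs: Vec<(String, Box<dyn Fn(&str)>)>,
}

impl PrefixSignaler {
    fn new() -> Self {
        PrefixSignaler { subs: Vec::new() }
    }

    /// Register `cb` for every signal whose name starts with `prefix`.
    fn subscribe(&mut self, prefix: &str, cb: Box<dyn Fn(&str)>) {
        self.subs.push((prefix.to_string(), cb));
    }

    /// Run every callback whose prefix matches `name`.
    /// Returns how many callbacks were called.
    fn emit(&self, name: &str) -> usize {
        let mut called = 0;
        for (prefix, cb) in self.subs.iter() {
            if name.starts_with(prefix.as_str()) {
                cb(name);
                called += 1;
            }
        }
        called
    }
}
```

A callback subscribed to "msg:r1" in this toy runs for "msg:r1:m1" and "msg:r1:m2", while one subscribed to "msg:r1:m1" runs only for that single message key.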
The callback should be a Box<Fn(signal)> where signal is the following struct:
#[derive(Clone, Debug)]
pub enum SigType {
    Update,
    Delete,
}
#[derive(Clone, Debug)]
pub struct Signal {
    pub type_: SigType,
    pub name: String,
}
Currently only the Update and Delete signal types are supported.
Signaler in gtk main loop
All the UI operations in a gtk app should be executed in the gtk main loop, so we can't use the SignalerAsync in a gtk app: this signaler creates a thread for the callbacks, so all callbacks must implement the Send trait. If we want to modify, for example, a gtk::Label in a callback, that callback won't implement Send, because gtk::Label can't be sent between threads safely.
To solve this problem, I've added the SignalerSync. It doesn't launch any threads, and all operations run in the same thread, even the callbacks. This is a problem if one of your callbacks blocks the thread, because that will freeze your interface in a gtk app, so any callback in the sync signaler should be non-blocking.
This signaler has to be used in a different way: from time to time we should call the signal_loop_sync method, which checks for new signals and triggers any subscribed callback. This signaler doesn't have a signal_loop because we should run the loop in our own thread.
This is an example of how to run the signaler loop inside a gtk app:
let sig = SignalerSync::new();
let sig1 = sig.clone();
gtk::timeout_add(50, move || {
    gtk::Continue(sig1.signal_loop_sync())
});
// We can subscribe callbacks using the sig here
In this example code we're registering a timeout callback: every 50ms this closure will be called from the gtk main thread, and signal_loop_sync will check for signals and call the needed callbacks.
This method returns a bool that's false when the signaler stops. You can stop the signaler by calling the stop method.
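The polling pattern itself does not depend on gtk. As a rough sketch, assuming a queue of pending signal names and a poll method that plays the role of signal_loop_sync, it could look like the code below. PollSignaler and all of its methods are invented for this illustration and do not mirror mdl's internals.

```rust
use std::collections::VecDeque;

/// Single-threaded signaler: emit only queues, poll does the work.
/// Hypothetical type for illustration only, not part of mdl.
struct PollSignaler {
    pending: VecDeque<String>,
    callbacks: Vec<(String, Box<dyn Fn(&str)>)>,
    running: bool,
}

impl PollSignaler {
    fn new() -> Self {
        PollSignaler {
            pending: VecDeque::new(),
            callbacks: Vec::new(),
            running: true,
        }
    }

    fn subscribe(&mut self, prefix: &str, cb: Box<dyn Fn(&str)>) {
        self.callbacks.push((prefix.to_string(), cb));
    }

    /// Queue a signal; no callback runs yet.
    fn emit(&mut self, name: &str) {
        self.pending.push_back(name.to_string());
    }

    fn stop(&mut self) {
        self.running = false;
    }

    /// Non-blocking: run the callbacks for whatever is queued, then return.
    /// The result is false once the signaler has been stopped, so a timer
    /// driving this method knows when to stop calling it.
    fn poll(&mut self) -> bool {
        while let Some(name) = self.pending.pop_front() {
            for (prefix, cb) in self.callbacks.iter() {
                if name.starts_with(prefix.as_str()) {
                    cb(name.as_str());
                }
            }
        }
        self.running
    }
}
```

Because the callbacks only ever run inside poll, they execute on whichever thread calls it, which is why they don't need to be Send, and also why a slow callback would stall that thread.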
Point of extension
I've tried to make this crate generic so it can be extended in the future and provide other kinds of cache that can be used with few code changes in the apps that use mdl.
This is the main reason to use traits to implement the store, so the first point of extension is to add more cache systems. There are currently two, LMDB and BTreeMap, but it would be easy to add more key-value storages, like memcached, unqlite, mongodb, redis, couchdb, etc.
The signaler is really simple, so maybe we can start to think about new signalers that use Futures and other kinds of callback registration.
As I said before, mdl makes a copy of the data on every write and on every read, so it would be interesting to explore the impact of these copies on performance and try to find ways to reduce this overhead.