Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
lmdb_store_wrapper.cpp
Go to the documentation of this file.
5#include "napi.h"
6#include <algorithm>
7#include <chrono>
8#include <cstdint>
9#include <iterator>
10#include <memory>
11#include <optional>
12#include <ratio>
13#include <stdexcept>
14#include <utility>
15
16using namespace bb::nodejs;
17using namespace bb::nodejs::lmdb_store;
18
19const uint64_t DEFAULT_MAP_SIZE = 1024UL * 1024;
20const uint64_t DEFAULT_MAX_READERS = 16;
21const uint64_t DEFAULT_CURSOR_PAGE_SIZE = 10;
22
23LMDBStoreWrapper::LMDBStoreWrapper(const Napi::CallbackInfo& info)
24 : ObjectWrap(info)
25{
26 Napi::Env env = info.Env();
27
28 size_t data_dir_index = 0;
29 std::string data_dir;
30 if (info.Length() > data_dir_index && info[data_dir_index].IsString()) {
31 data_dir = info[data_dir_index].As<Napi::String>();
32 } else {
33 throw Napi::TypeError::New(env, "Directory needs to be a string");
34 }
35
36 size_t map_size_index = 1;
37 uint64_t map_size = DEFAULT_MAP_SIZE;
38 if (info.Length() > map_size_index) {
40 map_size = info[map_size_index].As<Napi::Number>().Uint32Value();
41 } else {
42 throw Napi::TypeError::New(env, "Map size must be a number or an object");
43 }
44 }
45
46 size_t max_readers_index = 2;
48 if (info.Length() > max_readers_index) {
50 max_readers = info[max_readers_index].As<Napi::Number>().Uint32Value();
51 } else if (!info[max_readers_index].IsUndefined()) {
52 throw Napi::TypeError::New(env, "The number of readers must be a number");
53 }
54 }
55
57
59
62
68
70
72
73 // The close operation requires exclusive execution, no other operations can be run concurrently with it
75
77}
78
79Napi::Value LMDBStoreWrapper::call(const Napi::CallbackInfo& info)
80{
82}
83
84Napi::Function LMDBStoreWrapper::get_class(Napi::Env env)
85{
86 return DefineClass(env,
87 "Store",
88 {
90 });
91}
92
93// Simply verify that the store is still valid and that close has not been called
95{
96 if (_store) {
97 return;
98 }
99 throw std::runtime_error(format("LMDB store unavailable, was close already called?"));
100}
101
103{
104 verify_store();
105 _store->open_database(req.db, !req.uniqueKeys.value_or(true));
106 return { true };
107}
108
110{
111 verify_store();
113 lmdblib::KeysVector keys = req.keys;
114 _store->get(keys, vals, req.db);
115 return { vals };
116}
117
119{
121 return std::lexicographical_compare(a.begin(), a.end(), b.begin(), b.end());
122 };
123
124 verify_store();
125 std::set<lmdblib::Key, decltype(string_cmp)> key_set(string_cmp);
126 for (const auto& entry : req.entries) {
127 key_set.insert(entry.first);
128 }
129
130 lmdblib::KeysVector keys(key_set.begin(), key_set.end());
132 _store->get(keys, vals, req.db);
133
134 std::vector<bool> exists;
135
136 for (const auto& entry : req.entries) {
137 const auto& key = entry.first;
138 const auto& requested_values = entry.second;
139
140 const auto& key_it = std::find(keys.begin(), keys.end(), key);
141 if (key_it == keys.end()) {
142 // this shouldn't happen. It means we missed a key when we created the key_set
143 exists.push_back(false);
144 continue;
145 }
146
147 // should be fine to convert this to an index in the array?
148 const auto& values = vals[static_cast<size_t>(key_it - keys.begin())];
149
150 if (!values.has_value()) {
151 exists.push_back(false);
152 continue;
153 }
154
155 // client just wanted to know if the key exists
156 if (!requested_values.has_value()) {
157 exists.push_back(true);
158 continue;
159 }
160
161 exists.push_back(std::all_of(requested_values->begin(), requested_values->end(), [&](const auto& val) {
162 return std::find(values->begin(), values->end(), val) != values->begin();
163 }));
164 }
165
166 return { exists };
167}
168
170{
171 verify_store();
172 bool reverse = req.reverse.value_or(false);
174 bool one_page = req.onePage.value_or(false);
175 lmdblib::Key key = req.key;
176
177 auto tx = _store->create_shared_read_transaction();
178 lmdblib::LMDBCursor::SharedPtr cursor = _store->create_cursor(tx, req.db);
179 bool start_ok = cursor->set_at_key(key);
180
181 if (!start_ok) {
182 // we couldn't find exactly the requested key. Find the next biggest one.
183 start_ok = cursor->set_at_key_gte(key);
184 // if we found a key that's greater _and_ we want to go in reverse order
185 // then we're actually outside the requested bounds, we need to go back one position
186 if (start_ok && reverse) {
188 // read_prev returns `true` if there's nothing more to read
189 // turn this into a "not ok" because there's nothing in the db for this cursor to read
190 start_ok = !cursor->read_prev(1, entries);
191 } else if (!start_ok && reverse) {
192 // we couldn't find a key greater than our starting point _and_ we want to go in reverse..
193 // then we start at the end of the database (the client requested to start at a key greater than anything in
194 // the DB)
195 start_ok = cursor->set_at_end();
196 }
197
198 // in case we're iterating in ascending order and we can't find the exact key or one that's greater than it
199 // then that means theren's nothing in the DB for the cursor to read
200 }
201
202 // we couldn't find a starting position
203 if (!start_ok) {
204 return { std::nullopt, {} };
205 }
206
207 auto [done, first_page] = _advance_cursor(*cursor, reverse, page_size);
208 // cursor finished after reading a single page or client only wanted the first page
209 if (done || one_page) {
210 return { std::nullopt, first_page };
211 }
212
213 auto cursor_id = cursor->id();
214 {
216 _cursors[cursor_id] = { cursor, reverse };
217 }
218
219 return { cursor_id, first_page };
220}
221
223{
224 {
226 _cursors.erase(req.cursor);
227 }
228 return { true };
229}
230
232{
234
235 {
237 data = _cursors.at(req.cursor);
238 }
239
241 auto [done, entries] = _advance_cursor(*data.cursor, data.reverse, page_size);
242 return { entries, done };
243}
244
246{
248
249 {
251 data = _cursors.at(req.cursor);
252 }
253
254 auto [done, count] = _advance_cursor_count(*data.cursor, data.reverse, req.endKey);
255 return { count, done };
256}
257
259{
260 verify_store();
262 batches.reserve(req.batches.size());
263
264 for (const auto& data : req.batches) {
265 lmdblib::LMDBStore::PutData batch{ data.second.addEntries, data.second.removeEntries, data.first };
266 batches.push_back(batch);
267 }
268
269 auto start = std::chrono::high_resolution_clock::now();
270 _store->put(batches);
271 auto end = std::chrono::high_resolution_clock::now();
272 std::chrono::duration<uint64_t, std::nano> duration_ns = end - start;
273
274 return { duration_ns.count() };
275}
276
278{
279 verify_store();
281 auto [map_size, physical_file_size] = _store->get_stats(stats);
282 return { stats, map_size, physical_file_size };
283}
284
286{
287 // prevent this store from receiving further messages
289
290 {
291 // close all of the open read cursors
292 std::lock_guard cursors(_cursor_mutex);
293 _cursors.clear();
294 }
295
296 // and finally close the database handle
297 _store.reset(nullptr);
298
299 return { true };
300}
301
303{
304 verify_store();
305 _store->copy_store(req.dstPath, req.compact.value_or(false));
306
307 return { true };
308}
309
311 bool reverse,
312 uint64_t page_size)
313{
315 bool done = reverse ? cursor.read_prev(page_size, entries) : cursor.read_next(page_size, entries);
316 return std::make_pair(done, entries);
317}
318
320 bool reverse,
321 const lmdblib::Key& end_key)
322{
323 uint64_t count = 0;
324 bool done = reverse ? cursor.count_until_prev(end_key, count) : cursor.count_until_next(end_key, count);
325 return std::make_pair(done, count);
326}
bool count_until_next(const Key &key, uint64_t &count) const
bool read_next(uint64_t numKeysToRead, KeyDupValuesVector &keyValuePairs) const
bool read_prev(uint64_t numKeysToRead, KeyDupValuesVector &keyValuePairs) const
std::shared_ptr< LMDBCursor > SharedPtr
bool count_until_prev(const Key &key, uint64_t &count) const
void register_handler(uint32_t msgType, T *self, R(T::*handler)() const, bool unique=false)
Napi::Promise process_message(const Napi::CallbackInfo &info)
StartCursorResponse start_cursor(const StartCursorRequest &req)
GetResponse get(const GetRequest &req)
static Napi::Function get_class(Napi::Env env)
BoolResponse close_cursor(const CloseCursorRequest &req)
BoolResponse open_database(const OpenDatabaseRequest &req)
bb::nodejs::AsyncMessageProcessor _msg_processor
HasResponse has(const HasRequest &req)
BatchResponse batch(const BatchRequest &req)
BoolResponse copy_store(const CopyStoreRequest &req)
std::unordered_map< uint64_t, CursorData > _cursors
static std::pair< bool, uint64_t > _advance_cursor_count(const lmdblib::LMDBCursor &cursor, bool reverse, const lmdblib::Key &end_key)
AdvanceCursorResponse advance_cursor(const AdvanceCursorRequest &req)
AdvanceCursorCountResponse advance_cursor_count(const AdvanceCursorCountRequest &req)
Napi::Value call(const Napi::CallbackInfo &)
The only instance method exposed to JavaScript. Takes a msgpack Message and returns a Promise.
static std::pair< bool, lmdblib::KeyDupValuesVector > _advance_cursor(const lmdblib::LMDBCursor &cursor, bool reverse, uint64_t page_size)
std::unique_ptr< lmdblib::LMDBStore > _store
std::string format(Args... args)
Definition log.hpp:23
#define info(...)
Definition log.hpp:93
const std::vector< MemoryValue > data
FF a
FF b
const uint64_t DEFAULT_MAP_SIZE
const uint64_t DEFAULT_MAX_READERS
const uint64_t DEFAULT_CURSOR_PAGE_SIZE
std::vector< Key > KeysVector
Definition types.hpp:13
std::vector< uint8_t > Key
Definition types.hpp:11
std::vector< KeyValuesPair > KeyDupValuesVector
Definition types.hpp:18
std::vector< OptionalValues > OptionalValuesVector
Definition types.hpp:17
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13