monotone

monotone Mtn Source Tree

Root/database.cc

1// Copyright (C) 2002 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include <algorithm>
11#include <deque>
12#include <fstream>
13#include <iterator>
14#include <list>
15#include <set>
16#include <sstream>
17#include <vector>
18
19#include <string.h>
20
21#include <boost/shared_ptr.hpp>
22#include <boost/lexical_cast.hpp>
23
24#include <sqlite3.h>
25
26#include "app_state.hh"
27#include "cert.hh"
28#include "cleanup.hh"
29#include "constants.hh"
30#include "database.hh"
31#include "hash_map.hh"
32#include "keys.hh"
33#include "revision.hh"
34#include "safe_map.hh"
35#include "sanity.hh"
36#include "schema_migration.hh"
37#include "transforms.hh"
38#include "ui.hh"
39#include "vocab.hh"
40#include "xdelta.hh"
41#include "epoch.hh"
42
43#undef _REENTRANT
44#include "lru_cache.h"
45
46// defined in schema.sql, converted to header:
47#include "schema.h"
48
49// this file defines a public, typed interface to the database.
50// the database class encapsulates all knowledge about sqlite,
51// the schema, and all SQL statements used to access the schema.
52//
53// see file schema.sql for the text of the schema.
54
55using std::deque;
56using std::endl;
57using std::istream;
58using std::ifstream;
59using std::make_pair;
60using std::map;
61using std::multimap;
62using std::ostream;
63using std::pair;
64using std::set;
65using std::string;
66using std::vector;
67
68using boost::shared_ptr;
69using boost::lexical_cast;
70
// row/column expectations passed to database::fetch(); the "any"
// values disable the corresponding sanity check.
int const one_row = 1;
int const one_col = 1;
int const any_rows = -1;
int const any_cols = -1;

// when true, get_version() re-hashes every reconstructed version and
// verifies it matches the requested ident (slow but safe)
bool global_slow_assertions_version_check = true;
77
78namespace
79{
80 struct query_param
81 {
82 enum arg_type { text, blob };
83 arg_type type;
84 string data;
85 };
86
87 query_param
88 text(string const & txt)
89 {
90 query_param q = {
91 query_param::text,
92 txt,
93 };
94 return q;
95 }
96
97 query_param
98 blob(string const & blb)
99 {
100 query_param q = {
101 query_param::blob,
102 blb,
103 };
104 return q;
105 }
106
107 // track all open databases for close_all_databases() handler
108 set<sqlite3*> sql_contexts;
109}
110
// Pairs an SQL command string with its ordered bind parameters.
// Parameters are appended with operator% so call sites read like:
//   query("... WHERE id = ?") % text(ident())
struct query
{
  explicit query(string const & cmd)
    : sql_cmd(cmd)
  {}

  query()
  {}

  // append one bind parameter; returns *this so appends can be chained
  query & operator %(query_param const & qp)
  {
    args.push_back(qp);
    return *this;
  }

  vector<query_param> args;   // in bind order, 1-based when bound
  string sql_cmd;
};
129
// Construct an (unopened) database handle bound to file `fn`.
// No sqlite connection is made until sql() is first called.
database::database(system_path const & fn) :
  filename(fn),
  // nb. update this if you change the schema. unfortunately we are not
  // using self-digesting schemas due to comment irregularities and
  // non-alphabetic ordering of tables in sql source files. we could create
  // a temporary db, write our intended schema into it, and read it back,
  // but this seems like it would be too rude. possibly revisit this issue.
  schema("9d2b5d7b86df00c30ac34fe87a3c20f1195bb2df"),
  pending_writes_size(0),
  __sql(NULL),
  transaction_level(0)
{}
142
// Return true if `file` names this database's backing file, comparing
// the normalized internal path forms.
bool
database::is_dbfile(any_path const & file)
{
  system_path fn(file);// why is this needed?
  bool same = (filename.as_internal() == fn.as_internal());
  if (same)
    L(FL("'%s' is the database file") % file);
  return same;
}
152
// Verify that the on-disk schema id matches the id this build of
// monotone expects; abort with migration advice otherwise.
void
database::check_schema()
{
  string db_schema_id;
  calculate_schema_id (__sql, db_schema_id);
  N (schema == db_schema_id,
     F("layout of database %s doesn't match this version of monotone\n"
       "wanted schema %s, got %s\n"
       "try '%s db migrate' to upgrade\n"
       "(this is irreversible; you may want to make a backup copy first)")
     % filename % schema % db_schema_id % __app->prog_name);
}
165
166void
167database::check_is_not_rosterified()
168{
169 results res;
170 string rosters_query = "SELECT 1 FROM rosters LIMIT 1";
171 fetch(res, one_col, any_rows, query(rosters_query));
172 N(res.empty(),
173 F("this database already contains rosters"));
174}
175
// Sanity-check the database's data format generation by probing which
// tables are populated: revisions without rosters means a pre-roster
// db needing migration; manifests without revisions means a truly
// ancient (changeset-era) db.
void
database::check_format()
{
  results res_revisions;
  string manifests_query = "SELECT 1 FROM manifests LIMIT 1";
  string revisions_query = "SELECT 1 FROM revisions LIMIT 1";
  string rosters_query = "SELECT 1 FROM rosters LIMIT 1";

  fetch(res_revisions, one_col, any_rows, query(revisions_query));

  if (!res_revisions.empty())
    {
      // they have revisions, so they can't be _ancient_, but they still might
      // not have rosters
      results res_rosters;
      fetch(res_rosters, one_col, any_rows, query(rosters_query));
      N(!res_rosters.empty(),
        F("database %s contains revisions but no rosters\n"
          "if you are a project leader or doing local testing:\n"
          "  see the file UPGRADE for instructions on upgrading.\n"
          "if you are not a project leader:\n"
          "  wait for a leader to migrate project data, and then\n"
          "  pull into a fresh database.\n"
          "sorry about the inconvenience.")
        % filename);
    }
  else
    {
      // they have no revisions, so they shouldn't have any manifests either.
      // if they do, their db is probably ancient.  (though I guess you could
      // trigger this check by taking a pre-roster monotone, doing "db
      // init; commit; db kill_rev_locally", and then upgrading to a
      // rosterified monotone.)
      results res_manifests;
      fetch(res_manifests, one_col, any_rows, query(manifests_query));
      N(res_manifests.empty(),
        F("database %s contains manifests but no revisions\n"
          "this is a very old database; it needs to be upgraded\n"
          "please see README.changesets for details")
        % filename);
    }
}
218
// SQL function gunzip(blob): decompress a gzip-packed column value so
// that SQL-level consumers (e.g. migration code) can read stored data.
static void
sqlite3_gunzip_fn(sqlite3_context *f, int nargs, sqlite3_value ** args)
{
  if (nargs != 1)
    {
      sqlite3_result_error(f, "need exactly 1 arg to gunzip()", -1);
      return;
    }
  data unpacked;
  const char *val = (const char*) sqlite3_value_blob(args[0]);
  int bytes = sqlite3_value_bytes(args[0]);
  decode_gzip(gzip<data>(string(val,val+bytes)), unpacked);
  // SQLITE_TRANSIENT: sqlite makes its own copy before `unpacked` dies
  sqlite3_result_blob(f, unpacked().c_str(), unpacked().size(), SQLITE_TRANSIENT);
}
233
// Attach the application state object; needed before most operations
// (error messages, lua hooks and cert code read __app).
void
database::set_app(app_state * app)
{
  __app = app;
}
239
240static void
241check_sqlite_format_version(system_path const & filename)
242{
243 // sqlite 3 files begin with this constant string
244 // (version 2 files begin with a different one)
245 string version_string("SQLite format 3");
246
247 ifstream file(filename.as_external().c_str());
248 N(file, F("unable to probe database version in file %s") % filename);
249
250 for (string::const_iterator i = version_string.begin();
251 i != version_string.end(); ++i)
252 {
253 char c;
254 file.get(c);
255 N(c == *i, F("database %s is not an sqlite version 3 file, "
256 "try dump and reload") % filename);
257 }
258}
259
260
261static void
262assert_sqlite3_ok(sqlite3 *s)
263{
264 int errcode = sqlite3_errcode(s);
265
266 if (errcode == SQLITE_OK) return;
267
268 const char * errmsg = sqlite3_errmsg(s);
269
270 // sometimes sqlite is not very helpful
271 // so we keep a table of errors people have gotten and more helpful versions
272 if (errcode != SQLITE_OK)
273 {
274 // first log the code so we can find _out_ what the confusing code
275 // was... note that code does not uniquely identify the errmsg, unlike
276 // errno's.
277 L(FL("sqlite error: %d: %s") % errcode % errmsg);
278 }
279 // note: if you update this, try to keep calculate_schema_id() in
280 // schema_migration.cc consistent.
281 string auxiliary_message = "";
282 if (errcode == SQLITE_ERROR)
283 {
284 auxiliary_message += _("make sure database and containing directory are writeable\n"
285 "and you have not run out of disk space");
286 }
287 // if the last message is empty, the \n will be stripped off too
288 E(errcode == SQLITE_OK,
289 // kind of string surgery to avoid ~duplicate strings
290 F("sqlite error: %s\n%s") % errmsg % auxiliary_message);
291}
292
// Lazily open and validate the underlying sqlite connection, returning
// the cached handle on subsequent calls.
// init:             create the schema in a brand-new file instead of
//                   probing for an existing one.
// migrating_format: skip check_format() so migration code can open a
//                   db whose data format is (intentionally) outdated.
// Both flags are only honored on the first (opening) call.
struct sqlite3 *
database::sql(bool init, bool migrating_format)
{
  if (! __sql)
    {
      check_filename();

      if (! init)
        {
          check_db_exists();
          check_sqlite_format_version(filename);
        }

      open();

      if (init)
        {
          // create the tables from the compiled-in schema text
          sqlite3_exec(__sql, schema_constant, NULL, NULL, NULL);
          assert_sqlite3_ok(__sql);
        }

      check_schema();
      install_functions(__app);

      if (!migrating_format)
        check_format();
    }
  else
    {
      // flags are meaningless once the connection exists
      I(!init);
      I(!migrating_format);
    }
  return __sql;
}
327
// Create a fresh database file with the current schema. Refuses to run
// on an open connection, over an existing file, or next to a stale
// sqlite journal (which sqlite would replay into the new db).
void
database::initialize()
{
  if (__sql)
    throw oops("cannot initialize database while it is open");

  require_path_is_nonexistent(filename,
                              F("could not initialize database: %s: already exists")
                              % filename);

  system_path journal(filename.as_internal() + "-journal");
  require_path_is_nonexistent(journal,
                              F("existing (possibly stale) journal file '%s' "
                                "has same stem as new database '%s'\n"
                                "cancelling database creation")
                              % journal % filename);

  // sql(true) opens the file and executes the schema
  sqlite3 *s = sql(true);
  I(s != NULL);
}
348
349
// Context passed through the sqlite exec callbacks while dumping:
// the connection to read from and the stream to write SQL text to.
struct
dump_request
{
  dump_request() : sql(), out() {};
  struct sqlite3 *sql;   // non-owning; connection being dumped
  ostream *out;          // non-owning; receives the SQL dump text
};
357
// Emit one table row as an "INSERT INTO <table> VALUES(...);" line.
// Blobs become X'<hex>' literals; text values are single-quoted with
// embedded quotes doubled; NULLs are written literally.
static void
dump_row(ostream &out, sqlite3_stmt *stmt, string const& table_name)
{
  out << FL("INSERT INTO %s VALUES(") % table_name;
  unsigned n = sqlite3_data_count(stmt);
  for (unsigned i = 0; i < n; ++i)
    {
      if (i != 0)
        out << ',';

      if (sqlite3_column_type(stmt, i) == SQLITE_BLOB)
        {
          out << "X'";
          const char *val = (const char*) sqlite3_column_blob(stmt, i);
          int bytes = sqlite3_column_bytes(stmt, i);
          out << encode_hexenc(string(val,val+bytes));
          out << "'";
        }
      else
        {
          const unsigned char *val = sqlite3_column_text(stmt, i);
          if (val == NULL)
            out << "NULL";
          else
            {
              out << "'";
              // escape by doubling single quotes, per SQL quoting rules
              for (const unsigned char *cp = val; *cp; ++cp)
                {
                  if (*cp == '\'')
                    out << "''";
                  else
                    out << *cp;
                }
              out << "'";
            }
        }
    }
  out << ");\n";
}
397
// sqlite3_exec callback for each table listed in sqlite_master:
// writes the CREATE TABLE statement, then selects every row and dumps
// it via dump_row(). vals is (name, type, sql) per the driving query.
static int
dump_table_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(vals != NULL);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(n == 3);
  I(string(vals[1]) == "table");
  // vals[2] is the original CREATE TABLE text
  *(dump->out) << vals[2] << ";\n";
  string table_name(vals[0]);
  string query = "SELECT * FROM " + table_name;
  sqlite3_stmt *stmt = 0;
  sqlite3_prepare(dump->sql, query.c_str(), -1, &stmt, NULL);
  assert_sqlite3_ok(dump->sql);

  int stepresult = SQLITE_DONE;
  do
    {
      stepresult = sqlite3_step(stmt);
      I(stepresult == SQLITE_DONE || stepresult == SQLITE_ROW);
      if (stepresult == SQLITE_ROW)
        dump_row(*(dump->out), stmt, table_name);
    }
  while (stepresult == SQLITE_ROW);

  sqlite3_finalize(stmt);
  assert_sqlite3_ok(dump->sql);
  // 0 tells sqlite3_exec to keep iterating
  return 0;
}
431
// sqlite3_exec callback for each index in sqlite_master: emits the
// stored CREATE INDEX statement. vals is (name, type, sql).
static int
dump_index_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(vals != NULL);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(n == 3);
  I(string(vals[1]) == "index");
  *(dump->out) << vals[2] << ";\n";
  return 0;
}
447
// Write the whole database to `out` as portable SQL: schema plus row
// inserts for every user table, then index definitions, wrapped in an
// exclusive transaction so database::load() replays it atomically.
void
database::dump(ostream & out)
{
  // don't care about schema checking etc.
  check_filename();
  check_db_exists();
  open();
  {
    transaction_guard guard(*this);
    dump_request req;
    req.out = &out;
    req.sql = sql();
    out << "BEGIN EXCLUSIVE;\n";
    int res;
    // sqlite's own statistics tables are skipped; ANALYZE on load
    // regenerates them
    res = sqlite3_exec(req.sql,
                          "SELECT name, type, sql FROM sqlite_master "
                          "WHERE type='table' AND sql NOT NULL "
                          "AND name not like 'sqlite_stat%' "
                          "ORDER BY name",
                          dump_table_cb, &req, NULL);
    assert_sqlite3_ok(req.sql);
    res = sqlite3_exec(req.sql,
                          "SELECT name, type, sql FROM sqlite_master "
                          "WHERE type='index' AND sql NOT NULL "
                          "ORDER BY name",
                          dump_index_cb, &req, NULL);
    assert_sqlite3_ok(req.sql);
    out << "COMMIT;\n";
    guard.commit();
  }
  close();
}
480
481void
482database::load(istream & in)
483{
484 string line;
485 string sql_stmt;
486
487 check_filename();
488
489 require_path_is_nonexistent(filename,
490 F("cannot create %s; it already exists") % filename);
491
492 open();
493
494 // the page size can only be set before any other commands have been executed
495 sqlite3_exec(__sql, "PRAGMA page_size=8192", NULL, NULL, NULL);
496
497 while(in)
498 {
499 getline(in, line, ';');
500 sql_stmt += line + ';';
501
502 if (sqlite3_complete(sql_stmt.c_str()))
503 {
504 sqlite3_exec(__sql, sql_stmt.c_str(), NULL, NULL, NULL);
505 sql_stmt.clear();
506 }
507 }
508
509 assert_sqlite3_ok(__sql);
510 execute(query("ANALYZE"));
511}
512
513
514void
515database::debug(string const & sql, ostream & out)
516{
517 results res;
518 fetch(res, any_cols, any_rows, query(sql));
519 out << "'" << sql << "' -> " << res.size() << " rows\n" << endl;
520 for (size_t i = 0; i < res.size(); ++i)
521 {
522 for (size_t j = 0; j < res[i].size(); ++j)
523 {
524 if (j != 0)
525 out << " | ";
526 out << res[i][j];
527 }
528 out << endl;
529 }
530}
531
532
namespace
{
  // Accumulate `count` into the running `total` and hand the count
  // back unchanged, so one expression can both tally and display a
  // figure (see the SPACE_USAGE macro in database::info).
  unsigned long
  add(unsigned long count, unsigned long & total)
  {
    total = total + count;
    return count;
  }
}
542
// Print a human-readable summary of the database: schema id, row
// counts per table, byte usage per table (and total), and sqlite
// page/cache configuration.
void
database::info(ostream & out)
{
  string id;
  calculate_schema_id(sql(), id);

  unsigned long total = 0UL;

  u64 num_nodes;
  {
    results res;
    // next_roster_node_number holds the next unused node id, so the
    // number of allocated logical files is that value minus one
    fetch(res, one_col, any_rows, query("SELECT node FROM next_roster_node_number"));
    if (res.empty())
      num_nodes = 0;
    else
      {
        I(res.size() == 1);
        num_nodes = lexical_cast<u64>(res[0][0]) - 1;
      }
  }

// report a table's byte usage and fold it into `total` as a side effect
#define SPACE_USAGE(TABLE, COLS) add(space_usage(TABLE, COLS), total)

  out << \
    F("schema version : %s\n"
      "counts:\n"
      " full rosters : %u\n"
      " roster deltas : %u\n"
      " full files : %u\n"
      " file deltas : %u\n"
      " revisions : %u\n"
      " ancestry edges : %u\n"
      " certs : %u\n"
      " logical files : %u\n"
      "bytes:\n"
      " full rosters : %u\n"
      " roster deltas : %u\n"
      " full files : %u\n"
      " file deltas : %u\n"
      " revisions : %u\n"
      " cached ancestry : %u\n"
      " certs : %u\n"
      " total : %u\n"
      "database:\n"
      " page size : %u\n"
      " cache size : %u\n"
      )
    % id
    // counts
    % count("rosters")
    % count("roster_deltas")
    % count("files")
    % count("file_deltas")
    % count("revisions")
    % count("revision_ancestry")
    % count("revision_certs")
    % num_nodes
    // bytes
    % SPACE_USAGE("rosters", "length(id) + length(data)")
    % SPACE_USAGE("roster_deltas", "length(id) + length(base) + length(delta)")
    % SPACE_USAGE("files", "length(id) + length(data)")
    % SPACE_USAGE("file_deltas", "length(id) + length(base) + length(delta)")
    % SPACE_USAGE("revisions", "length(id) + length(data)")
    % SPACE_USAGE("revision_ancestry", "length(parent) + length(child)")
    % SPACE_USAGE("revision_certs", "length(hash) + length(id) + length(name)"
                  " + length(value) + length(keypair) + length(signature)")
    % total
    % page_size()
    % cache_size();

#undef SPACE_USAGE
}
615
// Open the database just long enough to read and print its schema id.
void
database::version(ostream & out)
{
  string id;

  check_filename();
  check_db_exists();
  open();

  calculate_schema_id(__sql, id);

  close();

  out << F("database schema version: %s") % id << endl;
}
631
// Run the schema-migration machinery (schema_migration.cc) against the
// raw connection, upgrading the db to the current schema in place.
void
database::migrate()
{
  check_filename();
  check_db_exists();
  open();

  migrate_monotone_schema(__sql, __app);

  close();
}
643
// Force the lazy connection open (with all validity checks) without
// needing the returned handle.
void
database::ensure_open()
{
  sqlite3 *s = sql();
  I(s != NULL);
}
650
// Like ensure_open(), but skips the data-format check so migration
// code can open a db whose format is deliberately out of date.
void
database::ensure_open_for_format_changes()
{
  sqlite3 *s = sql(false, true);
  I(s != NULL);
}
657
// Log statement-cache statistics, finalize all cached prepared
// statements (via their destructors), and close the connection.
database::~database()
{
  L(FL("statement cache statistics"));
  L(FL("prepared %d statements") % statement_cache.size());

  for (map<string, statement>::const_iterator i = statement_cache.begin();
       i != statement_cache.end(); ++i)
    L(FL("%d executions of %s") % i->second.count % i->first);
  // trigger destructors to finalize cached statements
  statement_cache.clear();

  close();
}
671
// Run a statement for its side effects only: expect zero columns and
// zero rows back (fetch() enforces both).
void
database::execute(query const & query)
{
  results res;
  fetch(res, 0, 0, query);
}
678
// Core query engine: prepare (or reuse a cached prepared form of) the
// statement, bind its parameters, step through all result rows into
// `res`, and enforce the caller's column/row-count expectations.
// want_cols / want_rows: expected shape, or any_cols / any_rows to
// skip the respective check. Throws (via E/N) on sqlite errors, NULL
// result values, or shape mismatches.
void
database::fetch(results & res,
                int const want_cols,
                int const want_rows,
                query const & query)
{
  int nrow;
  int ncol;
  int rescode;

  res.clear();
  res.resize(0);

  // one prepared statement per distinct SQL string, kept for the
  // lifetime of the database object
  map<string, statement>::iterator i = statement_cache.find(query.sql_cmd);
  if (i == statement_cache.end())
    {
      statement_cache.insert(make_pair(query.sql_cmd, statement()));
      i = statement_cache.find(query.sql_cmd);
      I(i != statement_cache.end());

      const char * tail;
      sqlite3_prepare(sql(), query.sql_cmd.c_str(), -1, i->second.stmt.paddr(), &tail);
      assert_sqlite3_ok(sql());
      L(FL("prepared statement %s") % query.sql_cmd);

      // no support for multiple statements here
      E(*tail == 0,
        F("multiple statements in query: %s\n") % query.sql_cmd);
    }

  ncol = sqlite3_column_count(i->second.stmt());

  E(want_cols == any_cols || want_cols == ncol,
    F("wanted %d columns got %d in query: %s") % want_cols % ncol % query.sql_cmd);

  // bind parameters for this execution

  int params = sqlite3_bind_parameter_count(i->second.stmt());

  // Ensure that exactly the right number of parameters were given
  I(params == int(query.args.size()));

  // profiling finds this logging to be quite expensive
  if (global_sanity.debug)
    L(FL("binding %d parameters for %s") % params % query.sql_cmd);

  // sqlite bind indices are 1-based
  for (int param = 1; param <= params; param++)
    {
      // profiling finds this logging to be quite expensive
      if (global_sanity.debug)
        {
          string log = query.args[param-1].data;

          if (log.size() > constants::log_line_sz)
            log = log.substr(0, constants::log_line_sz);

          L(FL("binding %d with value '%s'") % param % log);
        }

      switch (idx(query.args, param - 1).type)
        {
        case query_param::text:
          // SQLITE_STATIC is safe: the bound string outlives the step
          // loop below, and the statement is reset before returning
          sqlite3_bind_text(i->second.stmt(), param,
                            idx(query.args, param - 1).data.c_str(), -1,
                            SQLITE_STATIC);
          break;
        case query_param::blob:
          {
            string const & data = idx(query.args, param - 1).data;
            sqlite3_bind_blob(i->second.stmt(), param,
                              data.data(), data.size(),
                              SQLITE_STATIC);
          }
          break;
        default:
          I(false);
        }

      assert_sqlite3_ok(sql());
    }

  // execute and process results

  nrow = 0;
  for (rescode = sqlite3_step(i->second.stmt()); rescode == SQLITE_ROW;
       rescode = sqlite3_step(i->second.stmt()))
    {
      vector<string> row;
      for (int col = 0; col < ncol; col++)
        {
          // read via column_blob so text and binary data are handled
          // uniformly; bytes gives the exact length (may contain NULs)
          const char * value = (const char*)sqlite3_column_blob(i->second.stmt(), col);
          int bytes = sqlite3_column_bytes(i->second.stmt(), col);
          E(value, F("null result in query: %s") % query.sql_cmd);
          row.push_back(string(value, value + bytes));
          //L(FL("row %d col %d value='%s'") % nrow % col % value);
        }
      res.push_back(row);
    }

  if (rescode != SQLITE_DONE)
    assert_sqlite3_ok(sql());

  sqlite3_reset(i->second.stmt());
  assert_sqlite3_ok(sql());

  nrow = res.size();

  i->second.count++;

  E(want_rows == any_rows || want_rows == nrow,
    F("wanted %d rows got %d in query: %s") % want_rows % nrow % query.sql_cmd);
}
791
792// general application-level logic
793
// Rebind this object to a different database file; only legal while
// no connection is open.
void
database::set_filename(system_path const & file)
{
  I(!__sql);
  filename = file;
}
800
// Begin a (possibly nested) transaction. Only the outermost level
// issues a real BEGIN; inner levels just bump the counter. An
// exclusive transaction cannot be opened inside a deferred one.
void
database::begin_transaction(bool exclusive)
{
  if (transaction_level == 0)
    {
      // no writes may be queued from a previous transaction
      I(pending_writes.empty());
      if (exclusive)
        execute(query("BEGIN EXCLUSIVE"));
      else
        execute(query("BEGIN DEFERRED"));
      transaction_exclusive = exclusive;
    }
  else
    {
      // You can't start an exclusive transaction within a non-exclusive
      // transaction
      I(!exclusive || transaction_exclusive);
    }
  transaction_level++;
}
821
822
823bool
824database::have_pending_write(string const & tab, hexenc<id> const & id)
825{
826 return pending_writes.find(make_pair(tab, id)) != pending_writes.end();
827}
828
// Fetch a queued-but-unflushed write back out of the pending buffer;
// safe_get() asserts that the entry exists.
void
database::load_pending_write(string const & tab, hexenc<id> const & id, data & dat)
{
  dat = safe_get(pending_writes, make_pair(tab, id));
}
834
835void
836database::cancel_pending_write(string const & tab, hexenc<id> const & an_id)
837{
838 std::map<std::pair<std::string, hexenc<id> >, data>::const_iterator i =
839 pending_writes.find(make_pair(tab, an_id));
840 if (i != pending_writes.end()) {
841 unsigned long cancel_size = tab.size() + id()().size() + i->second().size();
842 I(pending_writes_size > cancel_size);
843 pending_writes_size -= cancel_size;
844 }
845
846 safe_erase(pending_writes, make_pair(tab, an_id));
847}
848
// Queue a (table, id) -> data write in memory instead of hitting
// sqlite immediately; duplicates are ignored. When the buffer grows
// past db_max_pending_writes_bytes, flush everything (including the
// new entry) with put() and reset the accounting.
void
database::schedule_write(string const & tab,
                         hexenc<id> const & an_id,
                         data const & dat)
{
  if (!have_pending_write(tab, an_id)) {
    safe_insert(pending_writes, make_pair(make_pair(tab, an_id), dat));
    pending_writes_size += tab.size() + an_id().size() + dat().size();
  }
  if (pending_writes_size > constants::db_max_pending_writes_bytes) {
    for (map<pair<string, hexenc<id> >, data>::const_iterator i = pending_writes.begin();
         i != pending_writes.end(); ++i)
      {
        put(i->first.second, i->second, i->first.first);
      }
    pending_writes.clear();
    pending_writes_size = 0;
  }
}
868
// Unwind one transaction level; at the outermost level, flush all
// pending writes into sqlite and issue the real COMMIT.
void
database::commit_transaction()
{
  if (transaction_level == 1)
    {
      for (map<pair<string, hexenc<id> >, data>::const_iterator i = pending_writes.begin();
           i != pending_writes.end(); ++i)
        {
          put(i->first.second, i->second, i->first.first);
        }
      pending_writes.clear();
      pending_writes_size = 0;
      execute(query("COMMIT"));
    }
  transaction_level--;
}
885
// Unwind one transaction level; at the outermost level, discard all
// pending writes and issue the real ROLLBACK.
void
database::rollback_transaction()
{
  if (transaction_level == 1)
    {
      pending_writes.clear();
      pending_writes_size = 0;
      execute(query("ROLLBACK"));
    }
  transaction_level--;
}
897
898
899bool
900database::exists(hexenc<id> const & ident,
901 string const & table)
902{
903 if (have_pending_write(table, ident))
904 return true;
905
906 results res;
907 query q("SELECT id FROM " + table + " WHERE id = ?");
908 fetch(res, one_col, any_rows, q % text(ident()));
909 I((res.size() == 1) || (res.size() == 0));
910 return res.size() == 1;
911}
912
913
914bool
915database::delta_exists(hexenc<id> const & ident,
916 string const & table)
917{
918 results res;
919 query q("SELECT id FROM " + table + " WHERE id = ?");
920 fetch(res, one_col, any_rows, q % text(ident()));
921 return res.size() > 0;
922}
923
924unsigned long
925database::count(string const & table)
926{
927 results res;
928 query q("SELECT COUNT(*) FROM " + table);
929 fetch(res, one_col, one_row, q);
930 return lexical_cast<unsigned long>(res[0][0]);
931}
932
// Total bytes used by `table`, where `rowspace` is a SQL expression
// giving the per-row byte count (e.g. "length(id) + length(data)").
unsigned long
database::space_usage(string const & table, string const & rowspace)
{
  results res;
  // COALESCE is required since SUM({empty set}) is NULL.
  // the sqlite docs for SUM suggest this as a workaround
  query q("SELECT COALESCE(SUM(" + rowspace + "), 0) FROM " + table);
  fetch(res, one_col, one_row, q);
  return lexical_cast<unsigned long>(res[0][0]);
}
943
944unsigned int
945database::page_size()
946{
947 results res;
948 query q("PRAGMA page_size");
949 fetch(res, one_col, one_row, q);
950 return lexical_cast<unsigned int>(res[0][0]);
951}
952
// The database's persistent (default) page-cache size, in pages.
unsigned int
database::cache_size()
{
  // This returns the persistent (default) cache size.  It's possible to
  // override this setting transiently at runtime by setting PRAGMA
  // cache_size.
  results res;
  query q("PRAGMA default_cache_size");
  fetch(res, one_col, one_row, q);
  return lexical_cast<unsigned int>(res[0][0]);
}
964
965void
966database::get_ids(string const & table, set< hexenc<id> > & ids)
967{
968 results res;
969 query q("SELECT id FROM " + table);
970 fetch(res, one_col, any_rows, q);
971
972 for (size_t i = 0; i < res.size(); ++i)
973 {
974 ids.insert(hexenc<id>(res[i][0]));
975 }
976}
977
// Fetch the full (non-delta) data stored under `ident` in `table`,
// serving from the pending-write buffer if the row hasn't been
// flushed yet. Stored data is gzip-packed; after unpacking we
// re-hash it and assert the hash matches ident (corruption check).
void
database::get(hexenc<id> const & ident,
              data & dat,
              string const & table)
{
  if (have_pending_write(table, ident))
    {
      load_pending_write(table, ident, dat);
      return;
    }

  results res;
  query q("SELECT data FROM " + table + " WHERE id = ?");
  fetch(res, one_col, one_row, q % text(ident()));

  // consistency check
  gzip<data> rdata(res[0][0]);
  data rdata_unpacked;
  decode_gzip(rdata,rdata_unpacked);

  hexenc<id> tid;
  calculate_ident(rdata_unpacked, tid);
  I(tid == ident);

  dat = rdata_unpacked;
}
1004
// Fetch the gzip-packed delta taking version `base` to version
// `ident` from the given delta table and unpack it into `del`.
void
database::get_delta(hexenc<id> const & ident,
                    hexenc<id> const & base,
                    delta & del,
                    string const & table)
{
  I(ident() != "");
  I(base() != "");
  results res;
  query q("SELECT delta FROM " + table + " WHERE id = ? AND base = ?");
  fetch(res, one_col, one_row, q % text(ident()) % text(base()));

  gzip<delta> del_packed(res[0][0]);
  decode_gzip(del_packed, del);
}
1020
// Store full data under `ident` in `table` (schema: id, data). The
// data is hashed first and must match ident; it is gzip-packed on
// disk.
void
database::put(hexenc<id> const & ident,
              data const & dat,
              string const & table)
{
  // consistency check
  I(ident() != "");
  hexenc<id> tid;
  calculate_ident(dat, tid);
  MM(ident);
  MM(tid);
  I(tid == ident);

  gzip<data> dat_packed;
  encode_gzip(dat, dat_packed);

  string insert = "INSERT INTO " + table + " VALUES(?, ?)";
  execute(query(insert)
          % text(ident())
          % blob(dat_packed()));
}
// Store a gzip-packed delta row (id, base, delta): applying `del` to
// version `base` yields version `ident`.
void
database::put_delta(hexenc<id> const & ident,
                    hexenc<id> const & base,
                    delta const & del,
                    string const & table)
{
  // nb: delta schema is (id, base, delta)
  I(ident() != "");
  I(base() != "");

  gzip<delta> del_packed;
  encode_gzip(del, del_packed);

  string insert = "INSERT INTO "+table+" VALUES(?, ?, ?)";
  execute(query(insert)
          % text(ident())
          % text(base())
          % blob(del_packed()));
}
1061
1062// static ticker cache_hits("vcache hits", "h", 1);
1063
// Size functor for the LRU cache: charges each cached version by the
// byte length of its payload.
struct datasz
{
  unsigned long operator()(data const & t) { return t().size(); }
};

// process-wide cache of reconstructed versions, bounded in bytes
static LRUCache<hexenc<id>, data, datasz>
vcache(constants::db_version_cache_sz);
1071
// Resize the version cache from the lua hook's configured value,
// rejecting bounds below 1MB as nonsensical.
void
database::set_vcache_max_size()
{
  int vcache_size = this->__app->lua.hook_get_vcache_size();
  if (vcache_size < 1000000) {
    throw oops("Disallowing vcache size < 1000000; doesn't make sense");
  }
  vcache.set_max_size(vcache_size);
}
1081
1082typedef vector< hexenc<id> > version_path;
1083
1084static void
1085extend_path_if_not_cycle(string table_name,
1086 shared_ptr<version_path> p,
1087 hexenc<id> const & ext,
1088 set< hexenc<id> > & seen_nodes,
1089 vector< shared_ptr<version_path> > & next_paths)
1090{
1091 for (version_path::const_iterator i = p->begin(); i != p->end(); ++i)
1092 {
1093 if ((*i)() == ext())
1094 throw oops("cycle in table '" + table_name + "', at node "
1095 + (*i)() + " <- " + ext());
1096 }
1097
1098 if (seen_nodes.find(ext) == seen_nodes.end())
1099 {
1100 p->push_back(ext);
1101 next_paths.push_back(p);
1102 seen_nodes.insert(ext);
1103 }
1104}
1105
// Reconstruct the full data for `ident`, which is stored either
// directly in `data_table` or as a chain of reverse deltas in
// `delta_table`. Results (and intermediate versions) are memoized in
// the global vcache.
void
database::get_version(hexenc<id> const & ident,
                      data & dat,
                      string const & data_table,
                      string const & delta_table)
{
  I(ident() != "");
  if (vcache.fetch(ident, dat))
    {
      return;
    }
  else if (exists(ident, data_table))
    {
      // easy path
      get(ident, dat, data_table);
    }
  else
    {
      // tricky path

      // we start from the file we want to reconstruct and work *forwards*
      // through the database, until we get to a full data object. we then
      // trace back through the list of edges we followed to get to the data
      // object, applying reverse deltas.
      //
      // the effect of this algorithm is breadth-first search, backwards
      // through the storage graph, to discover a forwards shortest path, and
      // then following that shortest path with delta application.
      //
      // we used to do this with the boost graph library, but it invovled
      // loading too much of the storage graph into memory at any moment. this
      // imperative version only loads the descendents of the reconstruction
      // node, so it much cheaper in terms of memory.
      //
      // we also maintain a cycle-detecting set, just to be safe

      L(FL("reconstructing %s in %s") % ident % delta_table);
      I(delta_exists(ident, delta_table));

      // Our reconstruction algorithm involves keeping a set of parallel
      // linear paths, starting from ident, moving forward through the
      // storage DAG until we hit a storage root.
      //
      // On each iteration, we extend every active path by one step. If our
      // extension involves a fork, we duplicate the path. If any path
      // contains a cycle, we fault.
      //
      // If, by extending a path C, we enter a node which another path
      // D has already seen, we kill path C. This avoids the possibility of
      // exponential growth in the number of paths due to extensive forking
      // and merging.

      vector< shared_ptr<version_path> > live_paths;

      string delta_query = "SELECT base FROM " + delta_table + " WHERE id = ?";

      // seed the search with the single-node path [ident]
      {
        shared_ptr<version_path> pth0 = shared_ptr<version_path>(new version_path());
        pth0->push_back(ident);
        live_paths.push_back(pth0);
      }

      shared_ptr<version_path> selected_path;
      set< hexenc<id> > seen_nodes;

      while (!selected_path)
        {
          vector< shared_ptr<version_path> > next_paths;

          for (vector<shared_ptr<version_path> >::const_iterator i = live_paths.begin();
               i != live_paths.end(); ++i)
            {
              shared_ptr<version_path> pth = *i;
              hexenc<id> tip = pth->back();

              // a cached or fully-stored tip ends the search
              if (vcache.exists(tip) || exists(tip, data_table))
                {
                  selected_path = pth;
                  break;
                }
              else
                {
                  // This tip is not a root, so extend the path.
                  results res;
                  fetch(res, one_col, any_rows,
                        query(delta_query)
                        % text(tip()));

                  I(res.size() != 0);

                  // Replicate the path if there's a fork.
                  for (size_t k = 1; k < res.size(); ++k)
                    {
                      shared_ptr<version_path> pthN
                        = shared_ptr<version_path>(new version_path(*pth));
                      extend_path_if_not_cycle(delta_table, pthN,
                                               hexenc<id>(res[k][0]),
                                               seen_nodes, next_paths);
                    }

                  // And extend the base path we're examining.
                  extend_path_if_not_cycle(delta_table, pth,
                                           hexenc<id>(res[0][0]),
                                           seen_nodes, next_paths);
                }
            }

          I(selected_path || !next_paths.empty());
          live_paths = next_paths;
        }

      // Found a root, now trace it back along the path.

      I(selected_path);
      I(selected_path->size() > 1);

      hexenc<id> curr = selected_path->back();
      selected_path->pop_back();
      data begin;

      // the root version comes from the cache or from full storage
      if (vcache.exists(curr))
        {
          I(vcache.fetch(curr, begin));
        }
      else
        {
          get(curr, begin, data_table);
        }

      shared_ptr<delta_applicator> app = new_piecewise_applicator();
      app->begin(begin());

      // walk the path root -> ident, applying one delta per step and
      // caching each intermediate version we materialize
      for (version_path::reverse_iterator i = selected_path->rbegin();
           i != selected_path->rend(); ++i)
        {
          hexenc<id> const nxt = *i;

          if (!vcache.exists(curr))
            {
              string tmp;
              app->finish(tmp);
              vcache.insert(curr, tmp);
            }

          L(FL("following delta %s -> %s") % curr % nxt);
          delta del;
          get_delta(nxt, curr, del, delta_table);
          apply_delta (app, del());

          app->next();
          curr = nxt;
        }

      string tmp;
      app->finish(tmp);
      dat = data(tmp);

      // optional paranoia: re-hash the reconstruction and compare
      if (global_slow_assertions_version_check) {
        hexenc<id> final;
        calculate_ident(dat, final);
        I(final == ident);
      }
    }
  vcache.insert(ident, dat);
}
1271
1272
1273void
1274database::drop(hexenc<id> const & ident,
1275 string const & table)
1276{
1277 string drop = "DELETE FROM " + table + " WHERE id = ?";
1278 execute(query(drop) % text(ident()));
1279}
1280
void
database::put_version(hexenc<id> const & old_id,
                      hexenc<id> const & new_id,
                      delta const & del,
                      string const & data_table,
                      string const & delta_table)
{
  // Store the new version in full and convert the old head into a
  // reverse delta (new -> old), so only the newest version of a chain
  // is kept as full text. 'del' is a forward delta old -> new.

  data old_data, new_data;
  delta reverse_delta;

  // Reconstruct the old full text and apply the forward delta to
  // obtain the new full text.
  get_version(old_id, old_data, data_table, delta_table);
  patch(old_data, del, new_data);
  {
    // Invert the delta, then verify the inversion round-trips: applying
    // it to new_data must reproduce content hashing to old_id.
    string tmp;
    invert_xdelta(old_data(), del(), tmp);
    reverse_delta = delta(tmp);
    data old_tmp;
    hexenc<id> old_tmp_id;
    patch(new_data, reverse_delta, old_tmp);
    calculate_ident(old_tmp, old_tmp_id);
    I(old_tmp_id == old_id);
  }

  transaction_guard guard(*this);
  if (exists(old_id, data_table))
    {
      // descendent of a head version replaces the head, therefore old head
      // must be disposed of
      if (have_pending_write(data_table, old_id))
        cancel_pending_write(data_table, old_id);
      else
        drop(old_id, data_table);
    }
  schedule_write(data_table, new_id, new_data);
  put_delta(old_id, new_id, reverse_delta, delta_table);
  guard.commit();
}
1319
void
database::remove_version(hexenc<id> const & target_id,
                         string const & data_table,
                         string const & delta_table)
{
  // We have one of two cases (for multiple 'older' nodes):
  //
  // 1. pre: older <- target <- newer
  //    post: older <- newer
  //
  // 2. pre: older <- target (a root)
  //    post: older (a root)
  //
  // In case 1 we want to build new deltas bypassing the target we're
  // removing. In case 2 we just promote the older object to a root.

  transaction_guard guard(*this);

  // The target must be stored somewhere, either in full or as a delta.
  I(exists(target_id, data_table)
    || delta_exists(target_id, delta_table));

  map<hexenc<id>, data> older;

  // Snapshot the full texts of every version deltified directly
  // against the target, before we destroy any links.
  {
    results res;
    query q("SELECT id FROM " + delta_table + " WHERE base = ?");
    fetch(res, one_col, any_rows, q % text(target_id()));
    for (size_t i = 0; i < res.size(); ++i)
      {
        hexenc<id> old_id(res[i][0]);
        data old_data;
        get_version(old_id, old_data, data_table, delta_table);
        older.insert(make_pair(old_id, old_data));
      }
  }

  // no deltas are allowed to point to the target.
  execute(query("DELETE from " + delta_table + " WHERE base = ?")
          % text(target_id()));

  if (delta_exists(target_id, delta_table))
    {
      if (!older.empty())
        {
          // Case 1: need to re-deltify all the older values against a newer
          // member of the delta chain. Doesn't really matter which newer
          // element (we have no good heuristic for guessing a good one
          // anyways).
          hexenc<id> newer_id;
          data newer_data;
          results res;
          query q("SELECT base FROM " + delta_table + " WHERE id = ?");
          fetch(res, one_col, any_rows, q % text(target_id()));
          I(res.size() > 0);
          newer_id = hexenc<id>(res[0][0]);
          get_version(newer_id, newer_data, data_table, delta_table);
          for (map<hexenc<id>, data>::const_iterator i = older.begin();
               i != older.end(); ++i)
            {
              // Skip versions that still reach storage via another delta.
              if (delta_exists(i->first, delta_table))
                continue;
              delta bypass_delta;
              diff(newer_data, i->second, bypass_delta);
              put_delta(i->first, newer_id, bypass_delta, delta_table);
            }
        }
      execute(query("DELETE from " + delta_table + " WHERE id = ?")
              % text(target_id()));
    }
  else
    {
      // Case 2: just plop the older values down as new storage roots.
      I(exists(target_id, data_table));
      for (map<hexenc<id>, data>::const_iterator i = older.begin();
           i != older.end(); ++i)
        {
          if (!exists(i->first, data_table))
            put(i->first, i->second, data_table);
        }
      execute(query("DELETE from " + data_table + " WHERE id = ?")
              % text(target_id()));
    }

  guard.commit();
}
1405
1406
1407// ------------------------------------------------------------
1408// -- --
1409// -- public interface follows --
1410// -- --
1411// ------------------------------------------------------------
1412
1413bool
1414database::file_version_exists(file_id const & id)
1415{
1416 return delta_exists(id.inner(), "file_deltas")
1417 || exists(id.inner(), "files");
1418}
1419
1420bool
1421database::roster_version_exists(roster_id const & id)
1422{
1423 return delta_exists(id.inner(), "roster_deltas")
1424 || exists(id.inner(), "rosters");
1425}
1426
1427bool
1428database::revision_exists(revision_id const & id)
1429{
1430 return exists(id.inner(), "revisions");
1431}
1432
1433bool
1434database::roster_link_exists_for_revision(revision_id const & rev_id)
1435{
1436 results res;
1437 fetch(res, one_col, any_rows,
1438 query("SELECT roster_id FROM revision_roster WHERE rev_id = ? ")
1439 % text(rev_id.inner()()));
1440 I((res.size() == 1) || (res.size() == 0));
1441 return res.size() == 1;
1442}
1443
1444bool
1445database::roster_exists_for_revision(revision_id const & rev_id)
1446{
1447 results res;
1448 fetch(res, one_col, any_rows,
1449 query("SELECT roster_id FROM revision_roster WHERE rev_id = ? ")
1450 % text(rev_id.inner()()));
1451 I((res.size() == 1) || (res.size() == 0));
1452 return (res.size() == 1) && roster_version_exists(roster_id(res[0][0]));
1453}
1454
1455void
1456database::get_roster_links(map<revision_id, roster_id> & links)
1457{
1458 links.clear();
1459 results res;
1460 fetch(res, 2, any_rows, query("SELECT rev_id, roster_id FROM revision_roster"));
1461 for (size_t i = 0; i < res.size(); ++i)
1462 {
1463 links.insert(make_pair(revision_id(res[i][0]),
1464 roster_id(res[i][1])));
1465 }
1466}
1467
1468void
1469database::get_file_ids(set<file_id> & ids)
1470{
1471 ids.clear();
1472 set< hexenc<id> > tmp;
1473 get_ids("files", tmp);
1474 get_ids("file_deltas", tmp);
1475 ids.insert(tmp.begin(), tmp.end());
1476}
1477
1478void
1479database::get_revision_ids(set<revision_id> & ids)
1480{
1481 ids.clear();
1482 set< hexenc<id> > tmp;
1483 get_ids("revisions", tmp);
1484 ids.insert(tmp.begin(), tmp.end());
1485}
1486
1487void
1488database::get_roster_ids(set<roster_id> & ids)
1489{
1490 ids.clear();
1491 set< hexenc<id> > tmp;
1492 get_ids("rosters", tmp);
1493 get_ids("roster_deltas", tmp);
1494 ids.insert(tmp.begin(), tmp.end());
1495}
1496
1497void
1498database::get_file_version(file_id const & id,
1499 file_data & dat)
1500{
1501 data tmp;
1502 get_version(id.inner(), tmp, "files", "file_deltas");
1503 dat = tmp;
1504}
1505
1506void
1507database::get_manifest_version(manifest_id const & id,
1508 manifest_data & dat)
1509{
1510 data tmp;
1511 get_version(id.inner(), tmp, "manifests", "manifest_deltas");
1512 dat = tmp;
1513}
1514
1515void
1516database::get_roster_version(roster_id const & id,
1517 roster_data & dat)
1518{
1519 data tmp;
1520 get_version(id.inner(), tmp, "rosters", "roster_deltas");
1521 dat = tmp;
1522}
1523
void
database::put_file(file_id const & id,
                   file_data const & dat)
{
  // Queue the full text for writing; pending writes are flushed later
  // by the write-scheduling machinery rather than hitting sqlite here.
  schedule_write("files", id.inner(), dat.inner());
}
1530
void
database::put_file_version(file_id const & old_id,
                           file_id const & new_id,
                           file_delta const & del)
{
  // Thin wrapper: store the new file version and re-deltify the old
  // one against it (see put_version for the mechanics).
  put_version(old_id.inner(), new_id.inner(), del.inner(),
              "files", "file_deltas");
}
1539
// Compute a delta src_id -> dst_id between two arbitrary file versions,
// preferring cheap strategies: (1) reuse a stored forward delta,
// (2) invert a stored reverse delta, (3) load both full texts and diff.
void
database::get_arbitrary_file_delta(file_id const & src_id,
                                   file_id const & dst_id,
                                   file_delta & del)
{
  delta dtmp;
  // Deltas stored in the database go from base -> id.
  results res;
  query q1("SELECT delta FROM file_deltas "
           "WHERE base = ? AND id = ?");
  fetch(res, one_col, any_rows,
        q1 % text(src_id.inner()()) % text(dst_id.inner()()));

  if (!res.empty())
    {
      // Exact hit: a plain delta from src -> dst.
      gzip<delta> del_packed(res[0][0]);
      decode_gzip(del_packed, dtmp);
      del = file_delta(dtmp);
      return;
    }

  query q2("SELECT delta FROM file_deltas "
           "WHERE id = ? AND base = ?");
  fetch(res, one_col, any_rows,
        q2 % text(dst_id.inner()()) % text(src_id.inner()()));

  if (!res.empty())
    {
      // We have a delta from dst -> src; we need to
      // invert this to a delta from src -> dst.
      // Inversion requires the full text of the delta's target (dst).
      gzip<delta> del_packed(res[0][0]);
      decode_gzip(del_packed, dtmp);
      string fwd_delta;
      file_data dst;
      get_file_version(dst_id, dst);
      invert_xdelta(dst.inner()(), dtmp(), fwd_delta);
      del = file_delta(fwd_delta);
      return;
    }

  // No deltas of use; just load both versions and diff.
  file_data fd1, fd2;
  get_file_version(src_id, fd1);
  get_file_version(dst_id, fd2);
  diff(fd1.inner(), fd2.inner(), dtmp);
  del = file_delta(dtmp);
}
1588
1589
1590void
1591database::get_revision_ancestry(multimap<revision_id, revision_id> & graph)
1592{
1593 results res;
1594 graph.clear();
1595 fetch(res, 2, any_rows,
1596 query("SELECT parent,child FROM revision_ancestry"));
1597 for (size_t i = 0; i < res.size(); ++i)
1598 graph.insert(make_pair(revision_id(res[i][0]),
1599 revision_id(res[i][1])));
1600}
1601
1602void
1603database::get_revision_parents(revision_id const & id,
1604 set<revision_id> & parents)
1605{
1606 I(!null_id(id));
1607 results res;
1608 parents.clear();
1609 fetch(res, one_col, any_rows,
1610 query("SELECT parent FROM revision_ancestry WHERE child = ?")
1611 % text(id.inner()()));
1612 for (size_t i = 0; i < res.size(); ++i)
1613 parents.insert(revision_id(res[i][0]));
1614}
1615
1616void
1617database::get_revision_children(revision_id const & id,
1618 set<revision_id> & children)
1619{
1620 results res;
1621 children.clear();
1622 fetch(res, one_col, any_rows,
1623 query("SELECT child FROM revision_ancestry WHERE parent = ?")
1624 % text(id.inner()()));
1625 for (size_t i = 0; i < res.size(); ++i)
1626 children.insert(revision_id(res[i][0]));
1627}
1628
void
database::get_revision_manifest(revision_id const & rid,
                                manifest_id & mid)
{
  // The manifest id is recorded inside the revision text itself, so we
  // parse the revision and read the field.
  revision_t rev;
  get_revision(rid, rev);
  mid = rev.new_manifest;
}
1637
void
database::get_revision(revision_id const & id,
                       revision_t & rev)
{
  // Convenience overload: fetch the stored text and parse it.
  revision_data d;
  get_revision(id, d);
  read_revision(d, rev);
}
1646
1647void
1648database::get_revision(revision_id const & id,
1649 revision_data & dat)
1650{
1651 I(!null_id(id));
1652 results res;
1653 fetch(res, one_col, one_row,
1654 query("SELECT data FROM revisions WHERE id = ?")
1655 % text(id.inner()()));
1656
1657 gzip<data> gzdata(res[0][0]);
1658 data rdat;
1659 decode_gzip(gzdata,rdat);
1660
1661 // verify that we got a revision with the right id
1662 {
1663 revision_id tmp;
1664 calculate_ident(rdat, tmp);
1665 I(id == tmp);
1666 }
1667
1668 dat = rdat;
1669}
1670
void
database::deltify_revision(revision_id const & rid)
{
  // Space reclamation: for each file delta applied along an edge into
  // this revision, if the source (parent-side) file is still stored in
  // full, re-store the pair via put_file_version so only one full text
  // remains in the chain.
  transaction_guard guard(*this);
  revision_t rev;
  MM(rev);
  MM(rid);
  get_revision(rid, rev);
  // Make sure that all parent revs have their files replaced with deltas
  // from this rev's files.
  {
    for (edge_map::const_iterator i = rev.edges.begin();
         i != rev.edges.end(); ++i)
      {
        for (map<split_path, pair<file_id, file_id> >::const_iterator
               j = edge_changes(i).deltas_applied.begin();
             j != edge_changes(i).deltas_applied.end(); ++j)
          {
            // Only act when the old version is stored in full and the
            // new version is present in some form.
            if (exists(delta_entry_src(j).inner(), "files") &&
                file_version_exists(delta_entry_dst(j)))
              {
                file_data old_data;
                file_data new_data;
                get_file_version(delta_entry_src(j), old_data);
                get_file_version(delta_entry_dst(j), new_data);
                delta delt;
                diff(old_data.inner(), new_data.inner(), delt);
                file_delta del(delt);
                // Drop both stored forms of dst before re-storing the
                // pair; put_file_version rebuilds them.
                drop(delta_entry_dst(j).inner(), "files");
                drop(delta_entry_dst(j).inner(), "file_deltas");
                put_file_version(delta_entry_src(j), delta_entry_dst(j), del);
              }
          }
      }
  }
  guard.commit();
}
1708
1709
void
database::put_revision(revision_id const & new_id,
                       revision_t const & rev)
{
  // Store a revision under its claimed id. The text must hash to
  // new_id, the roster built from it must match rev.new_manifest, and
  // ancestry edges are recorded for each parent edge. All writes are
  // performed inside one transaction.
  MM(new_id);
  MM(rev);

  I(!null_id(new_id));
  I(!revision_exists(new_id));

  rev.check_sane();
  revision_data d;
  MM(d.inner());
  write_revision(rev, d);

  // Phase 1: confirm the revision makes sense
  {
    revision_id tmp;
    MM(tmp);
    calculate_ident(d, tmp);
    I(tmp == new_id);
  }

  transaction_guard guard(*this);

  // Phase 2: construct a new roster and sanity-check its manifest_id
  // against the manifest_id of the revision you're writing.
  roster_t ros;
  marking_map mm;
  {
    manifest_id roster_manifest_id;
    MM(roster_manifest_id);
    make_roster_for_revision(rev, new_id, ros, mm, *__app);
    calculate_ident(ros, roster_manifest_id);
    I(rev.new_manifest == roster_manifest_id);
  }

  // Phase 3: Write the revision data (gzipped) plus one ancestry row
  // per incoming edge.

  gzip<data> d_packed;
  encode_gzip(d.inner(), d_packed);
  execute(query("INSERT INTO revisions VALUES(?, ?)")
          % text(new_id.inner()())
          % blob(d_packed()));

  for (edge_map::const_iterator e = rev.edges.begin();
       e != rev.edges.end(); ++e)
    {
      execute(query("INSERT INTO revision_ancestry VALUES(?, ?)")
              % text(edge_old_revision(e).inner()())
              % text(new_id.inner()()));
    }

  // Re-deltify parent file versions against this revision's files.
  deltify_revision(new_id);

  // Phase 4: write the roster data and commit
  put_roster(new_id, ros, mm);

  guard.commit();
}
1770
1771
void
database::put_revision(revision_id const & new_id,
                       revision_data const & dat)
{
  // Convenience overload: parse the text form and delegate to the
  // structured put_revision.
  revision_t rev;
  read_revision(dat, rev);
  put_revision(new_id, rev);
}
1780
1781
void
database::delete_existing_revs_and_certs()
{
  // Wholesale purge of all revisions, their ancestry edges, and their
  // certs. Used when rebuilding/migrating the revision graph.
  execute(query("DELETE FROM revisions"));
  execute(query("DELETE FROM revision_ancestry"));
  execute(query("DELETE FROM revision_certs"));
}
1789
void
database::delete_existing_manifests()
{
  // Purge all manifest storage, both full texts and deltas.
  execute(query("DELETE FROM manifests"));
  execute(query("DELETE FROM manifest_deltas"));
}
1796
/// Deletes one revision from the local database.
/// The revision must exist and must have no children.
/// Also drops the revision's roster link, and the roster itself when
/// this revision held the last link to it.
/// @see kill_rev_locally
void
database::delete_existing_rev_and_certs(revision_id const & rid)
{
  transaction_guard guard (*this);

  // Check that the revision exists and doesn't have any children.
  I(revision_exists(rid));
  set<revision_id> children;
  get_revision_children(rid, children);
  I(!children.size());


  L(FL("Killing revision %s locally") % rid);

  // Kill the certs, ancestry, and rev itself.
  execute(query("DELETE from revision_certs WHERE id = ?")
          % text(rid.inner()()));

  execute(query("DELETE from revision_ancestry WHERE child = ?")
          % text(rid.inner()()));

  execute(query("DELETE from revisions WHERE id = ?")
          % text(rid.inner()()));

  // Find the associated roster and count the number of links to it.
  // Rosters may be shared between revisions, so it must only be
  // removed when this was the last reference.
  roster_id ros_id;
  size_t link_count = 0;
  get_roster_id_for_revision(rid, ros_id);
  {
    results res;
    fetch(res, 2, any_rows,
          query("SELECT rev_id, roster_id FROM revision_roster "
                "WHERE roster_id = ?") % text(ros_id.inner()()));
    I(res.size() > 0);
    link_count = res.size();
  }

  // Delete our link.
  execute(query("DELETE from revision_roster WHERE rev_id = ?")
          % text(rid.inner()()));

  // If that was the last link to the roster, kill the roster too.
  if (link_count == 1)
    remove_version(ros_id.inner(), "rosters", "roster_deltas");

  guard.commit();
}
1846
/// Deletes all certs referring to a particular branch,
/// along with the branch's epoch record.
void
database::delete_branch_named(cert_value const & branch)
{
  L(FL("Deleting all references to branch %s") % branch);
  // Branch values are stored as binary blobs, hence blob() not text().
  execute(query("DELETE FROM revision_certs WHERE name='branch' AND value =?")
          % blob(branch()));
  execute(query("DELETE FROM branch_epochs WHERE branch=?")
          % blob(branch()));
}
1857
/// Deletes all certs referring to a particular tag.
void
database::delete_tag_named(cert_value const & tag)
{
  L(FL("Deleting all references to tag %s") % tag);
  // Tag values are stored as binary blobs, hence blob() not text().
  execute(query("DELETE FROM revision_certs WHERE name='tag' AND value =?")
          % blob(tag()));
}
1866
1867// crypto key management
1868
1869void
1870database::get_key_ids(string const & pattern,
1871 vector<rsa_keypair_id> & pubkeys)
1872{
1873 pubkeys.clear();
1874 results res;
1875
1876 if (pattern != "")
1877 fetch(res, one_col, any_rows,
1878 query("SELECT id FROM public_keys WHERE id GLOB ?")
1879 % text(pattern));
1880 else
1881 fetch(res, one_col, any_rows,
1882 query("SELECT id FROM public_keys"));
1883
1884 for (size_t i = 0; i < res.size(); ++i)
1885 pubkeys.push_back(res[i][0]);
1886}
1887
1888void
1889database::get_keys(string const & table, vector<rsa_keypair_id> & keys)
1890{
1891 keys.clear();
1892 results res;
1893 fetch(res, one_col, any_rows, query("SELECT id FROM " + table));
1894 for (size_t i = 0; i < res.size(); ++i)
1895 keys.push_back(res[i][0]);
1896}
1897
void
database::get_public_keys(vector<rsa_keypair_id> & keys)
{
  // Thin wrapper over the generic key-table dump.
  get_keys("public_keys", keys);
}
1903
1904bool
1905database::public_key_exists(hexenc<id> const & hash)
1906{
1907 results res;
1908 fetch(res, one_col, any_rows,
1909 query("SELECT id FROM public_keys WHERE hash = ?")
1910 % text(hash()));
1911 I((res.size() == 1) || (res.size() == 0));
1912 if (res.size() == 1)
1913 return true;
1914 return false;
1915}
1916
1917bool
1918database::public_key_exists(rsa_keypair_id const & id)
1919{
1920 results res;
1921 fetch(res, one_col, any_rows,
1922 query("SELECT id FROM public_keys WHERE id = ?")
1923 % text(id()));
1924 I((res.size() == 1) || (res.size() == 0));
1925 if (res.size() == 1)
1926 return true;
1927 return false;
1928}
1929
1930void
1931database::get_pubkey(hexenc<id> const & hash,
1932 rsa_keypair_id & id,
1933 base64<rsa_pub_key> & pub_encoded)
1934{
1935 results res;
1936 fetch(res, 2, one_row,
1937 query("SELECT id, keydata FROM public_keys WHERE hash = ?")
1938 % text(hash()));
1939 id = res[0][0];
1940 encode_base64(rsa_pub_key(res[0][1]), pub_encoded);
1941}
1942
1943void
1944database::get_key(rsa_keypair_id const & pub_id,
1945 base64<rsa_pub_key> & pub_encoded)
1946{
1947 results res;
1948 fetch(res, one_col, one_row,
1949 query("SELECT keydata FROM public_keys WHERE id = ?")
1950 % text(pub_id()));
1951 encode_base64(rsa_pub_key(res[0][0]), pub_encoded);
1952}
1953
void
database::put_key(rsa_keypair_id const & pub_id,
                  base64<rsa_pub_key> const & pub_encoded)
{
  // Reject duplicates. A duplicate content hash is an internal
  // invariant violation (I); a duplicate name is a user-facing error (E).
  hexenc<id> thash;
  key_hash_code(pub_id, pub_encoded, thash);
  I(!public_key_exists(thash));
  E(!public_key_exists(pub_id),
    F("another key with name '%s' already exists") % pub_id);
  // Keys are stored decoded (binary), not base64.
  rsa_pub_key pub_key;
  decode_base64(pub_encoded, pub_key);
  execute(query("INSERT INTO public_keys VALUES(?, ?, ?)")
          % text(thash())
          % text(pub_id())
          % blob(pub_key()));
}
1970
void
database::delete_public_key(rsa_keypair_id const & pub_id)
{
  // Remove the public key row matching this key name.
  execute(query("DELETE FROM public_keys WHERE id = ?")
          % text(pub_id()));
}
1977
1978// cert management
1979
1980bool
1981database::cert_exists(cert const & t,
1982 string const & table)
1983{
1984 results res;
1985 cert_value value;
1986 decode_base64(t.value, value);
1987 rsa_sha1_signature sig;
1988 decode_base64(t.sig, sig);
1989 query q = query("SELECT id FROM " + table + " WHERE id = ? "
1990 "AND name = ? "
1991 "AND value = ? "
1992 "AND keypair = ? "
1993 "AND signature = ?")
1994 % text(t.ident())
1995 % text(t.name())
1996 % blob(value())
1997 % text(t.key())
1998 % blob(sig());
1999
2000 fetch(res, 1, any_rows, q);
2001
2002 I(res.size() == 0 || res.size() == 1);
2003 return res.size() == 1;
2004}
2005
2006void
2007database::put_cert(cert const & t,
2008 string const & table)
2009{
2010 hexenc<id> thash;
2011 cert_hash_code(t, thash);
2012 cert_value value;
2013 decode_base64(t.value, value);
2014 rsa_sha1_signature sig;
2015 decode_base64(t.sig, sig);
2016
2017 string insert = "INSERT INTO " + table + " VALUES(?, ?, ?, ?, ?, ?)";
2018
2019 execute(query(insert)
2020 % text(thash())
2021 % text(t.ident())
2022 % text(t.name())
2023 % blob(value())
2024 % text(t.key())
2025 % blob(sig()));
2026}
2027
2028void
2029database::results_to_certs(results const & res,
2030 vector<cert> & certs)
2031{
2032 certs.clear();
2033 for (size_t i = 0; i < res.size(); ++i)
2034 {
2035 cert t;
2036 base64<cert_value> value;
2037 encode_base64(cert_value(res[i][2]), value);
2038 base64<rsa_sha1_signature> sig;
2039 encode_base64(rsa_sha1_signature(res[i][4]), sig);
2040 t = cert(hexenc<id>(res[i][0]),
2041 cert_name(res[i][1]),
2042 value,
2043 rsa_keypair_id(res[i][3]),
2044 sig);
2045 certs.push_back(t);
2046 }
2047}
2048
void
database::install_functions(app_state * app)
{
  // Register any SQL-callable extension functions we're going to use.
  // NOTE(review): 'app' is currently unused; kept for interface
  // stability.
  // Compare against SQLITE_OK (== 0) rather than a bare 0 for clarity.
  I(sqlite3_create_function(sql(), "gunzip", -1,
                            SQLITE_UTF8, NULL,
                            &sqlite3_gunzip_fn,
                            NULL, NULL) == SQLITE_OK);
}
2058
2059void
2060database::get_certs(vector<cert> & certs,
2061 string const & table)
2062{
2063 results res;
2064 query q("SELECT id, name, value, keypair, signature FROM " + table);
2065 fetch(res, 5, any_rows, q);
2066 results_to_certs(res, certs);
2067}
2068
2069
2070void
2071database::get_certs(hexenc<id> const & ident,
2072 vector<cert> & certs,
2073 string const & table)
2074{
2075 results res;
2076 query q("SELECT id, name, value, keypair, signature FROM " + table +
2077 " WHERE id = ?");
2078
2079 fetch(res, 5, any_rows, q % text(ident()));
2080 results_to_certs(res, certs);
2081}
2082
2083
2084void
2085database::get_certs(cert_name const & name,
2086 vector<cert> & certs,
2087 string const & table)
2088{
2089 results res;
2090 query q("SELECT id, name, value, keypair, signature FROM " + table +
2091 " WHERE name = ?");
2092 fetch(res, 5, any_rows, q % text(name()));
2093 results_to_certs(res, certs);
2094}
2095
2096
2097void
2098database::get_certs(hexenc<id> const & ident,
2099 cert_name const & name,
2100 vector<cert> & certs,
2101 string const & table)
2102{
2103 results res;
2104 query q("SELECT id, name, value, keypair, signature FROM " + table +
2105 " WHERE id = ? AND name = ?");
2106
2107 fetch(res, 5, any_rows,
2108 q % text(ident())
2109 % text(name()));
2110 results_to_certs(res, certs);
2111}
2112
2113void
2114database::get_certs(cert_name const & name,
2115 base64<cert_value> const & val,
2116 vector<cert> & certs,
2117 string const & table)
2118{
2119 results res;
2120 query q("SELECT id, name, value, keypair, signature FROM " + table +
2121 " WHERE name = ? AND value = ?");
2122
2123 cert_value binvalue;
2124 decode_base64(val, binvalue);
2125 fetch(res, 5, any_rows,
2126 q % text(name())
2127 % blob(binvalue()));
2128 results_to_certs(res, certs);
2129}
2130
2131
2132void
2133database::get_certs(hexenc<id> const & ident,
2134 cert_name const & name,
2135 base64<cert_value> const & value,
2136 vector<cert> & certs,
2137 string const & table)
2138{
2139 results res;
2140 query q("SELECT id, name, value, keypair, signature FROM " + table +
2141 " WHERE id = ? AND name = ? AND value = ?");
2142
2143 cert_value binvalue;
2144 decode_base64(value, binvalue);
2145 fetch(res, 5, any_rows,
2146 q % text(ident())
2147 % text(name())
2148 % blob(binvalue()));
2149 results_to_certs(res, certs);
2150}
2151
2152
2153
bool
database::revision_cert_exists(revision<cert> const & cert)
{
  // Thin wrapper over the generic cert table lookup.
  return cert_exists(cert.inner(), "revision_certs");
}
2159
void
database::put_revision_cert(revision<cert> const & cert)
{
  // Thin wrapper over the generic cert table insert.
  put_cert(cert.inner(), "revision_certs");
}
2165
2166void database::get_revision_cert_nobranch_index(vector< pair<hexenc<id>,
2167 pair<revision_id, rsa_keypair_id> > > & idx)
2168{
2169 results res;
2170 fetch(res, 3, any_rows,
2171 query("SELECT hash, id, keypair "
2172 "FROM 'revision_certs' WHERE name != 'branch'"));
2173
2174 idx.clear();
2175 idx.reserve(res.size());
2176 for (results::const_iterator i = res.begin(); i != res.end(); ++i)
2177 {
2178 idx.push_back(make_pair(hexenc<id>((*i)[0]),
2179 make_pair(revision_id((*i)[1]),
2180 rsa_keypair_id((*i)[2]))));
2181 }
2182}
2183
2184void
2185database::get_revision_certs(vector< revision<cert> > & ts)
2186{
2187 vector<cert> certs;
2188 get_certs(certs, "revision_certs");
2189 ts.clear();
2190 copy(certs.begin(), certs.end(), back_inserter(ts));
2191}
2192
2193void
2194database::get_revision_certs(cert_name const & name,
2195 vector< revision<cert> > & ts)
2196{
2197 vector<cert> certs;
2198 get_certs(name, certs, "revision_certs");
2199 ts.clear();
2200 copy(certs.begin(), certs.end(), back_inserter(ts));
2201}
2202
2203void
2204database::get_revision_certs(revision_id const & id,
2205 cert_name const & name,
2206 vector< revision<cert> > & ts)
2207{
2208 vector<cert> certs;
2209 get_certs(id.inner(), name, certs, "revision_certs");
2210 ts.clear();
2211 copy(certs.begin(), certs.end(), back_inserter(ts));
2212}
2213
2214void
2215database::get_revision_certs(revision_id const & id,
2216 cert_name const & name,
2217 base64<cert_value> const & val,
2218 vector< revision<cert> > & ts)
2219{
2220 vector<cert> certs;
2221 get_certs(id.inner(), name, val, certs, "revision_certs");
2222 ts.clear();
2223 copy(certs.begin(), certs.end(), back_inserter(ts));
2224}
2225
2226void
2227database::get_revision_certs(cert_name const & name,
2228 base64<cert_value> const & val,
2229 vector< revision<cert> > & ts)
2230{
2231 vector<cert> certs;
2232 get_certs(name, val, certs, "revision_certs");
2233 ts.clear();
2234 copy(certs.begin(), certs.end(), back_inserter(ts));
2235}
2236
2237void
2238database::get_revision_certs(revision_id const & id,
2239 vector< revision<cert> > & ts)
2240{
2241 vector<cert> certs;
2242 get_certs(id.inner(), certs, "revision_certs");
2243 ts.clear();
2244 copy(certs.begin(), certs.end(), back_inserter(ts));
2245}
2246
2247void
2248database::get_revision_certs(revision_id const & ident,
2249 vector< hexenc<id> > & ts)
2250{
2251 results res;
2252 vector<cert> certs;
2253 fetch(res, one_col, any_rows,
2254 query("SELECT hash "
2255 "FROM revision_certs "
2256 "WHERE id = ?")
2257 % text(ident.inner()()));
2258 ts.clear();
2259 for (size_t i = 0; i < res.size(); ++i)
2260 ts.push_back(hexenc<id>(res[i][0]));
2261}
2262
2263void
2264database::get_revision_cert(hexenc<id> const & hash,
2265 revision<cert> & c)
2266{
2267 results res;
2268 vector<cert> certs;
2269 fetch(res, 5, one_row,
2270 query("SELECT id, name, value, keypair, signature "
2271 "FROM revision_certs "
2272 "WHERE hash = ?")
2273 % text(hash()));
2274 results_to_certs(res, certs);
2275 I(certs.size() == 1);
2276 c = revision<cert>(certs[0]);
2277}
2278
2279bool
2280database::revision_cert_exists(hexenc<id> const & hash)
2281{
2282 results res;
2283 vector<cert> certs;
2284 fetch(res, one_col, any_rows,
2285 query("SELECT id "
2286 "FROM revision_certs "
2287 "WHERE hash = ?")
2288 % text(hash()));
2289 I(res.size() == 0 || res.size() == 1);
2290 return (res.size() == 1);
2291}
2292
2293void
2294database::get_manifest_certs(manifest_id const & id,
2295 vector< manifest<cert> > & ts)
2296{
2297 vector<cert> certs;
2298 get_certs(id.inner(), certs, "manifest_certs");
2299 ts.clear();
2300 copy(certs.begin(), certs.end(), back_inserter(ts));
2301}
2302
2303
2304void
2305database::get_manifest_certs(cert_name const & name,
2306 vector< manifest<cert> > & ts)
2307{
2308 vector<cert> certs;
2309 get_certs(name, certs, "manifest_certs");
2310 ts.clear();
2311 copy(certs.begin(), certs.end(), back_inserter(ts));
2312}
2313
2314
2315// completions
2316void
2317database::complete(string const & partial,
2318 set<revision_id> & completions)
2319{
2320 results res;
2321 completions.clear();
2322
2323 string pattern = partial + "*";
2324
2325 fetch(res, 1, any_rows,
2326 query("SELECT id FROM revisions WHERE id GLOB ?")
2327 % text(pattern));
2328
2329 for (size_t i = 0; i < res.size(); ++i)
2330 completions.insert(revision_id(res[i][0]));
2331}
2332
2333
2334void
2335database::complete(string const & partial,
2336 set<file_id> & completions)
2337{
2338 results res;
2339 completions.clear();
2340
2341 string pattern = partial + "*";
2342
2343 fetch(res, 1, any_rows,
2344 query("SELECT id FROM files WHERE id GLOB ?")
2345 % text(pattern));
2346
2347 for (size_t i = 0; i < res.size(); ++i)
2348 completions.insert(file_id(res[i][0]));
2349
2350 res.clear();
2351
2352 fetch(res, 1, any_rows,
2353 query("SELECT id FROM file_deltas WHERE id GLOB ?")
2354 % text(pattern));
2355
2356 for (size_t i = 0; i < res.size(); ++i)
2357 completions.insert(file_id(res[i][0]));
2358}
2359
2360void
2361database::complete(string const & partial,
2362 set< pair<key_id, utf8 > > & completions)
2363{
2364 results res;
2365 completions.clear();
2366
2367 string pattern = partial + "*";
2368
2369 fetch(res, 2, any_rows,
2370 query("SELECT hash, id FROM public_keys WHERE hash GLOB ?")
2371 % text(pattern));
2372
2373 for (size_t i = 0; i < res.size(); ++i)
2374 completions.insert(make_pair(key_id(res[i][0]), utf8(res[i][1])));
2375}
2376
2377using selectors::selector_type;
2378
2379static void selector_to_certname(selector_type ty,
2380 string & s,
2381 string & prefix,
2382 string & suffix)
2383{
2384 prefix = suffix = "*";
2385 switch (ty)
2386 {
2387 case selectors::sel_author:
2388 s = author_cert_name;
2389 break;
2390 case selectors::sel_branch:
2391 prefix = suffix = "";
2392 s = branch_cert_name;
2393 break;
2394 case selectors::sel_head:
2395 prefix = suffix = "";
2396 s = branch_cert_name;
2397 break;
2398 case selectors::sel_date:
2399 case selectors::sel_later:
2400 case selectors::sel_earlier:
2401 s = date_cert_name;
2402 break;
2403 case selectors::sel_tag:
2404 prefix = suffix = "";
2405 s = tag_cert_name;
2406 break;
2407 case selectors::sel_ident:
2408 case selectors::sel_cert:
2409 case selectors::sel_unknown:
2410 I(false); // don't do this.
2411 break;
2412 }
2413}
2414
// Complete a partial selector value of type 'ty' against the database,
// restricted by the other selector terms in 'limit'.  The matching
// revision idents (for sel_ident) or cert values (otherwise) are
// returned in 'completions'.
void database::complete(selector_type ty,
                        string const & partial,
                        vector<pair<selector_type, string> > const & limit,
                        set<string> & completions)
{
  //L(FL("database::complete for partial '%s'") % partial);
  completions.clear();

  // step 1: the limit is transformed into an SQL select statement which
  // selects a set of IDs from the manifest_certs table which match the
  // limit. this is done by building an SQL select statement for each term
  // in the limit and then INTERSECTing them all.

  query lim;
  lim.sql_cmd = "(";
  if (limit.empty())
    {
      // no limit terms: every revision with a cert is a candidate
      lim.sql_cmd += "SELECT id FROM revision_certs";
    }
  else
    {
      bool first_limit = true;
      for (vector<pair<selector_type, string> >::const_iterator i = limit.begin();
           i != limit.end(); ++i)
        {
          if (first_limit)
            first_limit = false;
          else
            lim.sql_cmd += " INTERSECT ";

          if (i->first == selectors::sel_ident)
            {
              // ident selector: prefix-match on the revision id itself
              lim.sql_cmd += "SELECT id FROM revision_certs WHERE id GLOB ?";
              lim % text(i->second + "*");
            }
          else if (i->first == selectors::sel_cert)
            {
              // "name=value" restricts both; bare "name" matches any value
              if (i->second.length() > 0)
                {
                  size_t spot = i->second.find("=");

                  if (spot != (size_t)-1)
                    {
                      string certname;
                      string certvalue;

                      certname = i->second.substr(0, spot);
                      spot++;
                      certvalue = i->second.substr(spot);
                      lim.sql_cmd += "SELECT id FROM revision_certs WHERE name=? AND CAST(value AS TEXT) glob ?";
                      lim % text(certname) % text(certvalue);
                    }
                  else
                    {
                      lim.sql_cmd += "SELECT id FROM revision_certs WHERE name=?";
                      lim % text(i->second);
                    }

                }
            }
          else if (i->first == selectors::sel_unknown)
            {
              // untyped selector: try author, tag and branch certs
              lim.sql_cmd += "SELECT id FROM revision_certs WHERE (name=? OR name=? OR name=?)";
              lim % text(author_cert_name) % text(tag_cert_name) % text(branch_cert_name);
              lim.sql_cmd += " AND CAST(value AS TEXT) glob ?";
              lim % text(i->second + "*");
            }
          else if (i->first == selectors::sel_head)
            {
              // get branch names
              vector<cert_value> branch_names;
              if (i->second.size() == 0)
                {
                  __app->require_workspace("the empty head selector h: refers to the head of the current branch");
                  branch_names.push_back((__app->branch_name)());
                }
              else
                {
                  // expand the user's glob into the set of matching branch names
                  query subquery("SELECT DISTINCT value FROM revision_certs WHERE name=? AND CAST(value AS TEXT) glob ?");
                  subquery % text(branch_cert_name) % text(i->second);
                  results res;
                  fetch(res, one_col, any_rows, subquery);
                  // note: this inner 'i' deliberately shadows the limit iterator
                  for (size_t i = 0; i < res.size(); ++i)
                    {
                      data row_decoded(res[i][0]);
                      branch_names.push_back(row_decoded());
                    }
                }

              // for each branch name, get the branch heads
              set<revision_id> heads;
              for (vector<cert_value>::const_iterator bn = branch_names.begin(); bn != branch_names.end(); bn++)
                {
                  set<revision_id> branch_heads;
                  get_branch_heads(*bn, *__app, branch_heads);
                  heads.insert(branch_heads.begin(), branch_heads.end());
                  L(FL("after get_branch_heads for %s, heads has %d entries") % (*bn) % heads.size());
                }

              // restrict to the head revisions found above
              // NOTE(review): if 'heads' is empty this emits "id IN ()",
              // which sqlite rejects as a syntax error -- confirm whether
              // callers guarantee at least one head, or guard this case.
              lim.sql_cmd += "SELECT id FROM revision_certs WHERE id IN (";
              if (heads.size())
                {
                  set<revision_id>::const_iterator r = heads.begin();
                  lim.sql_cmd += "?";
                  lim % text(r->inner()());
                  r++;
                  while (r != heads.end())
                    {
                      lim.sql_cmd += ", ?";
                      lim % text(r->inner()());
                      r++;
                    }
                }
              lim.sql_cmd += ") ";
            }
          else
            {
              // every remaining selector type maps to a cert name plus
              // glob wildcards via selector_to_certname
              string certname;
              string prefix;
              string suffix;
              selector_to_certname(i->first, certname, prefix, suffix);
              // NOTE(review): this logs 'ty' (the completion target type),
              // not 'i->first' (the term being processed) -- looks like it
              // was meant to be i->first.
              L(FL("processing selector type %d with i->second '%s'") % ty % i->second);
              if ((i->first == selectors::sel_branch) && (i->second.size() == 0))
                {
                  __app->require_workspace("the empty branch selector b: refers to the current branch");
                  lim.sql_cmd += "SELECT id FROM revision_certs WHERE name=? AND CAST(value AS TEXT) glob ?";
                  lim % text(branch_cert_name) % text(__app->branch_name());
                  // NOTE(review): formats the branch_name object itself
                  // (no "()") unlike the line above -- relies on the type
                  // being streamable; confirm output is as intended.
                  L(FL("limiting to current branch '%s'") % __app->branch_name);
                }
              else
                {
                  lim.sql_cmd += "SELECT id FROM revision_certs WHERE name=? AND ";
                  lim % text(certname);
                  switch (i->first)
                    {
                    case selectors::sel_earlier:
                      // date bounds compare against the raw cert value
                      lim.sql_cmd += "value <= ?";
                      lim % blob(i->second);
                      break;
                    case selectors::sel_later:
                      lim.sql_cmd += "value > ?";
                      lim % blob(i->second);
                      break;
                    default:
                      lim.sql_cmd += "CAST(value AS TEXT) glob ?";
                      lim % text(prefix + i->second + suffix);
                      break;
                    }
                }
            }
          //L(FL("found selector type %d, selecting_head is now %d") % i->first % selecting_head);
        }
    }
  lim.sql_cmd += ")";

  // step 2: depending on what we've been asked to disambiguate, we
  // will complete either some idents, or cert values, or "unknown"
  // which generally means "author, tag or branch"

  if (ty == selectors::sel_ident)
    {
      lim.sql_cmd = "SELECT id FROM " + lim.sql_cmd;
    }
  else
    {
      string prefix = "*";
      string suffix = "*";
      // NOTE(review): the assignment below overwrites the step-1
      // intersection held in lim.sql_cmd, and the "id IN" clause later
      // re-embeds the partially-built completion query instead of the
      // step-1 subquery; the step-1 parameters bound above also precede
      // these placeholders positionally.  Compare against upstream --
      // this looks like a regression that would produce malformed SQL
      // whenever 'limit' is non-empty.
      lim.sql_cmd = "SELECT value FROM revision_certs WHERE";
      if (ty == selectors::sel_unknown)
        {
          lim.sql_cmd += " (name=? OR name=? OR name=?)";
          lim % text(author_cert_name) % text(tag_cert_name) % text(branch_cert_name);
        }
      else
        {
          string certname;
          selector_to_certname(ty, certname, prefix, suffix);
          lim.sql_cmd += " (name=?)";
          lim % text(certname);
        }

      lim.sql_cmd += " AND (CAST(value AS TEXT) GLOB ?) AND (id IN " + lim.sql_cmd + ")";
      lim % text(prefix + partial + suffix);
    }

  // run the assembled query and decode the matches
  results res;
  fetch(res, one_col, any_rows, lim);
  for (size_t i = 0; i < res.size(); ++i)
    {
      if (ty == selectors::sel_ident)
        completions.insert(res[i][0]);
      else
        {
          data row_decoded(res[i][0]);
          completions.insert(row_decoded());
        }
    }
}
2613
2614// epochs
2615
2616void
2617database::get_epochs(map<cert_value, epoch_data> & epochs)
2618{
2619 epochs.clear();
2620 results res;
2621 fetch(res, 2, any_rows, query("SELECT branch, epoch FROM branch_epochs"));
2622 for (results::const_iterator i = res.begin(); i != res.end(); ++i)
2623 {
2624 cert_value decoded(idx(*i, 0));
2625 I(epochs.find(decoded) == epochs.end());
2626 epochs.insert(make_pair(decoded, epoch_data(idx(*i, 1))));
2627 }
2628}
2629
2630void
2631database::get_epoch(epoch_id const & eid,
2632 cert_value & branch, epoch_data & epo)
2633{
2634 I(epoch_exists(eid));
2635 results res;
2636 fetch(res, 2, any_rows,
2637 query("SELECT branch, epoch FROM branch_epochs"
2638 " WHERE hash = ?")
2639 % text(eid.inner()()));
2640 I(res.size() == 1);
2641 branch = cert_value(idx(idx(res, 0), 0));
2642 epo = epoch_data(idx(idx(res, 0), 1));
2643}
2644
2645bool
2646database::epoch_exists(epoch_id const & eid)
2647{
2648 results res;
2649 fetch(res, one_col, any_rows,
2650 query("SELECT hash FROM branch_epochs WHERE hash = ?")
2651 % text(eid.inner()()));
2652 I(res.size() == 1 || res.size() == 0);
2653 return res.size() == 1;
2654}
2655
2656void
2657database::set_epoch(cert_value const & branch, epoch_data const & epo)
2658{
2659 epoch_id eid;
2660 epoch_hash_code(branch, epo, eid);
2661 I(epo.inner()().size() == constants::epochlen);
2662 execute(query("INSERT OR REPLACE INTO branch_epochs VALUES(?, ?, ?)")
2663 % text(eid.inner()())
2664 % blob(branch())
2665 % text(epo.inner()()));
2666}
2667
2668void
2669database::clear_epoch(cert_value const & branch)
2670{
2671 execute(query("DELETE FROM branch_epochs WHERE branch = ?")
2672 % blob(branch()));
2673}
2674
2675bool
2676database::check_integrity()
2677{
2678 results res;
2679 fetch(res, one_col, any_rows, query("PRAGMA integrity_check"));
2680 I(res.size() == 1);
2681 I(res[0].size() == 1);
2682
2683 return res[0][0] == "ok";
2684}
2685
2686// vars
2687
2688void
2689database::get_vars(map<var_key, var_value> & vars)
2690{
2691 vars.clear();
2692 results res;
2693 fetch(res, 3, any_rows, query("SELECT domain, name, value FROM db_vars"));
2694 for (results::const_iterator i = res.begin(); i != res.end(); ++i)
2695 {
2696 var_domain domain(idx(*i, 0));
2697 var_name name(idx(*i, 1));
2698 var_value value(idx(*i, 2));
2699 I(vars.find(make_pair(domain, name)) == vars.end());
2700 vars.insert(make_pair(make_pair(domain, name), value));
2701 }
2702}
2703
2704void
2705database::get_var(var_key const & key, var_value & value)
2706{
2707 // FIXME: sillyly inefficient. Doesn't really matter, though.
2708 map<var_key, var_value> vars;
2709 get_vars(vars);
2710 map<var_key, var_value>::const_iterator i = vars.find(key);
2711 I(i != vars.end());
2712 value = i->second;
2713}
2714
2715bool
2716database::var_exists(var_key const & key)
2717{
2718 // FIXME: sillyly inefficient. Doesn't really matter, though.
2719 map<var_key, var_value> vars;
2720 get_vars(vars);
2721 map<var_key, var_value>::const_iterator i = vars.find(key);
2722 return i != vars.end();
2723}
2724
2725void
2726database::set_var(var_key const & key, var_value const & value)
2727{
2728 execute(query("INSERT OR REPLACE INTO db_vars VALUES(?, ?, ?)")
2729 % text(key.first())
2730 % blob(key.second())
2731 % blob(value()));
2732}
2733
2734void
2735database::clear_var(var_key const & key)
2736{
2737 execute(query("DELETE FROM db_vars WHERE domain = ? AND name = ?")
2738 % text(key.first())
2739 % blob(key.second()));
2740}
2741
2742// branches
2743
2744void
2745database::get_branches(vector<string> & names)
2746{
2747 results res;
2748 query q("SELECT DISTINCT value FROM revision_certs WHERE name = ?");
2749 string cert_name = "branch";
2750 fetch(res, one_col, any_rows, q % text(cert_name));
2751 for (size_t i = 0; i < res.size(); ++i)
2752 {
2753 names.push_back(res[i][0]);
2754 }
2755}
2756
2757void
2758database::get_roster_id_for_revision(revision_id const & rev_id,
2759 roster_id & ros_id)
2760{
2761 if (rev_id.inner()().empty())
2762 {
2763 ros_id = roster_id();
2764 return;
2765 }
2766
2767 results res;
2768 query q("SELECT roster_id FROM revision_roster WHERE rev_id = ? ");
2769 fetch(res, one_col, any_rows, q % text(rev_id.inner()()));
2770 I(res.size() == 1);
2771 ros_id = roster_id(res[0][0]);
2772}
2773
2774void
2775database::get_roster(revision_id const & rev_id,
2776 roster_t & roster)
2777{
2778 marking_map mm;
2779 get_roster(rev_id, roster, mm);
2780}
2781
// File-local LRU cache of parsed (roster, marking) pairs keyed by
// revision id; shared by get_roster and put_roster so recently used
// rosters need not be re-read and re-parsed from the database.
static LRUCache<revision_id,
                shared_ptr<pair<roster_t, marking_map> > >
rcache(constants::db_roster_cache_sz);
2785
2786void
2787database::get_roster(revision_id const & rev_id,
2788 roster_t & roster,
2789 marking_map & marks)
2790{
2791 if (rev_id.inner()().empty())
2792 {
2793 roster = roster_t();
2794 marks = marking_map();
2795 return;
2796 }
2797
2798 shared_ptr<pair<roster_t, marking_map> > sp;
2799 if (rcache.fetch(rev_id, sp))
2800 {
2801 roster = sp->first;
2802 marks = sp->second;
2803 return;
2804 }
2805
2806 roster_data dat;
2807 roster_id ident;
2808
2809 get_roster_id_for_revision(rev_id, ident);
2810 get_roster_version(ident, dat);
2811 read_roster_and_marking(dat, roster, marks);
2812 sp = shared_ptr<pair<roster_t, marking_map> >
2813 (new pair<roster_t, marking_map>(roster, marks));
2814 rcache.insert(rev_id, sp);
2815}
2816
2817
2818void
2819database::put_roster(revision_id const & rev_id,
2820 roster_t & roster,
2821 marking_map & marks)
2822{
2823 MM(rev_id);
2824 roster_data old_data, new_data;
2825 delta reverse_delta;
2826 roster_id old_id, new_id;
2827
2828 if (!rcache.exists(rev_id))
2829 {
2830 shared_ptr<pair<roster_t, marking_map> > sp
2831 (new pair<roster_t, marking_map>(roster, marks));
2832 rcache.insert(rev_id, sp);
2833 }
2834
2835 write_roster_and_marking(roster, marks, new_data);
2836 calculate_ident(new_data, new_id);
2837
2838 // First: find the "old" revision; if there are multiple old
2839 // revisions, we just pick the first. It probably doesn't matter for
2840 // the sake of delta-encoding.
2841
2842 string data_table = "rosters";
2843 string delta_table = "roster_deltas";
2844
2845 transaction_guard guard(*this);
2846
2847 execute(query("INSERT into revision_roster VALUES (?, ?)")
2848 % text(rev_id.inner()())
2849 % text(new_id.inner()()));
2850
2851 if (exists(new_id.inner(), data_table)
2852 || delta_exists(new_id.inner(), delta_table))
2853 {
2854 guard.commit();
2855 return;
2856 }
2857
2858 // Else we have a new roster the database hasn't seen yet; our task is to
2859 // add it, and deltify all the incoming edges (if they aren't already).
2860
2861 schedule_write(data_table, new_id.inner(), new_data.inner());
2862
2863 set<revision_id> parents;
2864 get_revision_parents(rev_id, parents);
2865
2866 // Now do what deltify would do if we bothered (we have the
2867 // roster written now, so might as well do it here).
2868 for (set<revision_id>::const_iterator i = parents.begin();
2869 i != parents.end(); ++i)
2870 {
2871 if (null_id(*i))
2872 continue;
2873 revision_id old_rev = *i;
2874 get_roster_id_for_revision(old_rev, old_id);
2875 if (exists(new_id.inner(), data_table))
2876 {
2877 get_roster_version(old_id, old_data);
2878 diff(new_data.inner(), old_data.inner(), reverse_delta);
2879 if (have_pending_write(data_table, old_id.inner()))
2880 cancel_pending_write(data_table, old_id.inner());
2881 else
2882 drop(old_id.inner(), data_table);
2883 put_delta(old_id.inner(), new_id.inner(), reverse_delta, delta_table);
2884 }
2885 }
2886 guard.commit();
2887}
2888
2889
// Adjacency multimap over revision ids (as raw hash strings): each key
// maps to the nodes reachable from it in one step.  get_uncommon_ancestors
// populates it child -> parent.
typedef hashmap::hash_multimap<string, string> ancestry_map;
2891
2892static void
2893transitive_closure(string const & x,
2894 ancestry_map const & m,
2895 set<revision_id> & results)
2896{
2897 results.clear();
2898
2899 deque<string> work;
2900 work.push_back(x);
2901 while (!work.empty())
2902 {
2903 string c = work.front();
2904 work.pop_front();
2905 revision_id curr(c);
2906 if (results.find(curr) == results.end())
2907 {
2908 results.insert(curr);
2909 pair<ancestry_map::const_iterator, ancestry_map::const_iterator> range;
2910 range = m.equal_range(c);
2911 for (ancestry_map::const_iterator i = range.first; i != range.second; ++i)
2912 {
2913 if (i->first == c)
2914 work.push_back(i->second);
2915 }
2916 }
2917 }
2918}
2919
2920void
2921database::get_uncommon_ancestors(revision_id const & a,
2922 revision_id const & b,
2923 set<revision_id> & a_uncommon_ancs,
2924 set<revision_id> & b_uncommon_ancs)
2925{
2926 // FIXME: This is a somewhat ugly, and possibly unaccepably slow way
2927 // to do it. Another approach involves maintaining frontier sets for
2928 // each and slowly deepening them into history; would need to
2929 // benchmark to know which is actually faster on real datasets.
2930
2931 a_uncommon_ancs.clear();
2932 b_uncommon_ancs.clear();
2933
2934 results res;
2935 a_uncommon_ancs.clear();
2936 b_uncommon_ancs.clear();
2937
2938 fetch(res, 2, any_rows,
2939 query("SELECT parent,child FROM revision_ancestry"));
2940
2941 set<revision_id> a_ancs, b_ancs;
2942
2943 ancestry_map child_to_parent_map;
2944 for (size_t i = 0; i < res.size(); ++i)
2945 child_to_parent_map.insert(make_pair(res[i][1], res[i][0]));
2946
2947 transitive_closure(a.inner()(), child_to_parent_map, a_ancs);
2948 transitive_closure(b.inner()(), child_to_parent_map, b_ancs);
2949
2950 set_difference(a_ancs.begin(), a_ancs.end(),
2951 b_ancs.begin(), b_ancs.end(),
2952 inserter(a_uncommon_ancs, a_uncommon_ancs.begin()));
2953
2954 set_difference(b_ancs.begin(), b_ancs.end(),
2955 a_ancs.begin(), a_ancs.end(),
2956 inserter(b_uncommon_ancs, b_uncommon_ancs.begin()));
2957}
2958
2959node_id
2960database::next_node_id()
2961{
2962 transaction_guard guard(*this);
2963 results res;
2964
2965 // We implement this as a fixed db var.
2966
2967 fetch(res, one_col, any_rows,
2968 query("SELECT node FROM next_roster_node_number"));
2969
2970 node_id n;
2971 if (res.empty())
2972 {
2973 n = 1;
2974 execute (query("INSERT INTO next_roster_node_number VALUES(?)")
2975 % text(lexical_cast<string>(n)));
2976 }
2977 else
2978 {
2979 I(res.size() == 1);
2980 n = lexical_cast<node_id>(res[0][0]);
2981 ++n;
2982 execute (query("UPDATE next_roster_node_number SET node = ?")
2983 % text(lexical_cast<string>(n)));
2984
2985 }
2986 guard.commit();
2987 return n;
2988}
2989
2990
// Guard used before touching the database: raise the usual N()
// user-facing error when no database filename has been configured.
void
database::check_filename()
{
  N(!filename.empty(), F("no database specified"));
}
2996
2997
// Guard: require that 'filename' names an existing regular file,
// producing a distinct error when it is a directory.
void
database::check_db_exists()
{
  require_path_is_file(filename,
                       F("database %s does not exist") % filename,
                       F("%s is a directory, not a database") % filename);
}
3005
// Report whether a database filename has been configured; unlike
// check_filename() this never aborts.
bool
database::database_specified()
{
  return !filename.empty();
}
3011
3012
// Open the sqlite connection for 'filename'.  Must not already be open.
void
database::open()
{
  int error;

  I(!__sql);

  error = sqlite3_open(filename.as_external().c_str(), &__sql);

  // sqlite3_open can hand back a handle even on failure; register any
  // handle we got so close_all_databases() can clean it up.
  if (__sql)
    {
      I(sql_contexts.find(__sql) == sql_contexts.end());
      sql_contexts.insert(__sql);
    }

  // NOTE(review): if the open failed without producing a handle,
  // sqlite3_errmsg is called with a NULL db here -- confirm the sqlite
  // version in use returns a static message in that case.
  N(!error, (F("could not open database '%s': %s")
             % filename % string(sqlite3_errmsg(__sql))));
}
3031
3032void
3033database::close()
3034{
3035 if (__sql)
3036 {
3037 sqlite3_close(__sql);
3038 I(sql_contexts.find(__sql) != sql_contexts.end());
3039 sql_contexts.erase(__sql);
3040 __sql = 0;
3041 }
3042}
3043
3044
3045// transaction guards
3046
// RAII guard: opens a transaction on construction; the destructor
// commits only if commit() was called, otherwise it rolls back.
transaction_guard::transaction_guard(database & d, bool exclusive,
                                     size_t checkpoint_batch_size,
                                     size_t checkpoint_batch_bytes)
  : committed(false), db(d), exclusive(exclusive),
    checkpoint_batch_size(checkpoint_batch_size),
    checkpoint_batch_bytes(checkpoint_batch_bytes),
    checkpointed_calls(0),
    checkpointed_bytes(0)
{
  db.begin_transaction(exclusive);
  // warn when the byte threshold can never fire before the pending
  // write buffer flushes on its own
  if (checkpoint_batch_bytes > constants::db_max_pending_writes_bytes) {
    W(F("checkpoint_batch_bytes (%d) > constants::db_max_pending_writes_bytes (%lu), so will not be effective.\n") % checkpoint_batch_bytes % constants::db_max_pending_writes_bytes);
  }
}
3061
// Commit if commit() was called, otherwise roll the transaction back;
// this makes exception propagation through a guarded scope safe.
transaction_guard::~transaction_guard()
{
  if (committed)
    db.commit_transaction();
  else
    db.rollback_transaction();
}
3069
// Commit the work accumulated so far and immediately open a fresh
// transaction, resetting the batch counters.
void
transaction_guard::do_checkpoint()
{
  db.commit_transaction();
  db.begin_transaction(exclusive);
  checkpointed_calls = 0;
  checkpointed_bytes = 0;
}
3078
3079void
3080transaction_guard::maybe_checkpoint(size_t nbytes)
3081{
3082 checkpointed_calls += 1;
3083 checkpointed_bytes += nbytes;
3084 if (checkpointed_calls >= checkpoint_batch_size
3085 || checkpointed_bytes >= checkpoint_batch_bytes)
3086 do_checkpoint();
3087}
3088
// Mark the guard as successful; the actual COMMIT is issued by the
// destructor (or at the next checkpoint).
void
transaction_guard::commit()
{
  committed = true;
}
3094
3095
3096
3097// called to avoid foo.db-journal files hanging around if we exit cleanly
3098// without unwinding the stack (happens with SIGINT & SIGTERM)
3099void
3100close_all_databases()
3101{
3102 L(FL("attempting to rollback and close %d databases") % sql_contexts.size());
3103 for (set<sqlite3*>::iterator i = sql_contexts.begin();
3104 i != sql_contexts.end(); i++)
3105 {
3106 // the ROLLBACK is required here, even though the sqlite docs
3107 // imply that transactions are rolled back on database closure
3108 int exec_err = sqlite3_exec(*i, "ROLLBACK", NULL, NULL, NULL);
3109 int close_err = sqlite3_close(*i);
3110
3111 L(FL("exec_err = %d, close_err = %d") % exec_err % close_err);
3112 }
3113 sql_contexts.clear();
3114}
3115
3116// Local Variables:
3117// mode: C++
3118// fill-column: 76
3119// c-file-style: "gnu"
3120// indent-tabs-mode: nil
3121// End:
3122// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status