monotone

monotone Mtn Source Tree

Root/packet.cc

1// copyright (C) 2002, 2003 graydon hoare <graydon@pobox.com>
2// all rights reserved.
3// licensed to the public under the terms of the GNU GPL (>= 2)
4// see the file COPYING for details
5
6#include <iostream>
7#include <string>
8
9#include <boost/optional.hpp>
10#include <boost/regex.hpp>
11#include <boost/lexical_cast.hpp>
12#include <boost/shared_ptr.hpp>
13
14#include "app_state.hh"
15#include "change_set.hh"
16#include "packet.hh"
17#include "revision.hh"
18#include "sanity.hh"
19#include "transforms.hh"
20
21using namespace boost;
22using namespace std;
23
24// --- packet db writer --
25//
26// FIXME: this comment is out of date, and untrustworthy.
27//
28// the packet_db_writer::impl class (see below) manages writes to the
29// database. it also ensures that those writes follow the semantic
30// dependencies implied by the objects being written.
31//
32// an incoming manifest delta has three states:
33//
34// when it is first received, it is (probably) "non-constructable".
35// this means that we do not have a way of building its preimage from either
36// the database or from the memory cache of deltas we keep in this class
37//
38// a non-constructable manifest delta is given a prerequisite of
39// constructibility on its preimage.
40//
41// when the preimage becomes constructable, the manifest delta (probably)
42// changes to "non-writable" state. this means that we have a way to build
43// the manifest, we haven't received all the files which depend on it yet,
44// so we won't write it to the database.
45//
46// when a manifest becomes constructable (but not necessarily writable) we
47// call an analyzer back, if we have one, with the pre- and post-states of
48// the delta. this happens in order to give the netsync layer a chance to
49// request all the file deltas which accompany the manifest delta.
50//
51// a non-writable manifest delta is given prerequisites on all its
52// non-existing underlying files, and delayed again.
53//
54// when all the files arrive, a non-writable manifest is written to the
55// database.
56//
57// files are delayed to depend on their preimage, like non-constructable
58// manifests. however, once they are constructable they are immediately
59// written to the database.
60//
61/////////////////////////////////////////////////////////////////
62//
63// how it's done:
64//
65// each manifest or file has a companion class called a "prerequisite". a
66// prerequisite has a set of delayed packets which depend on it. these
67// delayed packets are also called dependents. a prerequisite can either be
68// "unsatisfied" or "satisfied". when it is first constructed, it is
69// unsatisfied. when it is satisfied, it calls all its dependents to inform
70// them that it has become satisfied.
71//
72// when all the prerequisites of a given dependent is satisfied, the
73// dependent writes itself back to the db writer. the dependent is then
74// dead, and the prerequisite will forget about it.
75//
76// dependents' lifetimes are managed by prerequisites. when all
77// prerequisites forget about their dependents, the dependent is destroyed
78// (it is reference counted with a shared pointer). similarly, the
79// packet_db_writer::impl holds references to prerequisites, and when
80// a prerequisite no longer has any dependents, it is dropped from the
81// packet_db_writer::impl, destroying it.
82//
83/////////////////////////////////////////////////////////////////
84//
85// this same machinery is also re-used for the "valved" packet writer, as a
86// convenient way to queue up commands in memory while the valve is closed.
87// in this usage, we simply never add any prerequisites to any packet, and
88// just call apply_delayed_packet when the valve opens.
89
90typedef enum
91 {
92 prereq_revision,
93 prereq_manifest,
94 prereq_file
95 }
96prereq_type;
97
98class delayed_packet;
99
100class
101prerequisite
102{
103 hexenc<id> ident;
104 prereq_type type;
105 set< shared_ptr<delayed_packet> > delayed;
106public:
107 prerequisite(hexenc<id> const & i, prereq_type pt)
108 : ident(i), type(pt)
109 {}
110 void add_dependent(shared_ptr<delayed_packet> p);
111 bool has_live_dependents();
112 void satisfy(shared_ptr<prerequisite> self,
113 packet_db_writer & pw);
114 bool operator<(prerequisite const & other)
115 {
116 return type < other.type ||
117 (type == other.type && ident < other.ident);
118 }
119};
120
121class
122delayed_packet
123{
124 set< shared_ptr<prerequisite> > unsatisfied_prereqs;
125 set< shared_ptr<prerequisite> > satisfied_prereqs;
126public:
127 void add_prerequisite(shared_ptr<prerequisite> p);
128 bool all_prerequisites_satisfied();
129 void prerequisite_satisfied(shared_ptr<prerequisite> p,
130 packet_db_writer & pw);
131 virtual void apply_delayed_packet(packet_db_writer & pw) = 0;
132 virtual ~delayed_packet() {}
133};
134
135void
136prerequisite::add_dependent(shared_ptr<delayed_packet> d)
137{
138 delayed.insert(d);
139}
140
141void
142prerequisite::satisfy(shared_ptr<prerequisite> self,
143 packet_db_writer & pw)
144{
145 set< shared_ptr<delayed_packet> > dead;
146 for (set< shared_ptr<delayed_packet> >::const_iterator i = delayed.begin();
147 i != delayed.end(); ++i)
148 {
149 (*i)->prerequisite_satisfied(self, pw);
150 if ((*i)->all_prerequisites_satisfied())
151 dead.insert(*i);
152 }
153 for (set< shared_ptr<delayed_packet> >::const_iterator i = dead.begin();
154 i != dead.end(); ++i)
155 {
156 delayed.erase(*i);
157 }
158}
159
160void
161delayed_packet::add_prerequisite(shared_ptr<prerequisite> p)
162{
163 unsatisfied_prereqs.insert(p);
164}
165
166bool
167delayed_packet::all_prerequisites_satisfied()
168{
169 return unsatisfied_prereqs.empty();
170}
171
172void
173delayed_packet::prerequisite_satisfied(shared_ptr<prerequisite> p,
174 packet_db_writer & pw)
175{
176 I(unsatisfied_prereqs.find(p) != unsatisfied_prereqs.end());
177 unsatisfied_prereqs.erase(p);
178 satisfied_prereqs.insert(p);
179 if (all_prerequisites_satisfied())
180 {
181 apply_delayed_packet(pw);
182 }
183}
184
185
186// concrete delayed packets
187
188class
189delayed_revision_data_packet
190 : public delayed_packet
191{
192 revision_id ident;
193 revision_data dat;
194public:
195 delayed_revision_data_packet(revision_id const & i,
196 revision_data const & md)
197 : ident(i), dat(md)
198 {}
199 virtual void apply_delayed_packet(packet_db_writer & pw);
200 virtual ~delayed_revision_data_packet();
201};
202
203class
204delayed_manifest_data_packet
205 : public delayed_packet
206{
207 manifest_id ident;
208 manifest_data dat;
209public:
210 delayed_manifest_data_packet(manifest_id const & i,
211 manifest_data const & md)
212 : ident(i), dat(md)
213 {}
214 virtual void apply_delayed_packet(packet_db_writer & pw);
215 virtual ~delayed_manifest_data_packet();
216};
217
218class
219delayed_file_data_packet
220 : public delayed_packet
221{
222 file_id ident;
223 file_data dat;
224public:
225 delayed_file_data_packet(file_id const & i,
226 file_data const & fd)
227 : ident(i), dat(fd)
228 {}
229 virtual void apply_delayed_packet(packet_db_writer & pw);
230 virtual ~delayed_file_data_packet();
231};
232
233class
234delayed_file_delta_packet
235 : public delayed_packet
236{
237 file_id old_id;
238 file_id new_id;
239 file_delta del;
240 bool forward_delta;
241public:
242 delayed_file_delta_packet(file_id const & oi,
243 file_id const & ni,
244 file_delta const & md,
245 bool fwd)
246 : old_id(oi), new_id(ni), del(md), forward_delta(fwd)
247 {}
248 virtual void apply_delayed_packet(packet_db_writer & pw);
249 virtual ~delayed_file_delta_packet();
250};
251
252class
253delayed_manifest_delta_packet
254 : public delayed_packet
255{
256 manifest_id old_id;
257 manifest_id new_id;
258 manifest_delta del;
259 bool forward_delta;
260public:
261 delayed_manifest_delta_packet(manifest_id const & oi,
262 manifest_id const & ni,
263 manifest_delta const & md,
264 bool fwd)
265 : old_id(oi), new_id(ni), del(md), forward_delta(fwd)
266 {}
267 virtual void apply_delayed_packet(packet_db_writer & pw);
268 virtual ~delayed_manifest_delta_packet();
269};
270
271class
272delayed_revision_cert_packet
273 : public delayed_packet
274{
275 revision<cert> c;
276public:
277 delayed_revision_cert_packet(revision<cert> const & c)
278 : c(c)
279 {}
280 virtual void apply_delayed_packet(packet_db_writer & pw);
281 virtual ~delayed_revision_cert_packet();
282};
283
284class
285delayed_public_key_packet
286 : public delayed_packet
287{
288 rsa_keypair_id id;
289 base64<rsa_pub_key> key;
290public:
291 delayed_public_key_packet(rsa_keypair_id const & id,
292 base64<rsa_pub_key> key)
293 : id(id), key(key)
294 {}
295 virtual void apply_delayed_packet(packet_db_writer & pw);
296 virtual ~delayed_public_key_packet();
297};
298
299class
300delayed_private_key_packet
301 : public delayed_packet
302{
303 rsa_keypair_id id;
304 base64< arc4<rsa_priv_key> > key;
305public:
306 delayed_private_key_packet(rsa_keypair_id const & id,
307 base64< arc4<rsa_priv_key> > key)
308 : id(id), key(key)
309 {}
310 virtual void apply_delayed_packet(packet_db_writer & pw);
311 virtual ~delayed_private_key_packet();
312};
313
314void
315delayed_revision_data_packet::apply_delayed_packet(packet_db_writer & pw)
316{
317 L(F("writing delayed revision data packet for %s\n") % ident);
318 pw.consume_revision_data(ident, dat);
319}
320
321delayed_revision_data_packet::~delayed_revision_data_packet()
322{
323 if (!all_prerequisites_satisfied())
324 W(F("discarding revision data packet with unmet dependencies\n"));
325}
326
327void
328delayed_manifest_data_packet::apply_delayed_packet(packet_db_writer & pw)
329{
330 L(F("writing delayed manifest data packet for %s\n") % ident);
331 pw.consume_manifest_data(ident, dat);
332}
333
334delayed_manifest_data_packet::~delayed_manifest_data_packet()
335{
336 if (!all_prerequisites_satisfied())
337 W(F("discarding manifest data packet with unmet dependencies\n"));
338}
339
340void
341delayed_file_data_packet::apply_delayed_packet(packet_db_writer & pw)
342{
343 L(F("writing delayed file data packet for %s\n") % ident);
344 pw.consume_file_data(ident, dat);
345}
346
347delayed_file_data_packet::~delayed_file_data_packet()
348{
349 // files have no prerequisites
350 I(all_prerequisites_satisfied());
351}
352
353void
354delayed_manifest_delta_packet::apply_delayed_packet(packet_db_writer & pw)
355{
356 L(F("writing delayed manifest %s packet for %s -> %s\n")
357 % (forward_delta ? "delta" : "reverse delta")
358 % (forward_delta ? old_id : new_id)
359 % (forward_delta ? new_id : old_id));
360 if (forward_delta)
361 pw.consume_manifest_delta(old_id, new_id, del);
362 else
363 pw.consume_manifest_reverse_delta(new_id, old_id, del);
364}
365
366delayed_manifest_delta_packet::~delayed_manifest_delta_packet()
367{
368 if (!all_prerequisites_satisfied())
369 W(F("discarding manifest delta packet with unmet dependencies\n"));
370}
371
372void
373delayed_file_delta_packet::apply_delayed_packet(packet_db_writer & pw)
374{
375 L(F("writing delayed file %s packet for %s -> %s\n")
376 % (forward_delta ? "delta" : "reverse delta")
377 % (forward_delta ? old_id : new_id)
378 % (forward_delta ? new_id : old_id));
379 if (forward_delta)
380 pw.consume_file_delta(old_id, new_id, del);
381 else
382 pw.consume_file_reverse_delta(new_id, old_id, del);
383}
384
385delayed_file_delta_packet::~delayed_file_delta_packet()
386{
387 if (!all_prerequisites_satisfied())
388 W(F("discarding file delta packet with unmet dependencies\n"));
389}
390
391void
392delayed_revision_cert_packet::apply_delayed_packet(packet_db_writer & pw)
393{
394 L(F("writing delayed revision cert on %s\n") % c.inner().ident);
395 pw.consume_revision_cert(c);
396}
397
398delayed_revision_cert_packet::~delayed_revision_cert_packet()
399{
400 if (!all_prerequisites_satisfied())
401 W(F("discarding revision cert packet %s with unmet dependencies\n")
402 % c.inner().ident);
403}
404
405void
406delayed_public_key_packet::apply_delayed_packet(packet_db_writer & pw)
407{
408 L(F("writing delayed public key %s\n") % id());
409 pw.consume_public_key(id, key);
410}
411
412delayed_public_key_packet::~delayed_public_key_packet()
413{
414 // keys don't have dependencies
415 I(all_prerequisites_satisfied());
416}
417
418void
419delayed_private_key_packet::apply_delayed_packet(packet_db_writer & pw)
420{
421 L(F("writing delayed private key %s\n") % id());
422 pw.consume_private_key(id, key);
423}
424
425delayed_private_key_packet::~delayed_private_key_packet()
426{
427 // keys don't have dependencies
428 I(all_prerequisites_satisfied());
429}
430
431struct packet_db_writer::impl
432{
433 app_state & app;
434 bool take_keys;
435 size_t count;
436
437 map<revision_id, shared_ptr<prerequisite> > revision_prereqs;
438 map<manifest_id, shared_ptr<prerequisite> > manifest_prereqs;
439 map<file_id, shared_ptr<prerequisite> > file_prereqs;
440
441 // ticker cert;
442 // ticker manc;
443 // ticker manw;
444 // ticker filec;
445
446 bool revision_exists_in_db(revision_id const & r);
447 bool manifest_version_exists_in_db(manifest_id const & m);
448 bool file_version_exists_in_db(file_id const & f);
449
450 void get_revision_prereq(revision_id const & revision, shared_ptr<prerequisite> & p);
451 void get_manifest_prereq(manifest_id const & manifest, shared_ptr<prerequisite> & p);
452 void get_file_prereq(file_id const & file, shared_ptr<prerequisite> & p);
453
454 void accepted_revision(revision_id const & r, packet_db_writer & dbw);
455 void accepted_manifest(manifest_id const & m, packet_db_writer & dbw);
456 void accepted_file(file_id const & f, packet_db_writer & dbw);
457
458 impl(app_state & app, bool take_keys)
459 : app(app), take_keys(take_keys), count(0)
460 // cert("cert", 1), manc("manc", 1), manw("manw", 1), filec("filec", 1)
461 {}
462};
463
464packet_db_writer::packet_db_writer(app_state & app, bool take_keys)
465 : pimpl(new impl(app, take_keys))
466{}
467
468packet_db_writer::~packet_db_writer()
469{}
470
471
472bool
473packet_db_writer::impl::revision_exists_in_db(revision_id const & r)
474{
475 return app.db.revision_exists(r);
476}
477
478bool
479packet_db_writer::impl::manifest_version_exists_in_db(manifest_id const & m)
480{
481 return app.db.manifest_version_exists(m);
482}
483
484bool
485packet_db_writer::impl::file_version_exists_in_db(file_id const & f)
486{
487 return app.db.file_version_exists(f);
488}
489
490void
491packet_db_writer::impl::get_file_prereq(file_id const & file,
492 shared_ptr<prerequisite> & p)
493{
494 map<file_id, shared_ptr<prerequisite> >::const_iterator i;
495 i = file_prereqs.find(file);
496 if (i != file_prereqs.end())
497 p = i->second;
498 else
499 {
500 p = shared_ptr<prerequisite>(new prerequisite(file.inner(), prereq_file));
501 file_prereqs.insert(make_pair(file, p));
502 }
503}
504
505void
506packet_db_writer::impl::get_manifest_prereq(manifest_id const & man,
507 shared_ptr<prerequisite> & p)
508{
509 map<manifest_id, shared_ptr<prerequisite> >::const_iterator i;
510 i = manifest_prereqs.find(man);
511 if (i != manifest_prereqs.end())
512 p = i->second;
513 else
514 {
515 p = shared_ptr<prerequisite>(new prerequisite(man.inner(), prereq_manifest));
516 manifest_prereqs.insert(make_pair(man, p));
517 }
518}
519
520void
521packet_db_writer::impl::get_revision_prereq(revision_id const & rev,
522 shared_ptr<prerequisite> & p)
523{
524 map<revision_id, shared_ptr<prerequisite> >::const_iterator i;
525 i = revision_prereqs.find(rev);
526 if (i != revision_prereqs.end())
527 p = i->second;
528 else
529 {
530 p = shared_ptr<prerequisite>(new prerequisite(rev.inner(), prereq_revision));
531 revision_prereqs.insert(make_pair(rev, p));
532 }
533}
534
535
536void
537packet_db_writer::impl::accepted_revision(revision_id const & r, packet_db_writer & dbw)
538{
539 L(F("noting acceptence of revision %s\n") % r);
540 map<revision_id, shared_ptr<prerequisite> >::iterator i = revision_prereqs.find(r);
541 if (i != revision_prereqs.end())
542 {
543 shared_ptr<prerequisite> prereq = i->second;
544 revision_prereqs.erase(i);
545 prereq->satisfy(prereq, dbw);
546 }
547}
548
549void
550packet_db_writer::impl::accepted_manifest(manifest_id const & m, packet_db_writer & dbw)
551{
552 L(F("noting acceptence of manifest %s\n") % m);
553 map<manifest_id, shared_ptr<prerequisite> >::iterator i = manifest_prereqs.find(m);
554 if (i != manifest_prereqs.end())
555 {
556 shared_ptr<prerequisite> prereq = i->second;
557 manifest_prereqs.erase(i);
558 prereq->satisfy(prereq, dbw);
559 }
560}
561
562void
563packet_db_writer::impl::accepted_file(file_id const & f, packet_db_writer & dbw)
564{
565 L(F("noting acceptence of file %s\n") % f);
566 map<file_id, shared_ptr<prerequisite> >::iterator i = file_prereqs.find(f);
567 if (i != file_prereqs.end())
568 {
569 shared_ptr<prerequisite> prereq = i->second;
570 file_prereqs.erase(i);
571 prereq->satisfy(prereq, dbw);
572 }
573}
574
575
576void
577packet_db_writer::consume_file_data(file_id const & ident,
578 file_data const & dat)
579{
580 transaction_guard guard(pimpl->app.db);
581 if (! pimpl->file_version_exists_in_db(ident))
582 {
583 pimpl->app.db.put_file(ident, dat);
584 pimpl->accepted_file(ident, *this);
585 }
586 else
587 L(F("skipping existing file version %s\n") % ident);
588 ++(pimpl->count);
589 guard.commit();
590}
591
592void
593packet_db_writer::consume_file_delta(file_id const & old_id,
594 file_id const & new_id,
595 file_delta const & del)
596{
597 transaction_guard guard(pimpl->app.db);
598 if (! pimpl->file_version_exists_in_db(new_id))
599 {
600 if (pimpl->file_version_exists_in_db(old_id))
601 {
602 file_id confirm;
603 file_data old_dat;
604 base64< gzip<data> > new_dat;
605 pimpl->app.db.get_file_version(old_id, old_dat);
606 patch(old_dat.inner(), del.inner(), new_dat);
607 calculate_ident(file_data(new_dat), confirm);
608 if (confirm == new_id)
609 {
610 pimpl->app.db.put_file_version(old_id, new_id, del);
611 pimpl->accepted_file(new_id, *this);
612 }
613 else
614 {
615 W(F("reconstructed file from delta '%s' -> '%s' has wrong id '%s'\n")
616 % old_id % new_id % confirm);
617 }
618 }
619 else
620 {
621 L(F("delaying file delta %s -> %s for preimage\n") % old_id % new_id);
622 shared_ptr<delayed_packet> dp;
623 dp = shared_ptr<delayed_packet>(new delayed_file_delta_packet(old_id, new_id, del, true));
624 shared_ptr<prerequisite> fp;
625 pimpl->get_file_prereq(old_id, fp);
626 dp->add_prerequisite(fp);
627 fp->add_dependent(dp);
628 }
629 }
630 else
631 L(F("skipping delta to existing file version %s\n") % new_id);
632 ++(pimpl->count);
633 guard.commit();
634}
635
636void
637packet_db_writer::consume_file_reverse_delta(file_id const & new_id,
638 file_id const & old_id,
639 file_delta const & del)
640{
641 transaction_guard guard(pimpl->app.db);
642 if (! pimpl->file_version_exists_in_db(old_id))
643 {
644 if (pimpl->file_version_exists_in_db(new_id))
645 {
646 file_id confirm;
647 file_data new_dat;
648 base64< gzip<data> > old_dat;
649 pimpl->app.db.get_file_version(new_id, new_dat);
650 patch(new_dat.inner(), del.inner(), old_dat);
651 calculate_ident(file_data(old_dat), confirm);
652 if (confirm == old_id)
653 {
654 pimpl->app.db.put_file_reverse_version(new_id, old_id, del);
655 pimpl->accepted_file(old_id, *this);
656 }
657 else
658 {
659 W(F("reconstructed file from reverse delta '%s' -> '%s' has wrong id '%s'\n")
660 % new_id % old_id % confirm);
661 }
662 }
663 else
664 {
665 L(F("delaying reverse file delta %s -> %s for preimage\n") % new_id % old_id);
666 shared_ptr<delayed_packet> dp;
667 dp = shared_ptr<delayed_packet>(new delayed_file_delta_packet(old_id, new_id, del, false));
668 shared_ptr<prerequisite> fp;
669 pimpl->get_file_prereq(new_id, fp);
670 dp->add_prerequisite(fp);
671 fp->add_dependent(dp);
672 }
673 }
674 else
675 L(F("skipping reverse delta to existing file version %s\n") % old_id);
676 ++(pimpl->count);
677 guard.commit();
678}
679
680
681void
682packet_db_writer::consume_manifest_data(manifest_id const & ident,
683 manifest_data const & dat)
684{
685 transaction_guard guard(pimpl->app.db);
686 if (! pimpl->manifest_version_exists_in_db(ident))
687 {
688 pimpl->app.db.put_manifest(ident, dat);
689 pimpl->accepted_manifest(ident, *this);
690 }
691 else
692 L(F("skipping existing manifest version %s\n") % ident);
693 ++(pimpl->count);
694 guard.commit();
695}
696
697void
698packet_db_writer::consume_manifest_delta(manifest_id const & old_id,
699 manifest_id const & new_id,
700 manifest_delta const & del)
701{
702 transaction_guard guard(pimpl->app.db);
703 if (! pimpl->manifest_version_exists_in_db(new_id))
704 {
705 if (pimpl->manifest_version_exists_in_db(old_id))
706 {
707 manifest_id confirm;
708 manifest_data old_dat;
709 base64< gzip<data> > new_dat;
710 pimpl->app.db.get_manifest_version(old_id, old_dat);
711 patch(old_dat.inner(), del.inner(), new_dat);
712 calculate_ident(manifest_data(new_dat), confirm);
713 if (confirm == new_id)
714 {
715 pimpl->app.db.put_manifest_version(old_id, new_id, del);
716 pimpl->accepted_manifest(new_id, *this);
717 }
718 else
719 {
720 W(F("reconstructed manifest from delta '%s' -> '%s' has wrong id '%s'\n")
721 % old_id % new_id % confirm);
722 }
723 }
724 else
725 {
726 L(F("delaying manifest delta %s -> %s for preimage\n") % old_id % new_id);
727 shared_ptr<delayed_packet> dp;
728 dp = shared_ptr<delayed_packet>(new delayed_manifest_delta_packet(old_id, new_id, del, true));
729 shared_ptr<prerequisite> fp;
730 pimpl->get_manifest_prereq(old_id, fp);
731 dp->add_prerequisite(fp);
732 fp->add_dependent(dp);
733 }
734 }
735 else
736 L(F("skipping delta to existing manifest version %s\n") % new_id);
737 ++(pimpl->count);
738 guard.commit();
739}
740
741void
742packet_db_writer::consume_manifest_reverse_delta(manifest_id const & new_id,
743 manifest_id const & old_id,
744 manifest_delta const & del)
745{
746 transaction_guard guard(pimpl->app.db);
747 if (! pimpl->manifest_version_exists_in_db(old_id))
748 {
749 if (pimpl->manifest_version_exists_in_db(new_id))
750 {
751 manifest_id confirm;
752 manifest_data new_dat;
753 base64< gzip<data> > old_dat;
754 pimpl->app.db.get_manifest_version(new_id, new_dat);
755 patch(new_dat.inner(), del.inner(), old_dat);
756 calculate_ident(manifest_data(old_dat), confirm);
757 if (confirm == old_id)
758 {
759 pimpl->app.db.put_manifest_reverse_version(new_id, old_id, del);
760 pimpl->accepted_manifest(old_id, *this);
761 }
762 else
763 {
764 W(F("reconstructed manifest from reverse delta '%s' -> '%s' has wrong id '%s'\n")
765 % new_id % old_id % confirm);
766 }
767 }
768 else
769 {
770 L(F("delaying manifest reverse delta %s -> %s for preimage\n") % new_id % old_id);
771 shared_ptr<delayed_packet> dp;
772 dp = shared_ptr<delayed_packet>(new delayed_manifest_delta_packet(old_id, new_id, del, false));
773 shared_ptr<prerequisite> fp;
774 pimpl->get_manifest_prereq(new_id, fp);
775 dp->add_prerequisite(fp);
776 fp->add_dependent(dp);
777 }
778 }
779 else
780 L(F("skipping reverse delta to existing manifest version %s\n") % old_id);
781 ++(pimpl->count);
782 guard.commit();
783}
784
785
786void
787packet_db_writer::consume_revision_data(revision_id const & ident,
788 revision_data const & dat)
789{
790 transaction_guard guard(pimpl->app.db);
791 if (! pimpl->revision_exists_in_db(ident))
792 {
793
794 shared_ptr<delayed_packet> dp;
795 dp = shared_ptr<delayed_packet>(new delayed_revision_data_packet(ident, dat));
796
797 revision_set rev;
798 read_revision_set(dat, rev);
799
800 if (! pimpl->manifest_version_exists_in_db(rev.new_manifest))
801 {
802 L(F("delaying revision %s for new manifest %s\n")
803 % ident % rev.new_manifest);
804 shared_ptr<prerequisite> fp;
805 pimpl->get_manifest_prereq(rev.new_manifest, fp);
806 dp->add_prerequisite(fp);
807 fp->add_dependent(dp);
808 }
809
810 for (edge_map::const_iterator i = rev.edges.begin();
811 i != rev.edges.end(); ++i)
812 {
813 if (! (edge_old_manifest(i).inner()().empty()
814 || pimpl->manifest_version_exists_in_db(edge_old_manifest(i))))
815 {
816 L(F("delaying revision %s for old manifest %s\n")
817 % ident % edge_old_manifest(i));
818 shared_ptr<prerequisite> fp;
819 pimpl->get_manifest_prereq(edge_old_manifest(i), fp);
820 dp->add_prerequisite(fp);
821 fp->add_dependent(dp);
822 }
823 if (! (edge_old_revision(i).inner()().empty()
824 || pimpl->revision_exists_in_db(edge_old_revision(i))))
825 {
826 L(F("delaying revision %s for old revision %s\n")
827 % ident % edge_old_revision(i));
828 shared_ptr<prerequisite> fp;
829 pimpl->get_revision_prereq(edge_old_revision(i), fp);
830 dp->add_prerequisite(fp);
831 fp->add_dependent(dp);
832 }
833 for (change_set::delta_map::const_iterator d = edge_changes(i).deltas.begin();
834 d != edge_changes(i).deltas.end(); ++d)
835 {
836 if (! (delta_entry_src(d).inner()().empty()
837 || pimpl->file_version_exists_in_db(delta_entry_src(d))))
838 {
839 L(F("delaying revision %s for old file %s\n")
840 % ident % delta_entry_src(d));
841 shared_ptr<prerequisite> fp;
842 pimpl->get_file_prereq(delta_entry_src(d), fp);
843 dp->add_prerequisite(fp);
844 fp->add_dependent(dp);
845 }
846 I(!delta_entry_dst(d).inner()().empty());
847 if (! pimpl->file_version_exists_in_db(delta_entry_dst(d)))
848 {
849 L(F("delaying revision %s for new file %s\n")
850 % ident % delta_entry_dst(d));
851 shared_ptr<prerequisite> fp;
852 pimpl->get_file_prereq(delta_entry_dst(d), fp);
853 dp->add_prerequisite(fp);
854 fp->add_dependent(dp);
855 }
856 }
857 }
858
859 if (dp->all_prerequisites_satisfied())
860 {
861 pimpl->app.db.put_revision(ident, dat);
862 pimpl->accepted_revision(ident, *this);
863 }
864 }
865 else
866 L(F("skipping existing revision %s\n") % ident);
867 ++(pimpl->count);
868 guard.commit();
869}
870
871void
872packet_db_writer::consume_revision_cert(revision<cert> const & t)
873{
874 transaction_guard guard(pimpl->app.db);
875 if (! pimpl->app.db.revision_cert_exists(t))
876 {
877 if (pimpl->revision_exists_in_db(revision_id(t.inner().ident)))
878 {
879 pimpl->app.db.put_revision_cert(t);
880 }
881 else
882 {
883 L(F("delaying revision cert on %s\n") % t.inner().ident);
884 shared_ptr<delayed_packet> dp;
885 dp = shared_ptr<delayed_packet>(new delayed_revision_cert_packet(t));
886 shared_ptr<prerequisite> fp;
887 pimpl->get_revision_prereq(revision_id(t.inner().ident), fp);
888 dp->add_prerequisite(fp);
889 fp->add_dependent(dp);
890 }
891 }
892 else
893 {
894 string s;
895 cert_signable_text(t.inner(), s);
896 L(F("skipping existing revision cert %s\n") % s);
897 }
898 ++(pimpl->count);
899 guard.commit();
900}
901
902
903void
904packet_db_writer::consume_public_key(rsa_keypair_id const & ident,
905 base64< rsa_pub_key > const & k)
906{
907 transaction_guard guard(pimpl->app.db);
908 if (! pimpl->take_keys)
909 {
910 W(F("skipping prohibited public key %s\n") % ident);
911 return;
912 }
913 if (! pimpl->app.db.public_key_exists(ident))
914 pimpl->app.db.put_key(ident, k);
915 else
916 {
917 base64<rsa_pub_key> tmp;
918 pimpl->app.db.get_key(ident, tmp);
919 if (!(tmp() == k()))
920 W(F("key '%s' is not equal to key '%s' in database\n") % ident % ident);
921 L(F("skipping existing public key %s\n") % ident);
922 }
923 ++(pimpl->count);
924 guard.commit();
925}
926
927void
928packet_db_writer::consume_private_key(rsa_keypair_id const & ident,
929 base64< arc4<rsa_priv_key> > const & k)
930{
931 transaction_guard guard(pimpl->app.db);
932 if (! pimpl->take_keys)
933 {
934 W(F("skipping prohibited private key %s\n") % ident);
935 return;
936 }
937 if (! pimpl->app.db.private_key_exists(ident))
938 pimpl->app.db.put_key(ident, k);
939 else
940 L(F("skipping existing private key %s\n") % ident);
941 ++(pimpl->count);
942 guard.commit();
943}
944
945
946// --- valved packet writer ---
947
948struct packet_db_valve::impl
949{
950 packet_db_writer writer;
951 std::vector< boost::shared_ptr<delayed_packet> > packets;
952 bool valve_is_open;
953 impl(app_state & app, bool take_keys)
954 : writer(app, take_keys),
955 valve_is_open(false)
956 {}
957 void do_packet(boost::shared_ptr<delayed_packet> packet)
958 {
959 if (valve_is_open)
960 packet->apply_delayed_packet(writer);
961 else
962 packets.push_back(packet);
963 }
964};
965
966packet_db_valve::packet_db_valve(app_state & app, bool take_keys)
967 : pimpl(new impl(app, take_keys))
968{}
969
970packet_db_valve::~packet_db_valve()
971{}
972
973void
974packet_db_valve::open_valve()
975{
976 L(F("packet valve opened\n"));
977 pimpl->valve_is_open = true;
978 int written = 0;
979 for (std::vector< boost::shared_ptr<delayed_packet> >::reverse_iterator
980 i = pimpl->packets.rbegin();
981 i != pimpl->packets.rend();
982 ++i)
983 {
984 pimpl->do_packet(*i);
985 ++written;
986 }
987 pimpl->packets.clear();
988 L(F("wrote %i queued packets\n") % written);
989}
990
991#define DOIT(x) pimpl->do_packet(boost::shared_ptr<delayed_packet>(new x));
992
993void
994packet_db_valve::consume_file_data(file_id const & ident,
995 file_data const & dat)
996{
997 DOIT(delayed_file_data_packet(ident, dat));
998}
999
1000void
1001packet_db_valve::consume_file_delta(file_id const & id_old,
1002 file_id const & id_new,
1003 file_delta const & del)
1004{
1005 DOIT(delayed_file_delta_packet(id_old, id_new, del, true));
1006}
1007
1008void
1009packet_db_valve::consume_file_reverse_delta(file_id const & id_new,
1010 file_id const & id_old,
1011 file_delta const & del)
1012{
1013 DOIT(delayed_file_delta_packet(id_old, id_new, del, false));
1014}
1015
1016void
1017packet_db_valve::consume_manifest_data(manifest_id const & ident,
1018 manifest_data const & dat)
1019{
1020 DOIT(delayed_manifest_data_packet(ident, dat));
1021}
1022
1023void
1024packet_db_valve::consume_manifest_delta(manifest_id const & id_old,
1025 manifest_id const & id_new,
1026 manifest_delta const & del)
1027{
1028 DOIT(delayed_manifest_delta_packet(id_old, id_new, del, true));
1029}
1030
1031void
1032packet_db_valve::consume_manifest_reverse_delta(manifest_id const & id_new,
1033 manifest_id const & id_old,
1034 manifest_delta const & del)
1035{
1036 DOIT(delayed_manifest_delta_packet(id_old, id_new, del, false));
1037}
1038
1039void
1040packet_db_valve::consume_revision_data(revision_id const & ident,
1041 revision_data const & dat)
1042{
1043 DOIT(delayed_revision_data_packet(ident, dat));
1044}
1045
1046void
1047packet_db_valve::consume_revision_cert(revision<cert> const & t)
1048{
1049 DOIT(delayed_revision_cert_packet(t));
1050}
1051
1052void
1053packet_db_valve::consume_public_key(rsa_keypair_id const & ident,
1054 base64< rsa_pub_key > const & k)
1055{
1056 DOIT(delayed_public_key_packet(ident, k));
1057}
1058
1059void
1060packet_db_valve::consume_private_key(rsa_keypair_id const & ident,
1061 base64< arc4<rsa_priv_key> > const & k)
1062{
1063 DOIT(delayed_private_key_packet(ident, k));
1064}
1065
1066#undef DOIT
1067
1068// --- packet writer ---
1069
1070packet_writer::packet_writer(ostream & o) : ost(o) {}
1071
1072void
1073packet_writer::consume_file_data(file_id const & ident,
1074 file_data const & dat)
1075{
1076 ost << "[fdata " << ident.inner()() << "]" << endl
1077 << trim_ws(dat.inner()()) << endl
1078 << "[end]" << endl;
1079}
1080
1081void
1082packet_writer::consume_file_delta(file_id const & old_id,
1083 file_id const & new_id,
1084 file_delta const & del)
1085{
1086 ost << "[fdelta " << old_id.inner()() << endl
1087 << " " << new_id.inner()() << "]" << endl
1088 << trim_ws(del.inner()()) << endl
1089 << "[end]" << endl;
1090}
1091
1092void
1093packet_writer::consume_file_reverse_delta(file_id const & new_id,
1094 file_id const & old_id,
1095 file_delta const & del)
1096{
1097 ost << "[frdelta " << new_id.inner()() << endl
1098 << " " << old_id.inner()() << "]" << endl
1099 << trim_ws(del.inner()()) << endl
1100 << "[end]" << endl;
1101}
1102
1103void
1104packet_writer::consume_manifest_data(manifest_id const & ident,
1105 manifest_data const & dat)
1106{
1107 ost << "[mdata " << ident.inner()() << "]" << endl
1108 << trim_ws(dat.inner()()) << endl
1109 << "[end]" << endl;
1110}
1111
1112void
1113packet_writer::consume_revision_data(revision_id const & ident,
1114 revision_data const & dat)
1115{
1116 ost << "[rdata " << ident.inner()() << "]" << endl
1117 << trim_ws(dat.inner()()) << endl
1118 << "[end]" << endl;
1119}
1120
1121void
1122packet_writer::consume_manifest_delta(manifest_id const & old_id,
1123 manifest_id const & new_id,
1124 manifest_delta const & del)
1125{
1126 ost << "[mdelta " << old_id.inner()() << endl
1127 << " " << new_id.inner()() << "]" << endl
1128 << trim_ws(del.inner()()) << endl
1129 << "[end]" << endl;
1130}
1131
1132void
1133packet_writer::consume_manifest_reverse_delta(manifest_id const & new_id,
1134 manifest_id const & old_id,
1135 manifest_delta const & del)
1136{
1137 ost << "[mrdelta " << new_id.inner()() << endl
1138 << " " << old_id.inner()() << "]" << endl
1139 << trim_ws(del.inner()()) << endl
1140 << "[end]" << endl;
1141}
1142
1143void
1144packet_writer::consume_revision_cert(revision<cert> const & t)
1145{
1146 ost << "[rcert " << t.inner().ident() << endl
1147 << " " << t.inner().name() << endl
1148 << " " << t.inner().key() << endl
1149 << " " << trim_ws(t.inner().value()) << "]" << endl
1150 << trim_ws(t.inner().sig()) << endl
1151 << "[end]" << endl;
1152}
1153
1154void
1155packet_writer::consume_public_key(rsa_keypair_id const & ident,
1156 base64< rsa_pub_key > const & k)
1157{
1158 ost << "[pubkey " << ident() << "]" << endl
1159 << trim_ws(k()) << endl
1160 << "[end]" << endl;
1161}
1162
1163void
1164packet_writer::consume_private_key(rsa_keypair_id const & ident,
1165 base64< arc4<rsa_priv_key> > const & k)
1166{
1167 ost << "[privkey " << ident() << "]" << endl
1168 << trim_ws(k()) << endl
1169 << "[end]" << endl;
1170}
1171
1172
1173// -- remainder just deals with the regexes for reading packets off streams
1174
1175struct
1176feed_packet_consumer
1177{
1178 size_t & count;
1179 packet_consumer & cons;
1180 feed_packet_consumer(size_t & count, packet_consumer & c) : count(count), cons(c)
1181 {}
1182 bool operator()(match_results<std::string::const_iterator> const & res) const
1183 {
1184 if (res.size() != 17)
1185 throw oops("matched impossible packet with "
1186 + lexical_cast<string>(res.size()) + " matching parts: " +
1187 string(res[0].first, res[0].second));
1188
1189 if (res[1].matched)
1190 {
1191 L(F("read data packet\n"));
1192 I(res[2].matched);
1193 I(res[3].matched);
1194 string head(res[1].first, res[1].second);
1195 string ident(res[2].first, res[2].second);
1196 string body(trim_ws(string(res[3].first, res[3].second)));
1197 if (head == "rdata")
1198 cons.consume_revision_data(revision_id(hexenc<id>(ident)),
1199 revision_data(body));
1200 else if (head == "mdata")
1201 cons.consume_manifest_data(manifest_id(hexenc<id>(ident)),
1202 manifest_data(body));
1203 else if (head == "fdata")
1204 cons.consume_file_data(file_id(hexenc<id>(ident)),
1205 file_data(body));
1206 else
1207 throw oops("matched impossible data packet with head '" + head + "'");
1208 }
1209 else if (res[4].matched)
1210 {
1211 L(F("read delta packet\n"));
1212 I(res[5].matched);
1213 I(res[6].matched);
1214 I(res[7].matched);
1215 string head(res[4].first, res[4].second);
1216 string src_id(res[5].first, res[5].second);
1217 string dst_id(res[6].first, res[6].second);
1218 string body(trim_ws(string(res[7].first, res[7].second)));
1219 if (head == "mdelta")
1220 cons.consume_manifest_delta(manifest_id(hexenc<id>(src_id)),
1221 manifest_id(hexenc<id>(dst_id)),
1222 manifest_delta(body));
1223 else if (head == "fdelta")
1224 cons.consume_file_delta(file_id(hexenc<id>(src_id)),
1225 file_id(hexenc<id>(dst_id)),
1226 file_delta(body));
1227 else if (head == "mrdelta")
1228 cons.consume_manifest_reverse_delta(manifest_id(hexenc<id>(src_id)),
1229 manifest_id(hexenc<id>(dst_id)),
1230 manifest_delta(body));
1231 else if (head == "frdelta")
1232 cons.consume_file_reverse_delta(file_id(hexenc<id>(src_id)),
1233 file_id(hexenc<id>(dst_id)),
1234 file_delta(body));
1235 else
1236 throw oops("matched impossible delta packet with head '" + head + "'");
1237 }
1238 else if (res[8].matched)
1239 {
1240 L(F("read cert packet\n"));
1241 I(res[9].matched);
1242 I(res[10].matched);
1243 I(res[11].matched);
1244 I(res[12].matched);
1245 I(res[13].matched);
1246 string head(res[8].first, res[8].second);
1247 string ident(res[9].first, res[9].second);
1248 string certname(res[10].first, res[10].second);
1249 string key(res[11].first, res[11].second);
1250 string val(res[12].first, res[12].second);
1251 string body(res[13].first, res[13].second);
1252
1253 // canonicalize the base64 encodings to permit searches
1254 cert t = cert(hexenc<id>(ident),
1255 cert_name(certname),
1256 base64<cert_value>(canonical_base64(val)),
1257 rsa_keypair_id(key),
1258 base64<rsa_sha1_signature>(canonical_base64(body)));
1259 if (head == "rcert")
1260 cons.consume_revision_cert(revision<cert>(t));
1261 else
1262 throw oops("matched impossible cert packet with head '" + head + "'");
1263 }
1264 else if (res[14].matched)
1265 {
1266 L(F("read key data packet\n"));
1267 I(res[15].matched);
1268 I(res[16].matched);
1269 string head(res[14].first, res[14].second);
1270 string ident(res[15].first, res[15].second);
1271 string body(trim_ws(string(res[16].first, res[16].second)));
1272 if (head == "pubkey")
1273 cons.consume_public_key(rsa_keypair_id(ident),
1274 base64<rsa_pub_key>(body));
1275 else if (head == "privkey")
1276 cons.consume_private_key(rsa_keypair_id(ident),
1277 base64< arc4<rsa_priv_key> >(body));
1278 else
1279 throw oops("matched impossible key data packet with head '" + head + "'");
1280 }
1281 else
1282 return true;
1283 ++count;
1284 return true;
1285 }
1286};
1287
1288static size_t
1289extract_packets(string const & s, packet_consumer & cons)
1290{
1291 string const ident("([[:xdigit:]]{40})");
1292 string const sp("[[:space:]]+");
1293 string const bra("\\[");
1294 string const ket("\\]");
1295 string const certhead("(rcert)");
1296 string const datahead("([mfr]data)");
1297 string const deltahead("([mf]r?delta)");
1298 string const keyhead("(pubkey|privkey)");
1299 string const key("([-a-zA-Z0-9\\.@]+)");
1300 string const certname("([-a-zA-Z0-9]+)");
1301 string const base64("([a-zA-Z0-9+/=[:space:]]+)");
1302 string const end("\\[end\\]");
1303 string const data = bra + datahead + sp + ident + ket + base64 + end;
1304 string const delta = bra + deltahead + sp + ident + sp + ident + ket + base64 + end;
1305 string const cert = bra
1306 + certhead + sp + ident + sp + certname + sp + key + sp + base64
1307 + ket
1308 + base64 + end;
1309 string const keydata = bra + keyhead + sp + key + ket + base64 + end;
1310 string const biggie = (data + "|" + delta + "|" + cert + "|" + keydata);
1311 regex expr(biggie);
1312 size_t count = 0;
1313 regex_grep(feed_packet_consumer(count, cons), s, expr, match_default);
1314 return count;
1315}
1316
1317
1318size_t
1319read_packets(istream & in, packet_consumer & cons)
1320{
1321 string accum, tmp;
1322 size_t count = 0;
1323 size_t const bufsz = 0xff;
1324 char buf[bufsz];
1325 string const end("[end]");
1326 while(in)
1327 {
1328 in.read(buf, bufsz);
1329 accum.append(buf, in.gcount());
1330 string::size_type endpos = string::npos;
1331 endpos = accum.rfind(end);
1332 if (endpos != string::npos)
1333 {
1334 endpos += end.size();
1335 string tmp = accum.substr(0, endpos);
1336 count += extract_packets(tmp, cons);
1337 if (endpos < accum.size() - 1)
1338 accum = accum.substr(endpos+1);
1339 else
1340 accum.clear();
1341 }
1342 }
1343 return count;
1344}
1345
1346
1347#ifdef BUILD_UNIT_TESTS
1348#include "unit_tests.hh"
1349#include "transforms.hh"
1350#include "manifest.hh"
1351
1352static void
1353packet_roundabout_test()
1354{
1355 string tmp;
1356
1357 {
1358 ostringstream oss;
1359 packet_writer pw(oss);
1360
1361 // an fdata packet
1362 base64< gzip<data> > gzdata;
1363 pack(data("this is some file data"), gzdata);
1364 file_data fdata(gzdata);
1365 file_id fid;
1366 calculate_ident(fdata, fid);
1367 pw.consume_file_data(fid, fdata);
1368
1369 // an fdelta packet
1370 base64< gzip<data> > gzdata2;
1371 pack(data("this is some file data which is not the same as the first one"), gzdata2);
1372 file_data fdata2(gzdata2);
1373 file_id fid2;
1374 calculate_ident(fdata2, fid);
1375 base64< gzip<delta> > del;
1376 diff(fdata.inner(), fdata2.inner(), del);
1377 pw.consume_file_delta(fid, fid2, file_delta(del));
1378
1379 // a cert packet
1380 base64<cert_value> val;
1381 encode_base64(cert_value("peaches"), val);
1382 base64<rsa_sha1_signature> sig;
1383 encode_base64(rsa_sha1_signature("blah blah there is no way this is a valid signature"), sig);
1384 // should be a type violation to use a file id here instead of a revision
1385 // id, but no-one checks...
1386 cert c(fid.inner(), cert_name("smell"), val,
1387 rsa_keypair_id("fun@moonman.com"), sig);
1388 pw.consume_revision_cert(revision<cert>(c));
1389
1390 // a manifest data packet
1391 manifest_map mm;
1392 manifest_data mdata;
1393 manifest_id mid;
1394 mm.insert(make_pair(file_path("foo/bar.txt"),
1395 file_id(hexenc<id>("cfb81b30ab3133a31b52eb50bd1c86df67eddec4"))));
1396 write_manifest_map(mm, mdata);
1397 calculate_ident(mdata, mid);
1398 pw.consume_manifest_data(mid, mdata);
1399
1400 // a manifest delta packet
1401 manifest_map mm2;
1402 manifest_data mdata2;
1403 manifest_id mid2;
1404 manifest_delta mdelta;
1405 mm2.insert(make_pair(file_path("foo/bar.txt"),
1406 file_id(hexenc<id>("5b20eb5e5bdd9cd674337fc95498f468d80ef7bc"))));
1407 mm2.insert(make_pair(file_path("bunk.txt"),
1408 file_id(hexenc<id>("54f373ed07b4c5a88eaa93370e1bbac02dc432a8"))));
1409 write_manifest_map(mm2, mdata2);
1410 calculate_ident(mdata2, mid2);
1411 base64< gzip<delta> > del2;
1412 diff(mdata.inner(), mdata2.inner(), del2);
1413 pw.consume_manifest_delta(mid, mid2, manifest_delta(del));
1414
1415 // a public key packet
1416 base64<rsa_pub_key> puk;
1417 encode_base64(rsa_pub_key("this is not a real rsa key"), puk);
1418 pw.consume_public_key(rsa_keypair_id("test@lala.com"), puk);
1419
1420 // a private key packet
1421 base64< arc4<rsa_priv_key> > pik;
1422 encode_base64(arc4<rsa_priv_key>
1423 (rsa_priv_key("this is not a real rsa key either!")), pik);
1424
1425 pw.consume_private_key(rsa_keypair_id("test@lala.com"), pik);
1426
1427 }
1428
1429 for (int i = 0; i < 10; ++i)
1430 {
1431 // now spin around sending and receiving this a few times
1432 ostringstream oss;
1433 packet_writer pw(oss);
1434 istringstream iss(tmp);
1435 read_packets(iss, pw);
1436 BOOST_CHECK(oss.str() == tmp);
1437 tmp = oss.str();
1438 }
1439}
1440
1441void
1442add_packet_tests(test_suite * suite)
1443{
1444 I(suite);
1445 suite->add(BOOST_TEST_CASE(&packet_roundabout_test));
1446}
1447
1448#endif // BUILD_UNIT_TESTS

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status