monotone

monotone Mtn Source Tree

Root/packet.cc

1// copyright (C) 2002, 2003 graydon hoare <graydon@pobox.com>
2// all rights reserved.
3// licensed to the public under the terms of the GNU GPL (>= 2)
4// see the file COPYING for details
5
6#include <iostream>
7#include <string>
8
9#include <boost/optional.hpp>
10#include <boost/regex.hpp>
11#include <boost/lexical_cast.hpp>
12#include <boost/shared_ptr.hpp>
13
14#include "app_state.hh"
15#include "change_set.hh"
16#include "constants.hh"
17#include "packet.hh"
18#include "revision.hh"
19#include "sanity.hh"
20#include "transforms.hh"
21#include "keys.hh"
22
23using namespace std;
24using boost::shared_ptr;
25using boost::lexical_cast;
26using boost::match_default;
27using boost::match_results;
28using boost::regex;
29
30// --- packet db writer --
31//
32// FIXME: this comment is out of date, and untrustworthy.
33//
34// the packet_db_writer::impl class (see below) manages writes to the
35// database. it also ensures that those writes follow the semantic
36// dependencies implied by the objects being written.
37//
38// an incoming manifest delta has three states:
39//
40// when it is first received, it is (probably) "non-constructable".
41// this means that we do not have a way of building its preimage from either
42// the database or from the memory cache of deltas we keep in this class
43//
44// a non-constructable manifest delta is given a prerequisite of
45// constructibility on its preimage.
46//
// when the preimage becomes constructable, the manifest delta (probably)
// changes to "non-writable" state. this means that we have a way to build
// the manifest, but we haven't received all the files which depend on it
// yet, so we won't write it to the database.
51//
52// when a manifest becomes constructable (but not necessarily writable) we
53// call an analyzer back, if we have one, with the pre- and post-states of
54// the delta. this happens in order to give the netsync layer a chance to
55// request all the file deltas which accompany the manifest delta.
56//
57// a non-writable manifest delta is given prerequisites on all its
58// non-existing underlying files, and delayed again.
59//
60// when all the files arrive, a non-writable manifest is written to the
61// database.
62//
63// files are delayed to depend on their preimage, like non-constructable
64// manifests. however, once they are constructable they are immediately
65// written to the database.
66//
67/////////////////////////////////////////////////////////////////
68//
69// how it's done:
70//
71// each manifest or file has a companion class called a "prerequisite". a
72// prerequisite has a set of delayed packets which depend on it. these
73// delayed packets are also called dependents. a prerequisite can either be
74// "unsatisfied" or "satisfied". when it is first constructed, it is
75// unsatisfied. when it is satisfied, it calls all its dependents to inform
76// them that it has become satisfied.
77//
// when all the prerequisites of a given dependent are satisfied, the
// dependent writes itself back to the db writer. the dependent is then
// dead, and the prerequisite will forget about it.
81//
82// dependents' lifetimes are managed by prerequisites. when all
83// prerequisites forget about their dependents, the dependent is destroyed
84// (it is reference counted with a shared pointer). similarly, the
85// packet_db_writer::impl holds references to prerequisites, and when
86// a prerequisite no longer has any dependents, it is dropped from the
87// packet_db_writer::impl, destroying it.
88//
89/////////////////////////////////////////////////////////////////
90//
91// this same machinery is also re-used for the "valved" packet writer, as a
92// convenient way to queue up commands in memory while the valve is closed.
93// in this usage, we simply never add any prerequisites to any packet, and
94// just call apply_delayed_packet when the valve opens.
95
// the kind of object a prerequisite stands for. the enumerators keep their
// default values, so they order as revision < manifest < file.
enum prereq_type
{
  prereq_revision,
  prereq_manifest,
  prereq_file
};
103
104class delayed_packet;
105
106class
107prerequisite
108{
109 hexenc<id> ident;
110 prereq_type type;
111 set< shared_ptr<delayed_packet> > delayed;
112public:
113 prerequisite(hexenc<id> const & i, prereq_type pt)
114 : ident(i), type(pt)
115 {}
116 void add_dependent(shared_ptr<delayed_packet> p);
117 bool has_live_dependents();
118 void satisfy(shared_ptr<prerequisite> self,
119 packet_db_writer & pw);
120 bool operator<(prerequisite const & other)
121 {
122 return type < other.type ||
123 (type == other.type && ident < other.ident);
124 }
125 // we need to be able to avoid circular dependencies between prerequisite and
126 // delayed_packet shared_ptrs.
127 void cleanup() { delayed.clear(); }
128};
129
130class
131delayed_packet
132{
133 set< shared_ptr<prerequisite> > unsatisfied_prereqs;
134 set< shared_ptr<prerequisite> > satisfied_prereqs;
135public:
136 void add_prerequisite(shared_ptr<prerequisite> p);
137 bool all_prerequisites_satisfied();
138 void prerequisite_satisfied(shared_ptr<prerequisite> p,
139 packet_db_writer & pw);
140 virtual void apply_delayed_packet(packet_db_writer & pw) = 0;
141 virtual ~delayed_packet() {}
142};
143
144void
145prerequisite::add_dependent(shared_ptr<delayed_packet> d)
146{
147 delayed.insert(d);
148}
149
150void
151prerequisite::satisfy(shared_ptr<prerequisite> self,
152 packet_db_writer & pw)
153{
154 set< shared_ptr<delayed_packet> > dead;
155 for (set< shared_ptr<delayed_packet> >::const_iterator i = delayed.begin();
156 i != delayed.end(); ++i)
157 {
158 (*i)->prerequisite_satisfied(self, pw);
159 if ((*i)->all_prerequisites_satisfied())
160 dead.insert(*i);
161 }
162 for (set< shared_ptr<delayed_packet> >::const_iterator i = dead.begin();
163 i != dead.end(); ++i)
164 {
165 delayed.erase(*i);
166 }
167}
168
169void
170delayed_packet::add_prerequisite(shared_ptr<prerequisite> p)
171{
172 unsatisfied_prereqs.insert(p);
173}
174
175bool
176delayed_packet::all_prerequisites_satisfied()
177{
178 return unsatisfied_prereqs.empty();
179}
180
181void
182delayed_packet::prerequisite_satisfied(shared_ptr<prerequisite> p,
183 packet_db_writer & pw)
184{
185 I(unsatisfied_prereqs.find(p) != unsatisfied_prereqs.end());
186 unsatisfied_prereqs.erase(p);
187 satisfied_prereqs.insert(p);
188 if (all_prerequisites_satisfied())
189 {
190 apply_delayed_packet(pw);
191 }
192}
193
194
195// concrete delayed packets
196
197class
198delayed_revision_data_packet
199 : public delayed_packet
200{
201 revision_id ident;
202 revision_data dat;
203public:
204 delayed_revision_data_packet(revision_id const & i,
205 revision_data const & md)
206 : ident(i), dat(md)
207 {}
208 virtual void apply_delayed_packet(packet_db_writer & pw);
209 virtual ~delayed_revision_data_packet();
210};
211
212class
213delayed_manifest_data_packet
214 : public delayed_packet
215{
216 manifest_id ident;
217 manifest_data dat;
218public:
219 delayed_manifest_data_packet(manifest_id const & i,
220 manifest_data const & md)
221 : ident(i), dat(md)
222 {}
223 virtual void apply_delayed_packet(packet_db_writer & pw);
224 virtual ~delayed_manifest_data_packet();
225};
226
227class
228delayed_file_data_packet
229 : public delayed_packet
230{
231 file_id ident;
232 file_data dat;
233public:
234 delayed_file_data_packet(file_id const & i,
235 file_data const & fd)
236 : ident(i), dat(fd)
237 {}
238 virtual void apply_delayed_packet(packet_db_writer & pw);
239 virtual ~delayed_file_data_packet();
240};
241
242class
243delayed_file_delta_packet
244 : public delayed_packet
245{
246 file_id old_id;
247 file_id new_id;
248 file_delta del;
249 bool forward_delta;
250 bool write_full;
251public:
252 delayed_file_delta_packet(file_id const & oi,
253 file_id const & ni,
254 file_delta const & md,
255 bool fwd,
256 bool full = false)
257 : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full)
258 {}
259 virtual void apply_delayed_packet(packet_db_writer & pw);
260 virtual ~delayed_file_delta_packet();
261};
262
263class
264delayed_manifest_delta_packet
265 : public delayed_packet
266{
267 manifest_id old_id;
268 manifest_id new_id;
269 manifest_delta del;
270 bool forward_delta;
271 bool write_full;
272public:
273 delayed_manifest_delta_packet(manifest_id const & oi,
274 manifest_id const & ni,
275 manifest_delta const & md,
276 bool fwd,
277 bool full = false)
278 : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full)
279 {}
280 virtual void apply_delayed_packet(packet_db_writer & pw);
281 virtual ~delayed_manifest_delta_packet();
282};
283
284class
285delayed_revision_cert_packet
286 : public delayed_packet
287{
288 revision<cert> c;
289public:
290 delayed_revision_cert_packet(revision<cert> const & c)
291 : c(c)
292 {}
293 virtual void apply_delayed_packet(packet_db_writer & pw);
294 virtual ~delayed_revision_cert_packet();
295};
296
297class
298delayed_public_key_packet
299 : public delayed_packet
300{
301 rsa_keypair_id id;
302 base64<rsa_pub_key> key;
303public:
304 delayed_public_key_packet(rsa_keypair_id const & id,
305 base64<rsa_pub_key> key)
306 : id(id), key(key)
307 {}
308 virtual void apply_delayed_packet(packet_db_writer & pw);
309 virtual ~delayed_public_key_packet();
310};
311
312class
313delayed_keypair_packet
314 : public delayed_packet
315{
316 rsa_keypair_id id;
317 keypair kp;
318public:
319 delayed_keypair_packet(rsa_keypair_id const & id,
320 keypair const & kp)
321 : id(id), kp(kp)
322 {}
323 virtual void apply_delayed_packet(packet_db_writer & pw);
324 virtual ~delayed_keypair_packet();
325};
326
327void
328delayed_revision_data_packet::apply_delayed_packet(packet_db_writer & pw)
329{
330 L(F("writing delayed revision data packet for %s\n") % ident);
331 pw.consume_revision_data(ident, dat);
332}
333
334delayed_revision_data_packet::~delayed_revision_data_packet()
335{
336 if (!all_prerequisites_satisfied())
337 W(F("discarding revision data packet %s with unmet dependencies\n") % ident);
338}
339
340void
341delayed_manifest_data_packet::apply_delayed_packet(packet_db_writer & pw)
342{
343 L(F("writing delayed manifest data packet for %s\n") % ident);
344 pw.consume_manifest_data(ident, dat);
345}
346
347delayed_manifest_data_packet::~delayed_manifest_data_packet()
348{
349 if (!all_prerequisites_satisfied())
350 W(F("discarding manifest data packet %s with unmet dependencies\n") % ident);
351}
352
353void
354delayed_file_data_packet::apply_delayed_packet(packet_db_writer & pw)
355{
356 L(F("writing delayed file data packet for %s\n") % ident);
357 pw.consume_file_data(ident, dat);
358}
359
360delayed_file_data_packet::~delayed_file_data_packet()
361{
362 // files have no prerequisites
363 I(all_prerequisites_satisfied());
364}
365
366void
367delayed_manifest_delta_packet::apply_delayed_packet(packet_db_writer & pw)
368{
369 L(F("writing delayed manifest %s packet for %s -> %s%s\n")
370 % (forward_delta ? "delta" : "reverse delta")
371 % (forward_delta ? old_id : new_id)
372 % (forward_delta ? new_id : old_id)
373 % (write_full ? " (writing in full)" : ""));
374 if (forward_delta)
375 pw.consume_manifest_delta(old_id, new_id, del, write_full);
376 else
377 pw.consume_manifest_reverse_delta(new_id, old_id, del);
378}
379
380delayed_manifest_delta_packet::~delayed_manifest_delta_packet()
381{
382 if (!all_prerequisites_satisfied())
383 W(F("discarding manifest delta packet %s -> %s with unmet dependencies\n")
384 % old_id % new_id);
385}
386
387void
388delayed_file_delta_packet::apply_delayed_packet(packet_db_writer & pw)
389{
390 L(F("writing delayed file %s packet for %s -> %s%s\n")
391 % (forward_delta ? "delta" : "reverse delta")
392 % (forward_delta ? old_id : new_id)
393 % (forward_delta ? new_id : old_id)
394 % (write_full ? " (writing in full)" : ""));
395 if (forward_delta)
396 pw.consume_file_delta(old_id, new_id, del, write_full);
397 else
398 pw.consume_file_reverse_delta(new_id, old_id, del);
399}
400
401delayed_file_delta_packet::~delayed_file_delta_packet()
402{
403 if (!all_prerequisites_satisfied())
404 W(F("discarding file delta packet %s -> %s with unmet dependencies\n")
405 % old_id % new_id);
406}
407
408void
409delayed_revision_cert_packet::apply_delayed_packet(packet_db_writer & pw)
410{
411 L(F("writing delayed revision cert on %s\n") % c.inner().ident);
412 pw.consume_revision_cert(c);
413}
414
415delayed_revision_cert_packet::~delayed_revision_cert_packet()
416{
417 if (!all_prerequisites_satisfied())
418 W(F("discarding revision cert packet %s with unmet dependencies\n")
419 % c.inner().ident);
420}
421
422void
423delayed_public_key_packet::apply_delayed_packet(packet_db_writer & pw)
424{
425 L(F("writing delayed public key %s\n") % id());
426 pw.consume_public_key(id, key);
427}
428
429delayed_public_key_packet::~delayed_public_key_packet()
430{
431 // keys don't have dependencies
432 I(all_prerequisites_satisfied());
433}
434
435void
436delayed_keypair_packet::apply_delayed_packet(packet_db_writer & pw)
437{
438 L(F("writing delayed private key %s\n") % id());
439 pw.consume_key_pair(id, kp);
440}
441
442delayed_keypair_packet::~delayed_keypair_packet()
443{
444 // keys don't have dependencies
445 I(all_prerequisites_satisfied());
446}
447
448
449void
450packet_consumer::set_on_revision_written(boost::function1<void,
451 revision_id> const & x)
452{
453 on_revision_written=x;
454}
455
456void
457packet_consumer::set_on_cert_written(boost::function1<void,
458 cert const &> const & x)
459{
460 on_cert_written=x;
461}
462
463void
464packet_consumer::set_on_pubkey_written(boost::function1<void, rsa_keypair_id>
465 const & x)
466{
467 on_pubkey_written=x;
468}
469
470void
471packet_consumer::set_on_keypair_written(boost::function1<void, rsa_keypair_id>
472 const & x)
473{
474 on_keypair_written=x;
475}
476
477
478struct packet_db_writer::impl
479{
480 app_state & app;
481 bool take_keys;
482 size_t count;
483
484 map<revision_id, shared_ptr<prerequisite> > revision_prereqs;
485 map<manifest_id, shared_ptr<prerequisite> > manifest_prereqs;
486 map<file_id, shared_ptr<prerequisite> > file_prereqs;
487
488 // ticker cert;
489 // ticker manc;
490 // ticker manw;
491 // ticker filec;
492
493 bool revision_exists_in_db(revision_id const & r);
494 bool manifest_version_exists_in_db(manifest_id const & m);
495 bool file_version_exists_in_db(file_id const & f);
496
497 void get_revision_prereq(revision_id const & revision, shared_ptr<prerequisite> & p);
498 void get_manifest_prereq(manifest_id const & manifest, shared_ptr<prerequisite> & p);
499 void get_file_prereq(file_id const & file, shared_ptr<prerequisite> & p);
500
501 void accepted_revision(revision_id const & r, packet_db_writer & dbw);
502 void accepted_manifest(manifest_id const & m, packet_db_writer & dbw);
503 void accepted_file(file_id const & f, packet_db_writer & dbw);
504
505 impl(app_state & app, bool take_keys)
506 : app(app), take_keys(take_keys), count(0)
507 // cert("cert", 1), manc("manc", 1), manw("manw", 1), filec("filec", 1)
508 {}
509
510 ~impl();
511};
512
513packet_db_writer::packet_db_writer(app_state & app, bool take_keys)
514 : pimpl(new impl(app, take_keys))
515{}
516
517packet_db_writer::~packet_db_writer()
518{}
519
520packet_db_writer::impl::~impl()
521{
522
523 // break any circular dependencies for unsatisfied prerequisites
524 for (map<revision_id, shared_ptr<prerequisite> >::const_iterator i =
525 revision_prereqs.begin(); i != revision_prereqs.end(); i++)
526 {
527 i->second->cleanup();
528 }
529 for (map<manifest_id, shared_ptr<prerequisite> >::const_iterator i =
530 manifest_prereqs.begin(); i != manifest_prereqs.end(); i++)
531 {
532 i->second->cleanup();
533 }
534 for (map<file_id, shared_ptr<prerequisite> >::const_iterator i =
535 file_prereqs.begin(); i != file_prereqs.end(); i++)
536 {
537 i->second->cleanup();
538 }
539}
540
541bool
542packet_db_writer::impl::revision_exists_in_db(revision_id const & r)
543{
544 return app.db.revision_exists(r);
545}
546
547bool
548packet_db_writer::impl::manifest_version_exists_in_db(manifest_id const & m)
549{
550 return app.db.manifest_version_exists(m);
551}
552
553bool
554packet_db_writer::impl::file_version_exists_in_db(file_id const & f)
555{
556 return app.db.file_version_exists(f);
557}
558
559void
560packet_db_writer::impl::get_file_prereq(file_id const & file,
561 shared_ptr<prerequisite> & p)
562{
563 map<file_id, shared_ptr<prerequisite> >::const_iterator i;
564 i = file_prereqs.find(file);
565 if (i != file_prereqs.end())
566 p = i->second;
567 else
568 {
569 p = shared_ptr<prerequisite>(new prerequisite(file.inner(), prereq_file));
570 file_prereqs.insert(make_pair(file, p));
571 }
572}
573
574void
575packet_db_writer::impl::get_manifest_prereq(manifest_id const & man,
576 shared_ptr<prerequisite> & p)
577{
578 map<manifest_id, shared_ptr<prerequisite> >::const_iterator i;
579 i = manifest_prereqs.find(man);
580 if (i != manifest_prereqs.end())
581 p = i->second;
582 else
583 {
584 p = shared_ptr<prerequisite>(new prerequisite(man.inner(), prereq_manifest));
585 manifest_prereqs.insert(make_pair(man, p));
586 }
587}
588
589void
590packet_db_writer::impl::get_revision_prereq(revision_id const & rev,
591 shared_ptr<prerequisite> & p)
592{
593 map<revision_id, shared_ptr<prerequisite> >::const_iterator i;
594 i = revision_prereqs.find(rev);
595 if (i != revision_prereqs.end())
596 p = i->second;
597 else
598 {
599 p = shared_ptr<prerequisite>(new prerequisite(rev.inner(), prereq_revision));
600 revision_prereqs.insert(make_pair(rev, p));
601 }
602}
603
604
605void
606packet_db_writer::impl::accepted_revision(revision_id const & r, packet_db_writer & dbw)
607{
608 L(F("noting acceptence of revision %s\n") % r);
609 map<revision_id, shared_ptr<prerequisite> >::iterator i = revision_prereqs.find(r);
610 if (i != revision_prereqs.end())
611 {
612 shared_ptr<prerequisite> prereq = i->second;
613 revision_prereqs.erase(i);
614 prereq->satisfy(prereq, dbw);
615 }
616}
617
618void
619packet_db_writer::impl::accepted_manifest(manifest_id const & m, packet_db_writer & dbw)
620{
621 L(F("noting acceptence of manifest %s\n") % m);
622 map<manifest_id, shared_ptr<prerequisite> >::iterator i = manifest_prereqs.find(m);
623 if (i != manifest_prereqs.end())
624 {
625 shared_ptr<prerequisite> prereq = i->second;
626 manifest_prereqs.erase(i);
627 prereq->satisfy(prereq, dbw);
628 }
629}
630
631void
632packet_db_writer::impl::accepted_file(file_id const & f, packet_db_writer & dbw)
633{
634 L(F("noting acceptence of file %s\n") % f);
635 map<file_id, shared_ptr<prerequisite> >::iterator i = file_prereqs.find(f);
636 if (i != file_prereqs.end())
637 {
638 shared_ptr<prerequisite> prereq = i->second;
639 file_prereqs.erase(i);
640 prereq->satisfy(prereq, dbw);
641 }
642}
643
644
645void
646packet_db_writer::consume_file_data(file_id const & ident,
647 file_data const & dat)
648{
649 transaction_guard guard(pimpl->app.db);
650 if (! pimpl->file_version_exists_in_db(ident))
651 {
652 pimpl->app.db.put_file(ident, dat);
653 pimpl->accepted_file(ident, *this);
654 }
655 else
656 L(F("skipping existing file version %s\n") % ident);
657 ++(pimpl->count);
658 guard.commit();
659}
660
661void
662packet_db_writer::consume_file_delta(file_id const & old_id,
663 file_id const & new_id,
664 file_delta const & del)
665{
666 consume_file_delta(old_id, new_id, del, false);
667}
668
669void
670packet_db_writer::consume_file_delta(file_id const & old_id,
671 file_id const & new_id,
672 file_delta const & del,
673 bool write_full)
674{
675 transaction_guard guard(pimpl->app.db);
676 if (! pimpl->file_version_exists_in_db(new_id))
677 {
678 if (pimpl->file_version_exists_in_db(old_id))
679 {
680 file_id confirm;
681 file_data old_dat;
682 data new_dat;
683 pimpl->app.db.get_file_version(old_id, old_dat);
684 patch(old_dat.inner(), del.inner(), new_dat);
685 calculate_ident(file_data(new_dat), confirm);
686 if (confirm == new_id)
687 {
688 if (!write_full)
689 pimpl->app.db.put_file_version(old_id, new_id, del);
690 else
691 pimpl->app.db.put_file(new_id, file_data(new_dat));
692 pimpl->accepted_file(new_id, *this);
693 }
694 else
695 {
696 W(F("reconstructed file from delta '%s' -> '%s' has wrong id '%s'\n")
697 % old_id % new_id % confirm);
698 }
699 }
700 else
701 {
702 L(F("delaying file delta %s -> %s for preimage\n") % old_id % new_id);
703 shared_ptr<delayed_packet> dp;
704 dp = shared_ptr<delayed_packet>(new delayed_file_delta_packet(old_id, new_id, del, true, write_full));
705 shared_ptr<prerequisite> fp;
706 pimpl->get_file_prereq(old_id, fp);
707 dp->add_prerequisite(fp);
708 fp->add_dependent(dp);
709 }
710 }
711 else
712 L(F("skipping delta to existing file version %s\n") % new_id);
713 ++(pimpl->count);
714 guard.commit();
715}
716
717void
718packet_db_writer::consume_file_reverse_delta(file_id const & new_id,
719 file_id const & old_id,
720 file_delta const & del)
721{
722 transaction_guard guard(pimpl->app.db);
723 if (! pimpl->file_version_exists_in_db(old_id))
724 {
725 if (pimpl->file_version_exists_in_db(new_id))
726 {
727 file_id confirm;
728 file_data new_dat;
729 data old_dat;
730 pimpl->app.db.get_file_version(new_id, new_dat);
731 patch(new_dat.inner(), del.inner(), old_dat);
732 calculate_ident(file_data(old_dat), confirm);
733 if (confirm == old_id)
734 {
735 pimpl->app.db.put_file_reverse_version(new_id, old_id, del);
736 pimpl->accepted_file(old_id, *this);
737 }
738 else
739 {
740 W(F("reconstructed file from reverse delta '%s' -> '%s' has wrong id '%s'\n")
741 % new_id % old_id % confirm);
742 }
743 }
744 else
745 {
746 L(F("delaying reverse file delta %s -> %s for preimage\n") % new_id % old_id);
747 shared_ptr<delayed_packet> dp;
748 dp = shared_ptr<delayed_packet>(new delayed_file_delta_packet(old_id, new_id, del, false));
749 shared_ptr<prerequisite> fp;
750 pimpl->get_file_prereq(new_id, fp);
751 dp->add_prerequisite(fp);
752 fp->add_dependent(dp);
753 }
754 }
755 else
756 L(F("skipping reverse delta to existing file version %s\n") % old_id);
757 ++(pimpl->count);
758 guard.commit();
759}
760
761
762void
763packet_db_writer::consume_manifest_data(manifest_id const & ident,
764 manifest_data const & dat)
765{
766 transaction_guard guard(pimpl->app.db);
767 if (! pimpl->manifest_version_exists_in_db(ident))
768 {
769 pimpl->app.db.put_manifest(ident, dat);
770 pimpl->accepted_manifest(ident, *this);
771 }
772 else
773 L(F("skipping existing manifest version %s\n") % ident);
774 ++(pimpl->count);
775 guard.commit();
776}
777
778void
779packet_db_writer::consume_manifest_delta(manifest_id const & old_id,
780 manifest_id const & new_id,
781 manifest_delta const & del)
782{
783 consume_manifest_delta(old_id, new_id, del, false);
784}
785
786void
787packet_db_writer::consume_manifest_delta(manifest_id const & old_id,
788 manifest_id const & new_id,
789 manifest_delta const & del,
790 bool write_full)
791{
792 transaction_guard guard(pimpl->app.db);
793 if (! pimpl->manifest_version_exists_in_db(new_id))
794 {
795 if (pimpl->manifest_version_exists_in_db(old_id))
796 {
797 manifest_id confirm;
798 manifest_data old_dat;
799 data new_dat;
800 pimpl->app.db.get_manifest_version(old_id, old_dat);
801 patch(old_dat.inner(), del.inner(), new_dat);
802 calculate_ident(manifest_data(new_dat), confirm);
803 if (confirm == new_id)
804 {
805 if (!write_full)
806 pimpl->app.db.put_manifest_version(old_id, new_id, del);
807 else
808 pimpl->app.db.put_manifest(new_id, manifest_data(new_dat));
809
810 pimpl->accepted_manifest(new_id, *this);
811 }
812 else
813 {
814 W(F("reconstructed manifest from delta '%s' -> '%s' has wrong id '%s'\n")
815 % old_id % new_id % confirm);
816 }
817 }
818 else
819 {
820 L(F("delaying manifest delta %s -> %s for preimage\n") % old_id % new_id);
821 shared_ptr<delayed_packet> dp;
822 dp = shared_ptr<delayed_packet>(new delayed_manifest_delta_packet(old_id, new_id, del, true, write_full));
823 shared_ptr<prerequisite> fp;
824 pimpl->get_manifest_prereq(old_id, fp);
825 dp->add_prerequisite(fp);
826 fp->add_dependent(dp);
827 }
828 }
829 else
830 L(F("skipping delta to existing manifest version %s\n") % new_id);
831 ++(pimpl->count);
832 guard.commit();
833}
834
835void
836packet_db_writer::consume_manifest_reverse_delta(manifest_id const & new_id,
837 manifest_id const & old_id,
838 manifest_delta const & del)
839{
840 transaction_guard guard(pimpl->app.db);
841 if (! pimpl->manifest_version_exists_in_db(old_id))
842 {
843 if (pimpl->manifest_version_exists_in_db(new_id))
844 {
845 manifest_id confirm;
846 manifest_data new_dat;
847 data old_dat;
848 pimpl->app.db.get_manifest_version(new_id, new_dat);
849 patch(new_dat.inner(), del.inner(), old_dat);
850 calculate_ident(manifest_data(old_dat), confirm);
851 if (confirm == old_id)
852 {
853 pimpl->app.db.put_manifest_reverse_version(new_id, old_id, del);
854 pimpl->accepted_manifest(old_id, *this);
855 }
856 else
857 {
858 W(F("reconstructed manifest from reverse delta '%s' -> '%s' has wrong id '%s'\n")
859 % new_id % old_id % confirm);
860 }
861 }
862 else
863 {
864 L(F("delaying manifest reverse delta %s -> %s for preimage\n") % new_id % old_id);
865 shared_ptr<delayed_packet> dp;
866 dp = shared_ptr<delayed_packet>(new delayed_manifest_delta_packet(old_id, new_id, del, false));
867 shared_ptr<prerequisite> fp;
868 pimpl->get_manifest_prereq(new_id, fp);
869 dp->add_prerequisite(fp);
870 fp->add_dependent(dp);
871 }
872 }
873 else
874 L(F("skipping reverse delta to existing manifest version %s\n") % old_id);
875 ++(pimpl->count);
876 guard.commit();
877}
878
879
880void
881packet_db_writer::consume_revision_data(revision_id const & ident,
882 revision_data const & dat)
883{
884 transaction_guard guard(pimpl->app.db);
885 if (! pimpl->revision_exists_in_db(ident))
886 {
887
888 shared_ptr<delayed_packet> dp;
889 dp = shared_ptr<delayed_packet>(new delayed_revision_data_packet(ident, dat));
890
891 revision_set rev;
892 read_revision_set(dat, rev);
893
894 if (! pimpl->manifest_version_exists_in_db(rev.new_manifest))
895 {
896 L(F("delaying revision %s for new manifest %s\n")
897 % ident % rev.new_manifest);
898 shared_ptr<prerequisite> fp;
899 pimpl->get_manifest_prereq(rev.new_manifest, fp);
900 dp->add_prerequisite(fp);
901 fp->add_dependent(dp);
902 }
903
904 for (edge_map::const_iterator i = rev.edges.begin();
905 i != rev.edges.end(); ++i)
906 {
907 if (! (edge_old_manifest(i).inner()().empty()
908 || pimpl->manifest_version_exists_in_db(edge_old_manifest(i))))
909 {
910 L(F("delaying revision %s for old manifest %s\n")
911 % ident % edge_old_manifest(i));
912 shared_ptr<prerequisite> fp;
913 pimpl->get_manifest_prereq(edge_old_manifest(i), fp);
914 dp->add_prerequisite(fp);
915 fp->add_dependent(dp);
916 }
917 if (! (edge_old_revision(i).inner()().empty()
918 || pimpl->revision_exists_in_db(edge_old_revision(i))))
919 {
920 L(F("delaying revision %s for old revision %s\n")
921 % ident % edge_old_revision(i));
922 shared_ptr<prerequisite> fp;
923 pimpl->get_revision_prereq(edge_old_revision(i), fp);
924 dp->add_prerequisite(fp);
925 fp->add_dependent(dp);
926 }
927 for (change_set::delta_map::const_iterator d = edge_changes(i).deltas.begin();
928 d != edge_changes(i).deltas.end(); ++d)
929 {
930 if (! (delta_entry_src(d).inner()().empty()
931 || pimpl->file_version_exists_in_db(delta_entry_src(d))))
932 {
933 L(F("delaying revision %s for old file %s\n")
934 % ident % delta_entry_src(d));
935 shared_ptr<prerequisite> fp;
936 pimpl->get_file_prereq(delta_entry_src(d), fp);
937 dp->add_prerequisite(fp);
938 fp->add_dependent(dp);
939 }
940 I(!delta_entry_dst(d).inner()().empty());
941 if (! pimpl->file_version_exists_in_db(delta_entry_dst(d)))
942 {
943 L(F("delaying revision %s for new file %s\n")
944 % ident % delta_entry_dst(d));
945 shared_ptr<prerequisite> fp;
946 pimpl->get_file_prereq(delta_entry_dst(d), fp);
947 dp->add_prerequisite(fp);
948 fp->add_dependent(dp);
949 }
950 }
951 }
952
953 if (dp->all_prerequisites_satisfied())
954 {
955 pimpl->app.db.put_revision(ident, dat);
956 if(on_revision_written) on_revision_written(ident);
957 pimpl->accepted_revision(ident, *this);
958 }
959 }
960 else
961 L(F("skipping existing revision %s\n") % ident);
962 ++(pimpl->count);
963 guard.commit();
964}
965
966void
967packet_db_writer::consume_revision_cert(revision<cert> const & t)
968{
969 transaction_guard guard(pimpl->app.db);
970 if (! pimpl->app.db.revision_cert_exists(t))
971 {
972 if (pimpl->revision_exists_in_db(revision_id(t.inner().ident)))
973 {
974 pimpl->app.db.put_revision_cert(t);
975 if(on_cert_written) on_cert_written(t.inner());
976 }
977 else
978 {
979 L(F("delaying revision cert on %s\n") % t.inner().ident);
980 shared_ptr<delayed_packet> dp;
981 dp = shared_ptr<delayed_packet>(new delayed_revision_cert_packet(t));
982 shared_ptr<prerequisite> fp;
983 pimpl->get_revision_prereq(revision_id(t.inner().ident), fp);
984 dp->add_prerequisite(fp);
985 fp->add_dependent(dp);
986 }
987 }
988 else
989 {
990 string s;
991 cert_signable_text(t.inner(), s);
992 L(F("skipping existing revision cert %s\n") % s);
993 }
994 ++(pimpl->count);
995 guard.commit();
996}
997
998
999void
1000packet_db_writer::consume_public_key(rsa_keypair_id const & ident,
1001 base64< rsa_pub_key > const & k)
1002{
1003 transaction_guard guard(pimpl->app.db);
1004 if (! pimpl->take_keys)
1005 {
1006 W(F("skipping prohibited public key %s\n") % ident);
1007 return;
1008 }
1009 if (! pimpl->app.db.public_key_exists(ident))
1010 {
1011 pimpl->app.db.put_key(ident, k);
1012 if(on_pubkey_written) on_pubkey_written(ident);
1013 }
1014 else
1015 {
1016 base64<rsa_pub_key> tmp;
1017 pimpl->app.db.get_key(ident, tmp);
1018 if (!keys_match(ident, tmp, ident, k))
1019 W(F("key '%s' is not equal to key '%s' in database\n") % ident % ident);
1020 L(F("skipping existing public key %s\n") % ident);
1021 }
1022 ++(pimpl->count);
1023 guard.commit();
1024}
1025
1026void
1027packet_db_writer::consume_key_pair(rsa_keypair_id const & ident,
1028 keypair const & kp)
1029{
1030 transaction_guard guard(pimpl->app.db);
1031 if (! pimpl->take_keys)
1032 {
1033 W(F("skipping prohibited key pair %s\n") % ident);
1034 return;
1035 }
1036 if (! pimpl->app.keys.key_pair_exists(ident))
1037 {
1038 pimpl->app.keys.put_key_pair(ident, kp);
1039 if(on_keypair_written) on_keypair_written(ident);
1040 }
1041 else
1042 L(F("skipping existing key pair %s\n") % ident);
1043 ++(pimpl->count);
1044 guard.commit();
1045}
1046
1047
1048// --- valved packet writer ---
1049
1050struct packet_db_valve::impl
1051{
1052 packet_db_writer writer;
1053 std::vector< boost::shared_ptr<delayed_packet> > packets;
1054 bool valve_is_open;
1055 impl(app_state & app, bool take_keys)
1056 : writer(app, take_keys),
1057 valve_is_open(false)
1058 {}
1059 void do_packet(boost::shared_ptr<delayed_packet> packet)
1060 {
1061 if (valve_is_open)
1062 packet->apply_delayed_packet(writer);
1063 else
1064 packets.push_back(packet);
1065 }
1066};
1067
1068packet_db_valve::packet_db_valve(app_state & app, bool take_keys)
1069 : pimpl(new impl(app, take_keys))
1070{}
1071
1072packet_db_valve::~packet_db_valve()
1073{}
1074
1075void
1076packet_db_valve::open_valve()
1077{
1078 L(F("packet valve opened\n"));
1079 pimpl->valve_is_open = true;
1080 int written = 0;
1081 for (std::vector< boost::shared_ptr<delayed_packet> >::reverse_iterator
1082 i = pimpl->packets.rbegin();
1083 i != pimpl->packets.rend();
1084 ++i)
1085 {
1086 pimpl->do_packet(*i);
1087 ++written;
1088 }
1089 pimpl->packets.clear();
1090 L(F("wrote %i queued packets\n") % written);
1091}
1092
// wrap a freshly constructed delayed packet `x` in a shared_ptr and pass it
// to pimpl->do_packet(). the expansion already ends in a semicolon.
#define DOIT(x) pimpl->do_packet(boost::shared_ptr<delayed_packet>(new x));
1094
1095void
1096packet_db_valve::set_on_revision_written(boost::function1<void,
1097 revision_id> const & x)
1098{
1099 on_revision_written=x;
1100 pimpl->writer.set_on_revision_written(x);
1101}
1102
1103void
1104packet_db_valve::set_on_cert_written(boost::function1<void,
1105 cert const &> const & x)
1106{
1107 on_cert_written=x;
1108 pimpl->writer.set_on_cert_written(x);
1109}
1110
1111void
1112packet_db_valve::set_on_pubkey_written(boost::function1<void, rsa_keypair_id>
1113 const & x)
1114{
1115 on_pubkey_written=x;
1116 pimpl->writer.set_on_pubkey_written(x);
1117}
1118
1119void
1120packet_db_valve::set_on_keypair_written(boost::function1<void, rsa_keypair_id>
1121 const & x)
1122{
1123 on_keypair_written=x;
1124 pimpl->writer.set_on_keypair_written(x);
1125}
1126
1127void
1128packet_db_valve::consume_file_data(file_id const & ident,
1129 file_data const & dat)
1130{
1131 DOIT(delayed_file_data_packet(ident, dat));
1132}
1133
1134void
1135packet_db_valve::consume_file_delta(file_id const & id_old,
1136 file_id const & id_new,
1137 file_delta const & del)
1138{
1139 DOIT(delayed_file_delta_packet(id_old, id_new, del, true));
1140}
1141
1142void
1143packet_db_valve::consume_file_delta(file_id const & id_old,
1144 file_id const & id_new,
1145 file_delta const & del,
1146 bool write_full)
1147{
1148 DOIT(delayed_file_delta_packet(id_old, id_new, del, true, write_full));
1149}
1150
1151void
1152packet_db_valve::consume_file_reverse_delta(file_id const & id_new,
1153 file_id const & id_old,
1154 file_delta const & del)
1155{
1156 DOIT(delayed_file_delta_packet(id_old, id_new, del, false));
1157}
1158
1159void
1160packet_db_valve::consume_manifest_data(manifest_id const & ident,
1161 manifest_data const & dat)
1162{
1163 DOIT(delayed_manifest_data_packet(ident, dat));
1164}
1165
1166void
1167packet_db_valve::consume_manifest_delta(manifest_id const & id_old,
1168 manifest_id const & id_new,
1169 manifest_delta const & del)
1170{
1171 DOIT(delayed_manifest_delta_packet(id_old, id_new, del, true));
1172}
1173
1174void
1175packet_db_valve::consume_manifest_delta(manifest_id const & id_old,
1176 manifest_id const & id_new,
1177 manifest_delta const & del,
1178 bool write_full)
1179{
1180 DOIT(delayed_manifest_delta_packet(id_old, id_new, del, true, write_full));
1181}
1182
1183void
1184packet_db_valve::consume_manifest_reverse_delta(manifest_id const & id_new,
1185 manifest_id const & id_old,
1186 manifest_delta const & del)
1187{
1188 DOIT(delayed_manifest_delta_packet(id_old, id_new, del, false));
1189}
1190
1191void
1192packet_db_valve::consume_revision_data(revision_id const & ident,
1193 revision_data const & dat)
1194{
1195 DOIT(delayed_revision_data_packet(ident, dat));
1196}
1197
1198void
1199packet_db_valve::consume_revision_cert(revision<cert> const & t)
1200{
1201 DOIT(delayed_revision_cert_packet(t));
1202}
1203
1204void
1205packet_db_valve::consume_public_key(rsa_keypair_id const & ident,
1206 base64< rsa_pub_key > const & k)
1207{
1208 DOIT(delayed_public_key_packet(ident, k));
1209}
1210
1211void
1212packet_db_valve::consume_key_pair(rsa_keypair_id const & ident,
1213 keypair const & kp)
1214{
1215 DOIT(delayed_keypair_packet(ident, kp));
1216}
1217
1218#undef DOIT
1219
1220// --- packet writer ---
1221
1222packet_writer::packet_writer(ostream & o) : ost(o) {}
1223
1224void
1225packet_writer::consume_file_data(file_id const & ident,
1226 file_data const & dat)
1227{
1228 base64<gzip<data> > packed;
1229 pack(dat.inner(), packed);
1230 ost << "[fdata " << ident.inner()() << "]" << endl
1231 << trim_ws(packed()) << endl
1232 << "[end]" << endl;
1233}
1234
1235void
1236packet_writer::consume_file_delta(file_id const & old_id,
1237 file_id const & new_id,
1238 file_delta const & del)
1239{
1240 base64<gzip<delta> > packed;
1241 pack(del.inner(), packed);
1242 ost << "[fdelta " << old_id.inner()() << endl
1243 << " " << new_id.inner()() << "]" << endl
1244 << trim_ws(packed()) << endl
1245 << "[end]" << endl;
1246}
1247
1248void
1249packet_writer::consume_file_reverse_delta(file_id const & new_id,
1250 file_id const & old_id,
1251 file_delta const & del)
1252{
1253 base64<gzip<delta> > packed;
1254 pack(del.inner(), packed);
1255 ost << "[frdelta " << new_id.inner()() << endl
1256 << " " << old_id.inner()() << "]" << endl
1257 << trim_ws(packed()) << endl
1258 << "[end]" << endl;
1259}
1260
1261void
1262packet_writer::consume_manifest_data(manifest_id const & ident,
1263 manifest_data const & dat)
1264{
1265 base64<gzip<data> > packed;
1266 pack(dat.inner(), packed);
1267 ost << "[mdata " << ident.inner()() << "]" << endl
1268 << trim_ws(packed()) << endl
1269 << "[end]" << endl;
1270}
1271
1272void
1273packet_writer::consume_revision_data(revision_id const & ident,
1274 revision_data const & dat)
1275{
1276 base64<gzip<data> > packed;
1277 pack(dat.inner(), packed);
1278 ost << "[rdata " << ident.inner()() << "]" << endl
1279 << trim_ws(packed()) << endl
1280 << "[end]" << endl;
1281}
1282
1283void
1284packet_writer::consume_manifest_delta(manifest_id const & old_id,
1285 manifest_id const & new_id,
1286 manifest_delta const & del)
1287{
1288 base64<gzip<delta> > packed;
1289 pack(del.inner(), packed);
1290 ost << "[mdelta " << old_id.inner()() << endl
1291 << " " << new_id.inner()() << "]" << endl
1292 << trim_ws(packed()) << endl
1293 << "[end]" << endl;
1294}
1295
1296void
1297packet_writer::consume_manifest_reverse_delta(manifest_id const & new_id,
1298 manifest_id const & old_id,
1299 manifest_delta const & del)
1300{
1301 base64<gzip<delta> > packed;
1302 pack(del.inner(), packed);
1303 ost << "[mrdelta " << new_id.inner()() << endl
1304 << " " << old_id.inner()() << "]" << endl
1305 << trim_ws(packed()) << endl
1306 << "[end]" << endl;
1307}
1308
1309void
1310packet_writer::consume_revision_cert(revision<cert> const & t)
1311{
1312 ost << "[rcert " << t.inner().ident() << endl
1313 << " " << t.inner().name() << endl
1314 << " " << t.inner().key() << endl
1315 << " " << trim_ws(t.inner().value()) << "]" << endl
1316 << trim_ws(t.inner().sig()) << endl
1317 << "[end]" << endl;
1318}
1319
1320void
1321packet_writer::consume_public_key(rsa_keypair_id const & ident,
1322 base64< rsa_pub_key > const & k)
1323{
1324 ost << "[pubkey " << ident() << "]" << endl
1325 << trim_ws(k()) << endl
1326 << "[end]" << endl;
1327}
1328
1329void
1330packet_writer::consume_key_pair(rsa_keypair_id const & ident,
1331 keypair const & kp)
1332{
1333 ost << "[keypair " << ident() << "]" << endl
1334 << trim_ws(kp.pub()) <<"#\n" <<trim_ws(kp.priv()) << endl
1335 << "[end]" << endl;
1336}
1337
1338
1339// -- remainder just deals with the regexes for reading packets off streams
1340
1341struct
1342feed_packet_consumer
1343{
1344 app_state & app;
1345 size_t & count;
1346 packet_consumer & cons;
1347 std::string ident;
1348 std::string key;
1349 std::string certname;
1350 std::string base;
1351 std::string sp;
1352 feed_packet_consumer(size_t & count, packet_consumer & c, app_state & app_)
1353 : app(app_), count(count), cons(c),
1354 ident(constants::regex_legal_id_bytes),
1355 key(constants::regex_legal_key_name_bytes),
1356 certname(constants::regex_legal_cert_name_bytes),
1357 base(constants::regex_legal_packet_bytes),
1358 sp("[[:space:]]+")
1359 {}
1360 void require(bool x) const
1361 {
1362 E(x, F("malformed packet"));
1363 }
1364 bool operator()(match_results<std::string::const_iterator> const & res) const
1365 {
1366 if (res.size() != 4)
1367 throw oops("matched impossible packet with "
1368 + lexical_cast<string>(res.size()) + " matching parts: " +
1369 string(res[0].first, res[0].second));
1370 I(res[1].matched);
1371 I(res[2].matched);
1372 I(res[3].matched);
1373 std::string type(res[1].first, res[1].second);
1374 std::string args(res[2].first, res[2].second);
1375 std::string body(res[3].first, res[3].second);
1376 if (regex_match(type, regex("[mfr]data")))
1377 {
1378 L(F("read data packet"));
1379 require(regex_match(args, regex(ident)));
1380 require(regex_match(body, regex(base)));
1381 base64<gzip<data> > body_packed(trim_ws(body));
1382 data contents;
1383 unpack(body_packed, contents);
1384 if (type == "rdata")
1385 cons.consume_revision_data(revision_id(hexenc<id>(args)),
1386 revision_data(contents));
1387 else if (type == "mdata")
1388 cons.consume_manifest_data(manifest_id(hexenc<id>(args)),
1389 manifest_data(contents));
1390 else if (type == "fdata")
1391 cons.consume_file_data(file_id(hexenc<id>(args)),
1392 file_data(contents));
1393 else
1394 throw oops("matched impossible data packet with head '" + type + "'");
1395 }
1396 else if (regex_match(type, regex("[mf]r?delta")))
1397 {
1398 L(F("read delta packet"));
1399 match_results<std::string::const_iterator> matches;
1400 require(regex_match(args, matches, regex(ident + sp + ident)));
1401 string src_id(matches[1].first, matches[1].second);
1402 string dst_id(matches[2].first, matches[2].second);
1403 require(regex_match(body, regex(base)));
1404 base64<gzip<delta> > body_packed(trim_ws(body));
1405 delta contents;
1406 unpack(body_packed, contents);
1407 if (type == "mdelta")
1408 cons.consume_manifest_delta(manifest_id(hexenc<id>(src_id)),
1409 manifest_id(hexenc<id>(dst_id)),
1410 manifest_delta(contents));
1411 else if (type == "fdelta")
1412 cons.consume_file_delta(file_id(hexenc<id>(src_id)),
1413 file_id(hexenc<id>(dst_id)),
1414 file_delta(contents));
1415 else if (type == "mrdelta")
1416 cons.consume_manifest_reverse_delta(manifest_id(hexenc<id>(src_id)),
1417 manifest_id(hexenc<id>(dst_id)),
1418 manifest_delta(contents));
1419 else if (type == "frdelta")
1420 cons.consume_file_reverse_delta(file_id(hexenc<id>(src_id)),
1421 file_id(hexenc<id>(dst_id)),
1422 file_delta(contents));
1423 else
1424 throw oops("matched impossible delta packet with head '"
1425 + type + "'");
1426 }
1427 else if (type == "rcert")
1428 {
1429 L(F("read cert packet"));
1430 match_results<std::string::const_iterator> matches;
1431 require(regex_match(args, matches, regex(ident + sp + certname
1432 + sp + key + sp + base)));
1433 string certid(matches[1].first, matches[1].second);
1434 string name(matches[2].first, matches[2].second);
1435 string keyid(matches[3].first, matches[3].second);
1436 string val(matches[4].first, matches[4].second);
1437 string contents(trim_ws(body));
1438
1439 // canonicalize the base64 encodings to permit searches
1440 cert t = cert(hexenc<id>(certid),
1441 cert_name(name),
1442 base64<cert_value>(canonical_base64(val)),
1443 rsa_keypair_id(keyid),
1444 base64<rsa_sha1_signature>(canonical_base64(contents)));
1445 cons.consume_revision_cert(revision<cert>(t));
1446 }
1447 else if (type == "pubkey")
1448 {
1449 L(F("read pubkey data packet"));
1450 require(regex_match(args, regex(key)));
1451 require(regex_match(body, regex(base)));
1452 string contents(trim_ws(body));
1453 cons.consume_public_key(rsa_keypair_id(args),
1454 base64<rsa_pub_key>(contents));
1455 }
1456 else if (type == "keypair")
1457 {
1458 L(F("read keypair data packet"));
1459 require(regex_match(args, regex(key)));
1460 match_results<std::string::const_iterator> matches;
1461 require(regex_match(body, matches, regex(base + "#" + base)));
1462 string pub_dat(trim_ws(string(matches[1].first, matches[1].second)));
1463 string priv_dat(trim_ws(string(matches[2].first, matches[2].second)));
1464 cons.consume_key_pair(rsa_keypair_id(args), keypair(pub_dat, priv_dat));
1465 }
1466 else if (type == "privkey")
1467 {
1468 L(F("read pubkey data packet"));
1469 require(regex_match(args, regex(key)));
1470 require(regex_match(body, regex(base)));
1471 string contents(trim_ws(body));
1472 keypair kp;
1473 migrate_private_key(app,
1474 rsa_keypair_id(args),
1475 base64<arc4<rsa_priv_key> >(contents),
1476 kp);
1477 cons.consume_key_pair(rsa_keypair_id(args), kp);
1478 }
1479 else
1480 {
1481 W(F("unknown packet type: '%s'") % type);
1482 return true;
1483 }
1484 ++count;
1485 return true;
1486 }
1487};
1488
1489static size_t
1490extract_packets(string const & s, packet_consumer & cons, app_state & app)
1491{
1492 string const head("\\[([a-z]+)[[:space:]]+([^\\[\\]]+)\\]");
1493 string const body("([^\\[\\]]+)");
1494 string const tail("\\[end\\]");
1495 string const whole = head + body + tail;
1496 regex expr(whole);
1497 size_t count = 0;
1498 regex_grep(feed_packet_consumer(count, cons, app), s, expr, match_default);
1499 return count;
1500}
1501
1502
1503size_t
1504read_packets(istream & in, packet_consumer & cons, app_state & app)
1505{
1506 string accum, tmp;
1507 size_t count = 0;
1508 size_t const bufsz = 0xff;
1509 char buf[bufsz];
1510 string const end("[end]");
1511 while(in)
1512 {
1513 in.read(buf, bufsz);
1514 accum.append(buf, in.gcount());
1515 string::size_type endpos = string::npos;
1516 endpos = accum.rfind(end);
1517 if (endpos != string::npos)
1518 {
1519 endpos += end.size();
1520 string tmp = accum.substr(0, endpos);
1521 count += extract_packets(tmp, cons, app);
1522 if (endpos < accum.size() - 1)
1523 accum = accum.substr(endpos+1);
1524 else
1525 accum.clear();
1526 }
1527 }
1528 return count;
1529}
1530
1531
1532#ifdef BUILD_UNIT_TESTS
1533#include "unit_tests.hh"
1534#include "transforms.hh"
1535#include "manifest.hh"
1536
1537static void
1538packet_roundabout_test()
1539{
1540 string tmp;
1541
1542 {
1543 ostringstream oss;
1544 packet_writer pw(oss);
1545
1546 // an fdata packet
1547 file_data fdata(data("this is some file data"));
1548 file_id fid;
1549 calculate_ident(fdata, fid);
1550 pw.consume_file_data(fid, fdata);
1551
1552 // an fdelta packet
1553 file_data fdata2(data("this is some file data which is not the same as the first one"));
1554 file_id fid2;
1555 calculate_ident(fdata2, fid2);
1556 delta del;
1557 diff(fdata.inner(), fdata2.inner(), del);
1558 pw.consume_file_delta(fid, fid2, file_delta(del));
1559
1560 // a cert packet
1561 base64<cert_value> val;
1562 encode_base64(cert_value("peaches"), val);
1563 base64<rsa_sha1_signature> sig;
1564 encode_base64(rsa_sha1_signature("blah blah there is no way this is a valid signature"), sig);
1565 // should be a type violation to use a file id here instead of a revision
1566 // id, but no-one checks...
1567 cert c(fid.inner(), cert_name("smell"), val,
1568 rsa_keypair_id("fun@moonman.com"), sig);
1569 pw.consume_revision_cert(revision<cert>(c));
1570
1571 // a manifest data packet
1572 manifest_map mm;
1573 manifest_data mdata;
1574 manifest_id mid;
1575 mm.insert(make_pair(file_path_internal("foo/bar.txt"),
1576 file_id(hexenc<id>("cfb81b30ab3133a31b52eb50bd1c86df67eddec4"))));
1577 write_manifest_map(mm, mdata);
1578 calculate_ident(mdata, mid);
1579 pw.consume_manifest_data(mid, mdata);
1580
1581 // a manifest delta packet
1582 manifest_map mm2;
1583 manifest_data mdata2;
1584 manifest_id mid2;
1585 manifest_delta mdelta;
1586 mm2.insert(make_pair(file_path_internal("foo/bar.txt"),
1587 file_id(hexenc<id>("5b20eb5e5bdd9cd674337fc95498f468d80ef7bc"))));
1588 mm2.insert(make_pair(file_path_internal("bunk.txt"),
1589 file_id(hexenc<id>("54f373ed07b4c5a88eaa93370e1bbac02dc432a8"))));
1590 write_manifest_map(mm2, mdata2);
1591 calculate_ident(mdata2, mid2);
1592 delta del2;
1593 diff(mdata.inner(), mdata2.inner(), del2);
1594 pw.consume_manifest_delta(mid, mid2, manifest_delta(del));
1595
1596 keypair kp;
1597 // a public key packet
1598 encode_base64(rsa_pub_key("this is not a real rsa key"), kp.pub);
1599 pw.consume_public_key(rsa_keypair_id("test@lala.com"), kp.pub);
1600
1601 // a private key packet
1602 encode_base64(rsa_priv_key("this is not a real rsa key either!"), kp.priv);
1603
1604 pw.consume_key_pair(rsa_keypair_id("test@lala.com"), kp);
1605
1606 tmp = oss.str();
1607 }
1608
1609 // read_packets needs this to convert privkeys to keypairs.
1610 // This doesn't test privkey packets (theres a tests/ test for that),
1611 // so we don't actually use the app_state for anything. So a default one
1612 // is ok.
1613 app_state aaa;
1614 for (int i = 0; i < 10; ++i)
1615 {
1616 // now spin around sending and receiving this a few times
1617 ostringstream oss;
1618 packet_writer pw(oss);
1619 istringstream iss(tmp);
1620 read_packets(iss, pw, aaa);
1621 BOOST_CHECK(oss.str() == tmp);
1622 tmp = oss.str();
1623 }
1624}
1625
1626void
1627add_packet_tests(test_suite * suite)
1628{
1629 I(suite);
1630 suite->add(BOOST_TEST_CASE(&packet_roundabout_test));
1631}
1632
1633#endif // BUILD_UNIT_TESTS

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status