puller.go 16.2 KB
Newer Older
Jakob Borg's avatar
Jakob Borg committed
1 2 3
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
Jakob Borg's avatar
Jakob Borg committed
4

5
package model
6 7 8 9 10 11

import (
	"bytes"
	"errors"
	"os"
	"path/filepath"
12
	"runtime"
13
	"time"
14

15
	"github.com/calmh/syncthing/config"
16
	"github.com/calmh/syncthing/osutil"
17 18
	"github.com/calmh/syncthing/protocol"
	"github.com/calmh/syncthing/scanner"
19
	"github.com/calmh/syncthing/versioner"
20 21 22
)

type requestResult struct {
23
	node     protocol.NodeID
Jakob Borg's avatar
Jakob Borg committed
24
	file     protocol.FileInfo
25 26 27 28 29 30 31 32 33
	filepath string // full filepath name
	offset   int64
	data     []byte
	err      error
}

type openFile struct {
	filepath     string // full filepath name
	temp         string // temporary filename
Jakob Borg's avatar
Jakob Borg committed
34
	availability []protocol.NodeID
35 36 37 38 39 40
	file         *os.File
	err          error // error when opening or writing to file, all following operations are cancelled
	outstanding  int   // number of requests we still have outstanding
	done         bool  // we have sent all requests for this file
}

41
type activityMap map[protocol.NodeID]int
42

Jakob Borg's avatar
Jakob Borg committed
43
func (m activityMap) leastBusyNode(availability []protocol.NodeID) protocol.NodeID {
44
	var low int = 2<<30 - 1
45
	var selected protocol.NodeID
Jakob Borg's avatar
Jakob Borg committed
46
	for _, node := range availability {
47
		usage := m[node]
Jakob Borg's avatar
Jakob Borg committed
48 49 50
		if usage < low {
			low = usage
			selected = node
51 52 53 54 55 56
		}
	}
	m[selected]++
	return selected
}

57
func (m activityMap) decrease(node protocol.NodeID) {
58 59 60 61 62 63
	m[node]--
}

// errNoNode is recorded on an open file when no node could be selected to
// request a block from.
var errNoNode = errors.New("no available source node")

type puller struct {
64
	cfg               *config.Configuration
Jakob Borg's avatar
Jakob Borg committed
65
	repoCfg           config.RepositoryConfiguration
66 67 68 69 70 71 72
	bq                *blockQueue
	model             *Model
	oustandingPerNode activityMap
	openFiles         map[string]openFile
	requestSlots      chan bool
	blocks            chan bqBlock
	requestResults    chan requestResult
73
	versioner         versioner.Versioner
74 75
}

Jakob Borg's avatar
Jakob Borg committed
76
func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
77
	p := &puller{
Jakob Borg's avatar
Jakob Borg committed
78
		repoCfg:           repoCfg,
79
		cfg:               cfg,
80 81 82 83 84 85 86 87 88
		bq:                newBlockQueue(),
		model:             model,
		oustandingPerNode: make(activityMap),
		openFiles:         make(map[string]openFile),
		requestSlots:      make(chan bool, slots),
		blocks:            make(chan bqBlock),
		requestResults:    make(chan requestResult),
	}

89 90 91 92 93 94 95 96
	if len(repoCfg.Versioning.Type) > 0 {
		factory, ok := versioner.Factories[repoCfg.Versioning.Type]
		if !ok {
			l.Fatalf("Requested versioning type %q that does not exist", repoCfg.Versioning.Type)
		}
		p.versioner = factory(repoCfg.Versioning.Params)
	}

97 98 99 100 101
	if slots > 0 {
		// Read/write
		for i := 0; i < slots; i++ {
			p.requestSlots <- true
		}
102
		if debug {
Jakob Borg's avatar
Jakob Borg committed
103
			l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
104 105 106 107
		}
		go p.run()
	} else {
		// Read only
108
		if debug {
Jakob Borg's avatar
Jakob Borg committed
109
			l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory)
110 111 112 113 114 115 116 117 118 119 120 121
		}
		go p.runRO()
	}
	return p
}

func (p *puller) run() {
	go func() {
		// fill blocks queue when there are free slots
		for {
			<-p.requestSlots
			b := p.bq.get()
122
			if debug {
Jakob Borg's avatar
Jakob Borg committed
123
				l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
124 125 126 127 128
			}
			p.blocks <- b
		}
	}()

129
	walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
130
	timeout := time.Tick(5 * time.Second)
131
	changed := true
132
	var prevVer uint64
133 134 135 136 137 138 139

	for {
		// Run the pulling loop as long as there are blocks to fetch
	pull:
		for {
			select {
			case res := <-p.requestResults:
Jakob Borg's avatar
Jakob Borg committed
140
				p.model.setState(p.repoCfg.ID, RepoSyncing)
141
				changed = true
142 143 144 145
				p.requestSlots <- true
				p.handleRequestResult(res)

			case b := <-p.blocks:
Jakob Borg's avatar
Jakob Borg committed
146
				p.model.setState(p.repoCfg.ID, RepoSyncing)
147
				changed = true
Jakob Borg's avatar
Jakob Borg committed
148 149 150 151
				if p.handleBlock(b) {
					// Block was fully handled, free up the slot
					p.requestSlots <- true
				}
152 153 154 155 156 157

			case <-timeout:
				if len(p.openFiles) == 0 && p.bq.empty() {
					// Nothing more to do for the moment
					break pull
				}
158
				if debug {
Jakob Borg's avatar
Jakob Borg committed
159
					l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles))
160 161
					i := 5
					for _, f := range p.openFiles {
162
						l.Debugf("  %v", f)
163 164 165 166 167 168 169 170 171
						i--
						if i == 0 {
							break
						}
					}
				}
			}
		}

172
		if changed {
Jakob Borg's avatar
Jakob Borg committed
173
			p.model.setState(p.repoCfg.ID, RepoCleaning)
174 175 176 177
			p.fixupDirectories()
			changed = false
		}

Jakob Borg's avatar
Jakob Borg committed
178
		p.model.setState(p.repoCfg.ID, RepoIdle)
179

180 181 182
		// Do a rescan if it's time for it
		select {
		case <-walkTicker:
183
			if debug {
Jakob Borg's avatar
Jakob Borg committed
184
				l.Debugf("%q: time for rescan", p.repoCfg.ID)
185
			}
Jakob Borg's avatar
Jakob Borg committed
186
			err := p.model.ScanRepo(p.repoCfg.ID)
187
			if err != nil {
Jakob Borg's avatar
Jakob Borg committed
188
				invalidateRepo(p.cfg, p.repoCfg.ID, err)
189 190
				return
			}
191 192 193 194

		default:
		}

195 196 197 198 199
		if v := p.model.Version(p.repoCfg.ID); v > prevVer {
			// Queue more blocks to fetch, if any
			p.queueNeededBlocks()
			prevVer = v
		}
200 201 202 203
	}
}

func (p *puller) runRO() {
204
	walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
205 206

	for _ = range walkTicker {
207
		if debug {
Jakob Borg's avatar
Jakob Borg committed
208
			l.Debugf("%q: time for rescan", p.repoCfg.ID)
209
		}
Jakob Borg's avatar
Jakob Borg committed
210
		err := p.model.ScanRepo(p.repoCfg.ID)
211
		if err != nil {
Jakob Borg's avatar
Jakob Borg committed
212
			invalidateRepo(p.cfg, p.repoCfg.ID, err)
213 214
			return
		}
215 216 217
	}
}

218 219
func (p *puller) fixupDirectories() {
	var deleteDirs []string
220 221 222
	var changed = 0

	var walkFn = func(path string, info os.FileInfo, err error) error {
223 224 225 226
		if err != nil {
			return err
		}

227 228 229 230
		if !info.IsDir() {
			return nil
		}

Jakob Borg's avatar
Jakob Borg committed
231
		rn, err := filepath.Rel(p.repoCfg.Directory, path)
232 233 234 235 236 237 238 239
		if err != nil {
			return nil
		}

		if rn == "." {
			return nil
		}

240
		if filepath.Base(rn) == ".stversions" {
Jakob Borg's avatar
Jakob Borg committed
241
			return filepath.SkipDir
242 243
		}

Jakob Borg's avatar
Jakob Borg committed
244
		cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
245 246
		if cur.Name != rn {
			// No matching dir in current list; weird
247 248 249
			if debug {
				l.Debugf("missing dir: %s; %v", rn, cur)
			}
250 251 252
			return nil
		}

253
		if protocol.IsDeleted(cur.Flags) {
254
			if debug {
255
				l.Debugf("queue delete dir: %v", cur)
256 257 258 259 260 261 262 263 264 265
			}

			// We queue the directories to delete since we walk the
			// tree in depth first order and need to remove the
			// directories in the opposite order.

			deleteDirs = append(deleteDirs, path)
			return nil
		}

266
		if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(cur.Flags) && !scanner.PermsEqual(cur.Flags, uint32(info.Mode())) {
267 268
			err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
			if err != nil {
269
				l.Warnf("Restoring folder flags: %q: %v", path, err)
270 271 272 273 274
			} else {
				changed++
				if debug {
					l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
				}
275 276 277 278 279
			}
		}

		if cur.Modified != info.ModTime().Unix() {
			t := time.Unix(cur.Modified, 0)
280 281
			err := os.Chtimes(path, t, t)
			if err != nil {
282 283 284 285
				if runtime.GOOS != "windows" {
					// https://code.google.com/p/go/issues/detail?id=8090
					l.Warnf("Restoring folder modtime: %q: %v", path, err)
				}
286 287 288 289 290
			} else {
				changed++
				if debug {
					l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
				}
291 292 293 294
			}
		}

		return nil
295 296 297 298 299
	}

	for {
		deleteDirs = nil
		changed = 0
Jakob Borg's avatar
Jakob Borg committed
300
		filepath.Walk(p.repoCfg.Directory, walkFn)
301 302 303 304 305 306 307 308 309

		var deleted = 0
		// Delete any queued directories
		for i := len(deleteDirs) - 1; i >= 0; i-- {
			dir := deleteDirs[i]
			if debug {
				l.Debugln("delete dir:", dir)
			}
			err := os.Remove(dir)
310
			if err == nil {
311
				deleted++
312 313
			} else {
				l.Warnln("Delete dir:", err)
314 315
			}
		}
316

317
		if debug {
318
			l.Debugf("changed %d, deleted %d dirs", changed, deleted)
319
		}
320 321 322

		if changed+deleted == 0 {
			return
323 324 325 326
		}
	}
}

327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
func (p *puller) handleRequestResult(res requestResult) {
	p.oustandingPerNode.decrease(res.node)
	f := res.file

	of, ok := p.openFiles[f.Name]
	if !ok || of.err != nil {
		// no entry in openFiles means there was an error and we've cancelled the operation
		return
	}

	_, of.err = of.file.WriteAt(res.data, res.offset)

	of.outstanding--
	p.openFiles[f.Name] = of

342
	if debug {
Jakob Borg's avatar
Jakob Borg committed
343
		l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done)
344 345 346
	}

	if of.done && of.outstanding == 0 {
Jakob Borg's avatar
Jakob Borg committed
347
		p.closeFile(f)
348 349 350
	}
}

Jakob Borg's avatar
Jakob Borg committed
351 352 353 354
// handleBlock fulfills the block request by copying, ignoring or fetching
// from the network. Returns true if the block was fully handled
// synchronously, i.e. if the slot can be reused.
func (p *puller) handleBlock(b bqBlock) bool {
355 356
	f := b.file

357 358
	// For directories, making sure they exist is enough.
	// Deleted directories we mark as handled and delete later.
359 360
	if protocol.IsDirectory(f.Flags) {
		if !protocol.IsDeleted(f.Flags) {
Jakob Borg's avatar
Jakob Borg committed
361
			path := filepath.Join(p.repoCfg.Directory, f.Name)
362 363 364 365 366 367 368 369 370 371 372 373
			_, err := os.Stat(path)
			if err != nil && os.IsNotExist(err) {
				if debug {
					l.Debugf("create dir: %v", f)
				}
				err = os.MkdirAll(path, 0777)
				if err != nil {
					l.Warnf("Create folder: %q: %v", path, err)
				}
			}
		} else if debug {
			l.Debugf("ignore delete dir: %v", f)
374
		}
Jakob Borg's avatar
Jakob Borg committed
375
		p.model.updateLocal(p.repoCfg.ID, f)
Jakob Borg's avatar
Jakob Borg committed
376
		return true
377 378
	}

379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
	if len(b.copy) > 0 && len(b.copy) == len(b.file.Blocks) && b.last {
		// We are supposed to copy the entire file, and then fetch nothing.
		// We don't actually need to make the copy.
		if debug {
			l.Debugln("taking shortcut:", f)
		}
		fp := filepath.Join(p.repoCfg.Directory, f.Name)
		t := time.Unix(f.Modified, 0)
		err := os.Chtimes(fp, t, t)
		if debug && err != nil {
			l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
		}
		if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
			err = os.Chmod(fp, os.FileMode(f.Flags&0777))
			if debug && err != nil {
				l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
			}
		}

		p.model.updateLocal(p.repoCfg.ID, f)
		return true
	}

402 403 404 405
	of, ok := p.openFiles[f.Name]
	of.done = b.last

	if !ok {
406
		if debug {
Jakob Borg's avatar
Jakob Borg committed
407
			l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
408 409
		}

Jakob Borg's avatar
Jakob Borg committed
410
		of.availability = p.model.repoFiles[p.repoCfg.ID].Availability(f.Name)
Jakob Borg's avatar
Jakob Borg committed
411 412
		of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
		of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))
413 414 415 416 417 418 419

		dirName := filepath.Dir(of.filepath)
		_, err := os.Stat(dirName)
		if err != nil {
			err = os.MkdirAll(dirName, 0777)
		}
		if err != nil {
Jakob Borg's avatar
Jakob Borg committed
420
			l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
421 422 423 424
		}

		of.file, of.err = os.Create(of.temp)
		if of.err != nil {
425
			if debug {
Jakob Borg's avatar
Jakob Borg committed
426
				l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
427 428 429 430
			}
			if !b.last {
				p.openFiles[f.Name] = of
			}
Jakob Borg's avatar
Jakob Borg committed
431
			return true
432
		}
433
		osutil.HideFile(of.temp)
434 435 436 437
	}

	if of.err != nil {
		// We have already failed this file.
438
		if debug {
Jakob Borg's avatar
Jakob Borg committed
439
			l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err)
440 441 442 443 444
		}
		if b.last {
			delete(p.openFiles, f.Name)
		}

Jakob Borg's avatar
Jakob Borg committed
445
		return true
446 447 448 449 450 451 452
	}

	p.openFiles[f.Name] = of

	switch {
	case len(b.copy) > 0:
		p.handleCopyBlock(b)
Jakob Borg's avatar
Jakob Borg committed
453
		return true
454 455

	case b.block.Size > 0:
Jakob Borg's avatar
Jakob Borg committed
456
		return p.handleRequestBlock(b)
457 458 459

	default:
		p.handleEmptyBlock(b)
Jakob Borg's avatar
Jakob Borg committed
460
		return true
461 462 463 464 465 466 467 468
	}
}

func (p *puller) handleCopyBlock(b bqBlock) {
	// We have blocks to copy from the existing file
	f := b.file
	of := p.openFiles[f.Name]

469
	if debug {
Jakob Borg's avatar
Jakob Borg committed
470
		l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name)
471 472 473 474 475
	}

	var exfd *os.File
	exfd, of.err = os.Open(of.filepath)
	if of.err != nil {
476
		if debug {
Jakob Borg's avatar
Jakob Borg committed
477
			l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
478 479 480 481 482 483 484 485 486 487
		}
		of.file.Close()
		of.file = nil

		p.openFiles[f.Name] = of
		return
	}
	defer exfd.Close()

	for _, b := range b.copy {
Jakob Borg's avatar
Jakob Borg committed
488
		bs := make([]byte, b.Size)
489 490 491 492 493
		_, of.err = exfd.ReadAt(bs, b.Offset)
		if of.err == nil {
			_, of.err = of.file.WriteAt(bs, b.Offset)
		}
		if of.err != nil {
494
			if debug {
Jakob Borg's avatar
Jakob Borg committed
495
				l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
496 497 498 499 500 501 502 503 504 505 506
			}
			exfd.Close()
			of.file.Close()
			of.file = nil

			p.openFiles[f.Name] = of
			return
		}
	}
}

Jakob Borg's avatar
Jakob Borg committed
507 508 509 510
// handleRequestBlock tries to pull a block from the network. Returns true if
// the block could _not_ be fetched (i.e. it was fully handled, matching the
// return criteria of handleBlock)
func (p *puller) handleRequestBlock(b bqBlock) bool {
511
	f := b.file
Jakob Borg's avatar
Jakob Borg committed
512 513 514 515
	of, ok := p.openFiles[f.Name]
	if !ok {
		panic("bug: request for non-open file")
	}
516

Jakob Borg's avatar
Jakob Borg committed
517
	node := p.oustandingPerNode.leastBusyNode(of.availability)
518 519 520 521 522 523 524 525 526 527 528 529
	if len(node) == 0 {
		of.err = errNoNode
		if of.file != nil {
			of.file.Close()
			of.file = nil
			os.Remove(of.temp)
		}
		if b.last {
			delete(p.openFiles, f.Name)
		} else {
			p.openFiles[f.Name] = of
		}
Jakob Borg's avatar
Jakob Borg committed
530
		return true
531 532 533 534 535
	}

	of.outstanding++
	p.openFiles[f.Name] = of

536
	go func(node protocol.NodeID, b bqBlock) {
537
		if debug {
Jakob Borg's avatar
Jakob Borg committed
538
			l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
539 540
		}

Jakob Borg's avatar
Jakob Borg committed
541
		bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil)
542 543 544 545 546 547 548 549 550
		p.requestResults <- requestResult{
			node:     node,
			file:     f,
			filepath: of.filepath,
			offset:   b.block.Offset,
			data:     bs,
			err:      err,
		}
	}(node, b)
Jakob Borg's avatar
Jakob Borg committed
551 552

	return false
553 554 555 556 557 558 559 560 561 562 563 564
}

func (p *puller) handleEmptyBlock(b bqBlock) {
	f := b.file
	of := p.openFiles[f.Name]

	if b.last {
		if of.err == nil {
			of.file.Close()
		}
	}

565
	if protocol.IsDeleted(f.Flags) {
566
		if debug {
567
			l.Debugf("pull: delete %q", f.Name)
568 569
		}
		os.Remove(of.temp)
570
		os.Chmod(of.filepath, 0666)
571
		if p.versioner != nil {
572 573 574 575
			if debug {
				l.Debugln("pull: deleting with versioner")
			}
			if err := p.versioner.Archive(p.repoCfg.Directory, of.filepath); err == nil {
576
				p.model.updateLocal(p.repoCfg.ID, f)
577 578
			} else if debug {
				l.Debugln("pull: error:", err)
579 580
			}
		} else if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
Jakob Borg's avatar
Jakob Borg committed
581
			p.model.updateLocal(p.repoCfg.ID, f)
582
		}
583
	} else {
584
		if debug {
Jakob Borg's avatar
Jakob Borg committed
585
			l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name)
586 587
		}
		t := time.Unix(f.Modified, 0)
588 589 590 591
		if os.Chtimes(of.temp, t, t) != nil {
			delete(p.openFiles, f.Name)
			return
		}
Jakob Borg's avatar
Jakob Borg committed
592
		if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
593 594 595
			delete(p.openFiles, f.Name)
			return
		}
596 597
		osutil.ShowFile(of.temp)
		if osutil.Rename(of.temp, of.filepath) == nil {
Jakob Borg's avatar
Jakob Borg committed
598
			p.model.updateLocal(p.repoCfg.ID, f)
599
		}
600 601 602 603 604 605
	}
	delete(p.openFiles, f.Name)
}

func (p *puller) queueNeededBlocks() {
	queued := 0
Jakob Borg's avatar
Jakob Borg committed
606 607
	for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
		lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
608
		have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
609
		if debug {
610
			l.Debugf("need:\n  local: %v\n  global: %v\n  haveBlocks: %v\n  needBlocks: %v", lf, f, have, need)
611 612 613 614 615 616 617 618
		}
		queued++
		p.bq.put(bqAdd{
			file: f,
			have: have,
			need: need,
		})
	}
619
	if debug && queued > 0 {
Jakob Borg's avatar
Jakob Borg committed
620
		l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued)
621 622
	}
}
Jakob Borg's avatar
Jakob Borg committed
623

Jakob Borg's avatar
Jakob Borg committed
624
func (p *puller) closeFile(f protocol.FileInfo) {
625
	if debug {
Jakob Borg's avatar
Jakob Borg committed
626
		l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name)
Jakob Borg's avatar
Jakob Borg committed
627 628 629 630 631 632 633 634 635 636
	}

	of := p.openFiles[f.Name]
	of.file.Close()
	defer os.Remove(of.temp)

	delete(p.openFiles, f.Name)

	fd, err := os.Open(of.temp)
	if err != nil {
637
		if debug {
Jakob Borg's avatar
Jakob Borg committed
638
			l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
Jakob Borg's avatar
Jakob Borg committed
639 640 641
		}
		return
	}
642
	hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
Jakob Borg's avatar
Jakob Borg committed
643 644 645
	fd.Close()

	if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
646
		if debug {
Jakob Borg's avatar
Jakob Borg committed
647
			l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1)
Jakob Borg's avatar
Jakob Borg committed
648 649 650 651 652 653
		}
		return
	}

	for i := range hb {
		if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
Jakob Borg's avatar
Jakob Borg committed
654
			l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i)
Jakob Borg's avatar
Jakob Borg committed
655 656 657 658 659
			return
		}
	}

	t := time.Unix(f.Modified, 0)
Jakob Borg's avatar
Jakob Borg committed
660 661 662 663 664 665 666 667 668
	err = os.Chtimes(of.temp, t, t)
	if debug && err != nil {
		l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
	}
	if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
		err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
		if debug && err != nil {
			l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
		}
669
	}
670 671 672 673

	osutil.ShowFile(of.temp)

	if p.versioner != nil {
674
		err := p.versioner.Archive(p.repoCfg.Directory, of.filepath)
675 676 677 678 679 680 681 682
		if err != nil {
			if debug {
				l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
			}
			return
		}
	}

683
	if debug {
Jakob Borg's avatar
Jakob Borg committed
684
		l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath)
Jakob Borg's avatar
Jakob Borg committed
685
	}
686
	if err := osutil.Rename(of.temp, of.filepath); err == nil {
Jakob Borg's avatar
Jakob Borg committed
687
		p.model.updateLocal(p.repoCfg.ID, f)
Jakob Borg's avatar
Jakob Borg committed
688
	} else {
Jakob Borg's avatar
Jakob Borg committed
689
		l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
Jakob Borg's avatar
Jakob Borg committed
690 691
	}
}
692

693
func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
694 695 696 697 698 699 700 701
	for i := range cfg.Repositories {
		repo := &cfg.Repositories[i]
		if repo.ID == repoID {
			repo.Invalid = err.Error()
			return
		}
	}
}