etcd: Build Robust Server Apps in Go with Distributed Consensus

Highly-available key value store for shared configuration and service discovery.

etcd: A Comprehensive Guide for Go Developers

Introduction

etcd is a highly available, distributed key-value store designed for shared configuration and service discovery. It was originally developed at CoreOS and is now a project of the Cloud Native Computing Foundation (CNCF). etcd has become a cornerstone of the Kubernetes ecosystem and is widely adopted for managing critical distributed system data; its GitHub repository has more than 51,480 stars.

Developers should care about etcd because it solves fundamental challenges in distributed systems: keeping configuration data consistent, reliable, and accessible across multiple nodes without a single point of failure. Instead of asking you to implement a consensus algorithm yourself, etcd uses the Raft consensus protocol to provide strong consistency and fault tolerance. This makes it well suited to global configuration management, service discovery, leader election, and distributed locks. Real-world use cases include the Kubernetes cluster state store, the etcd discovery backend of classic (standalone) Docker Swarm, and cloud-native applications that need coordinated configuration updates across microservices.

Key Features

etcd offers several powerful features that make it indispensable for distributed systems:

  1. Distributed Key-Value Store: Stores arbitrary data (bytes) under byte-string keys. The v3 keyspace is flat, but keys are conventionally named like paths (for example /services/web, /services/db) so that related keys share a prefix and can be queried together.
  2. Raft Consensus Algorithm: Implements the Raft protocol to achieve consensus among a cluster of nodes. A cluster of N members stays available with up to (N-1)/2 member failures (rounded down), and reads and writes are linearizable by default.
  3. Watch Capabilities: Allows clients to subscribe to changes on specific keys or key prefixes. This enables reactive systems (like service discovery) without constant polling.
  4. Lease Management: Provides leases (time-based tokens) that can be attached to keys. Leases automatically expire if not refreshed, enabling automatic cleanup of stale data (e.g., unused service registrations).
  5. Atomic Compare-and-Swap (CAS) Transactions: Supports atomic compare-then-write operations across multiple keys. This is crucial for distributed coordination tasks like safe distributed locking or atomic configuration updates.
  6. Compact and Delete: Allows clients to compact the revision history (discarding older versions) or delete keys, managing storage efficiently.
  7. Secure Communication: Supports Transport Layer Security (TLS) for encrypted client-server and inter-node communication.
  8. API Flexibility: Offers a gRPC API and an HTTP/JSON API served through a gRPC gateway, providing flexibility for different client needs.

These features collectively enable etcd to act as a reliable, coordinated brain for distributed applications, handling critical state management tasks that are otherwise difficult and error-prone to implement from scratch.
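The fault-tolerance figure in the Raft item above is simple integer arithmetic. The following is a minimal sketch of it; the function names quorum and tolerated are illustrative and not part of any etcd API.

```go
package main

import "fmt"

// quorum returns how many members must agree before a cluster of
// n members can commit a write (a strict majority).
func quorum(n int) int { return n/2 + 1 }

// tolerated returns how many members can fail while the remaining
// members still form a quorum.
func tolerated(n int) int { return (n - 1) / 2 }

func main() {
	for _, n := range []int{1, 3, 4, 5, 7} {
		fmt.Printf("members=%d quorum=%d tolerated failures=%d\n",
			n, quorum(n), tolerated(n))
	}
}
```

A 4-member cluster tolerates one failure, the same as a 3-member cluster, which is why clusters are usually sized with an odd number of members.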

Installation and Setup

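Installing etcd is straightforward using Go’s module system:

  1. Install etcd Server:

    go install go.etcd.io/etcd@v3.5.0
    

    Replace v3.5.0 with your desired version (check the GitHub releases). This installs the etcd binary into your $GOPATH/bin or $GOBIN. If go install does not succeed for the release you pick, use the pre-built binaries from the GitHub releases page or build from source, which are the installation paths the etcd documentation describes. The release archives also contain etcdctl, the command-line client used in the verification step.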

  2. Start a Single-Node Cluster (for testing):

    etcd --data-dir=./etcd-data --listen-client-urls=http://127.0.0.1:2379 --advertise-client-urls=http://127.0.0.1:2379
    
    • --data-dir: Directory in which etcd stores its data.
    • --listen-client-urls: URLs on which the server listens for client traffic.
    • --advertise-client-urls: URLs the server announces to clients and other members as the way to reach it (on a single local node, usually the same as --listen-client-urls).
  3. Verify Installation:

    etcdctl version
    

    This should output the installed client version. For example:

    etcdctl version: 3.5.0
    API version: 3.5
    

Basic Usage

Here’s a minimal “Hello World” example demonstrating fundamental etcd operations:

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Connect to the etcd cluster (replace 127.0.0.1:2379 with your cluster endpoints)
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Create a context with a 5-second timeout
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Set a key (key = "hello", value = "World")
	if _, err = cli.Put(ctx, "hello", "World"); err != nil {
		panic(err)
	}
	fmt.Println("Set 'hello' to 'World'")

	// Get the key
	resp, err := cli.Get(ctx, "hello")
	if err != nil {
		panic(err)
	}
	if len(resp.Kvs) == 0 {
		panic("key 'hello' not found")
	}
	// Values are byte slices, so convert to string before printing
	fmt.Println("Retrieved:", string(resp.Kvs[0].Value)) // Output: Retrieved: World
}

Expected Output:

Set 'hello' to 'World'
Retrieved: World

This example shows connecting to etcd, setting a key-value pair, and retrieving it. The context manages timeouts, and the client handles the underlying gRPC communication.
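One detail worth keeping in mind when printing results: the client returns keys and values as byte slices, not strings. The sketch below uses only the standard library to show the difference; the render helper is hypothetical and the literal stands in for a value fetched from etcd.

```go
package main

import "fmt"

// render formats a value the way fmt prints a raw byte slice and
// the way it prints the same bytes converted to a string.
func render(value []byte) (raw, text string) {
	return fmt.Sprint(value), string(value)
}

func main() {
	value := []byte("World") // stand-in for a value returned by a Get call
	raw, text := render(value)
	fmt.Println("raw: ", raw)  // raw:  [87 111 114 108 100]
	fmt.Println("text:", text) // text: World
}
```

Converting with string(...) at the point of use keeps log lines and API responses readable.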

Real-World Examples

Example 1: Distributed Configuration Service (Web Server)

This example demonstrates a simple HTTP server that reads its configuration from etcd and serves it over a /config endpoint.

File Structure:

config-service/
├── main.go
└── config.go

config.go:

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// Config represents application configuration
type Config struct {
	AppName string
	Port    int
	Timeout time.Duration
}

// LoadConfig loads configuration from etcd. The key is treated as a
// prefix, so "/config" matches "/config/app_name", "/config/port", etc.
func LoadConfig(client *clientv3.Client, key string) (*Config, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	resp, err := client.Get(ctx, key, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	var cfg Config
	for _, kv := range resp.Kvs {
		value := string(kv.Value)
		switch string(kv.Key) {
		case "/config/app_name":
			cfg.AppName = value
		case "/config/port":
			// Numbers are stored as decimal text, e.g. "8080"
			if cfg.Port, err = strconv.Atoi(value); err != nil {
				return nil, fmt.Errorf("invalid port %q: %w", value, err)
			}
		case "/config/timeout":
			// Timeout is stored as a number of seconds, e.g. "30"
			secs, err := strconv.Atoi(value)
			if err != nil {
				return nil, fmt.Errorf("invalid timeout %q: %w", value, err)
			}
			cfg.Timeout = time.Duration(secs) * time.Second
		}
	}

	return &cfg, nil
}

main.go:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
	clientv3 "go.etcd.io/etcd/client/v3"
)

var etcdClient *clientv3.Client

func main() {
	// Initialize etcd client (replace endpoints)
	var err error
	etcdClient, err = clientv3.New(clientv3.Config{
		Endpoints: []string{"http://127.0.0.1:2379"},
	})
	if err != nil {
		panic(err)
	}
	defer etcdClient.Close()

	r := mux.NewRouter()

	// Route to get configuration
	r.HandleFunc("/config", getConfigHandler).Methods("GET")

	// Start HTTP server
	fmt.Println("Server listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", r))
}

func getConfigHandler(w http.ResponseWriter, r *http.Request) {
	cfg, err := LoadConfig(etcdClient, "/config")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Encode with encoding/json so string values are escaped correctly;
	// the timeout is reported in whole seconds
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(struct {
		AppName string `json:"app_name"`
		Port    int    `json:"port"`
		Timeout int    `json:"timeout"`
	}{cfg.AppName, cfg.Port, int(cfg.Timeout.Seconds())})
}

Expected Output (GET /config):

{"app_name":"MyApp","port":8080,"timeout":30}

This server provides a /config endpoint that fetches the application’s configuration from etcd, demonstrating etcd as a central configuration source.

Example 2: Service Discovery with Health Checks

This example shows how etcd can be used for service discovery where services register and deregister themselves, and clients discover available services.
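Discovery of this kind usually relies on a prefix query: all registrations share a prefix such as /services/, and the store returns every key in the range from the prefix up to the prefix with its last byte incremented. The following sketch shows that range calculation in plain Go. The helper names prefixEnd and inPrefix are illustrative; the etcd client performs an equivalent computation internally when a prefix option is used.

```go
package main

import "fmt"

// prefixEnd returns the smallest key that is greater than every key
// starting with prefix: the prefix with its last incrementable byte
// increased by one. "\x00" signals "no upper bound".
func prefixEnd(prefix string) string {
	end := []byte(prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return string(end[:i+1])
		}
	}
	return "\x00" // empty prefix or all bytes are 0xff
}

// inPrefix reports whether key falls in the range [prefix, prefixEnd).
func inPrefix(key, prefix string) bool {
	end := prefixEnd(prefix)
	return key >= prefix && (end == "\x00" || key < end)
}

func main() {
	fmt.Printf("%q\n", prefixEnd("/services/")) // "/services0"
	fmt.Println(inPrefix("/services/web:127.0.0.1:8080", "/services/")) // true
	fmt.Println(inPrefix("/config/port", "/services/"))                 // false
}
```

Ending the prefix with a separator (/services/ rather than /services) avoids accidentally matching unrelated keys such as /services-old.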

File Structure:

service-discovery/
├── main.go
└── service.go

service.go:

package main

import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// Service represents a registered service
type Service struct {
	Name    string
	Address string
	Port    int
}

// ServiceRegistry handles service registration and discovery
type ServiceRegistry struct {
	registryPath string
	client       *clientv3.Client
}

func NewServiceRegistry(client *clientv3.Client, path string) *ServiceRegistry {
	return &ServiceRegistry{
		registryPath: path,
		client:       client,
	}
}

// RegisterService registers a service with a lease (auto-expires if service dies)
func (r *ServiceRegistry) RegisterService(service *Service) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create a lease for 10 seconds. The key disappears when the lease
	// expires, so a service must re-register within that window to stay listed.
	leaseResp, err := r.client.Grant(ctx, 10)
	if err != nil {
		return err
	}

	// Create a key with the service details and attach the lease
	key := fmt.Sprintf("%s/%s:%s:%d", r.registryPath, service.Name, service.Address, service.Port)
	_, err = r.client.Put(ctx, key, "", clientv3.WithLease(leaseResp.ID))
	return err
}

// DiscoverServices returns a snapshot of all registered services of a specific type
func (r *ServiceRegistry) DiscoverServices(serviceType string) ([]*Service, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Keys look like <registryPath>/<type>:<address>:<port>
	prefix := fmt.Sprintf("%s/%s:", r.registryPath, serviceType)
	resp, err := r.client.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	services := make([]*Service, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		addressPort := strings.Split(strings.TrimPrefix(string(kv.Key), prefix), ":")
		if len(addressPort) != 2 {
			continue // skip keys that do not match the expected layout
		}
		port, err := strconv.Atoi(addressPort[1])
		if err != nil {
			continue
		}
		services = append(services, &Service{
			Name:    serviceType,
			Address: addressPort[0],
			Port:    port,
		})
	}
	return services, nil
}

// WatchServices streams later registrations and expirations of a service
// type; the channel closes when ctx is cancelled
func (r *ServiceRegistry) WatchServices(ctx context.Context, serviceType string) clientv3.WatchChan {
	prefix := fmt.Sprintf("%s/%s:", r.registryPath, serviceType)
	return r.client.Watch(ctx, prefix, clientv3.WithPrefix())
}

main.go:

package main

import (
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"

	"github.com/gorilla/mux"
	clientv3 "go.etcd.io/etcd/client/v3"
)

var (
	etcdClient      *clientv3.Client
	serviceRegistry *ServiceRegistry
)

func main() {
	// Initialize etcd client (replace endpoints)
	var err error
	etcdClient, err = clientv3.New(clientv3.Config{
		Endpoints: []string{"http://127.0.0.1:2379"},
	})
	if err != nil {
		panic(err)
	}
	defer etcdClient.Close()

	// All registrations are stored under the /services prefix
	serviceRegistry = NewServiceRegistry(etcdClient, "/services")

	r := mux.NewRouter()

	// Route to register a service
	r.HandleFunc("/register/{serviceType}/{address}/{port}", registerServiceHandler).Methods("POST")
	// Route to discover services
	r.HandleFunc("/discover/{serviceType}", discoverServicesHandler).Methods("GET")

	// Start HTTP server
	fmt.Println("Service Discovery server listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", r))
}

func registerServiceHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	serviceType := vars["serviceType"]
	address := vars["address"]
	portStr := vars["port"]

	if serviceType == "" || address == "" || portStr == "" {
		http.Error(w, "Missing required parameters (serviceType, address, port)", http.StatusBadRequest)
		return
	}

	port, err := strconv.Atoi(portStr)
	if err != nil {
		http.Error(w, "Invalid port number", http.StatusBadRequest)
		return
	}

	// Create service
	service := &Service{
		Name:    serviceType,
		Address: address,
		Port:    port,
	}

	// Register service in etcd
	if err := serviceRegistry.RegisterService(service); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
	fmt.Fprintf(w, "Service %s registered at %s:%d", serviceType, address, port)
}

func discoverServicesHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	serviceType := vars["serviceType"]

	if serviceType == "" {
		http.Error(w, "Missing required parameter (serviceType)", http.StatusBadRequest)
		return
	}

	// Discover services
	services, err := serviceRegistry.DiscoverServices(serviceType)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `[%s]`, strings.Join(servicesListToJSON(services), ","))
}

// servicesListToJSON renders each service as a JSON object; %q quotes
// and escapes the string fields
func servicesListToJSON(services []*Service) []string {
	var jsonList []string
	for _, s := range services {
		jsonList = append(jsonList, fmt.Sprintf(`{"name":%q,"address":%q,"port":%d}`, s.Name, s.Address, s.Port))
	}
	return jsonList
}

Expected Output (POST /register/…):

Service web registered at 127.0.0.1:8080

Expected Output (GET /discover/web):

[{"name":"web","address":"127.0.0.1","port":8080}]

This example demonstrates etcd’s use for dynamic service discovery, leveraging leases for automatic cleanup and watches for real-time updates.
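A client that follows a watch stream typically keeps a local view of live services and updates it as events arrive: a PUT adds a registration, and a DELETE (issued explicitly or caused by lease expiry) removes it. The sketch below models that bookkeeping in plain Go. The event type here is a hypothetical stand-in for the events the etcd client delivers, not the client's own type.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a simplified, hypothetical stand-in for a watch event.
type event struct {
	Type string // "PUT" or "DELETE"
	Key  string
}

// apply updates the set of live registrations for one event.
func apply(live map[string]struct{}, ev event) {
	switch ev.Type {
	case "PUT":
		live[ev.Key] = struct{}{}
	case "DELETE":
		delete(live, ev.Key)
	}
}

// sortedKeys returns the live registrations in a stable order.
func sortedKeys(live map[string]struct{}) []string {
	keys := make([]string, 0, len(live))
	for k := range live {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	live := map[string]struct{}{}
	events := []event{
		{Type: "PUT", Key: "/services/web:10.0.0.1:8080"},
		{Type: "PUT", Key: "/services/web:10.0.0.2:8080"},
		{Type: "DELETE", Key: "/services/web:10.0.0.1:8080"}, // e.g. lease expired
	}
	for _, ev := range events {
		apply(live, ev)
	}
	fmt.Println(sortedKeys(live)) // [/services/web:10.0.0.2:8080]
}
```

Seeding the map from an initial prefix query and then applying watch events keeps the local view current without polling.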

Example 3: Distributed Lock for Coordinated Updates

This example shows how etcd can be used to implement a distributed lock, ensuring only one instance of a process can hold the lock at a time.

File Structure:

distributed-lock/
├── main.go
└── lock.go

lock.go:

package main

import (
	"context"
	"errors"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// ErrLockHeld is returned when another client already holds the lock
var ErrLockHeld = errors.New("lock is held by another client")

// DistributedLock provides a mutex-like lock using etcd
type DistributedLock struct {
	client *clientv3.Client
	path   string
}

// NewDistributedLock creates a new lock
func NewDistributedLock(client *clientv3.Client, path string) *DistributedLock {
	return &DistributedLock{
		client: client,
		path:   path,
	}
}

// Lock tries to acquire the lock once and returns the lease that guards it
func (l *DistributedLock) Lock(ctx context.Context, ttl int) (clientv3.LeaseID, error) {
	// Create a lease with the TTL so the lock is released if the holder dies
	lease, err := l.client.Grant(ctx, int64(ttl))
	if err != nil {
		return 0, err
	}

	// Write the lock key only if it does not exist yet (create revision 0).
	// An unconditional Put would let every caller "acquire" the lock.
	resp, err := l.client.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(l.path), "=", 0)).
		Then(clientv3.OpPut(l.path, "", clientv3.WithLease(lease.ID))).
		Commit()
	if err != nil || !resp.Succeeded {
		// Release the unused lease before reporting failure
		l.client.Revoke(ctx, lease.ID)
		if err != nil {
			return 0, err
		}
		return 0, ErrLockHeld
	}

	return lease.ID, nil
}

// Unlock releases the lock
func (l *DistributedLock) Unlock(ctx context.Context, leaseID clientv3.LeaseID) error {
	// Revoking the lease deletes the lock key attached to it, and only
	// that key, so a lock now held by someone else is never removed
	_, err := l.client.Revoke(ctx, leaseID)
	return err
}

main.go:

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"go.etcd.io/etcd

---

*Photo by [Steve Johnson](https://unsplash.com/@steve_j) on [Unsplash](https://unsplash.com)*