cmd/livenstore-cli implement ReadStream on event store 2026-02-26 06:08:53 +01:00
internal/idgen refactor 2026-02-22 18:46:08 +01:00
.gitignore json event store implementation 2026-02-22 18:35:23 +01:00
aggregate_repository.go implement ReadStream on event store 2026-02-26 06:08:53 +01:00
aggregate_root.go Enforce ID on aggregate 2026-02-25 06:19:19 +01:00
aggregate_root_test.go Enforce ID on aggregate 2026-02-25 06:19:19 +01:00
event.go FromEvent + documentation 2026-02-24 06:26:57 +01:00
event_test.go FromEvent + documentation 2026-02-24 06:26:57 +01:00
go.mod refactor 2026-02-22 18:46:08 +01:00
go.sum implement events 2026-02-21 18:10:20 +01:00
json_store.go implement ReadStream on event store 2026-02-26 06:08:53 +01:00
json_store_test.go implement ReadStream on event store 2026-02-26 06:08:53 +01:00
livenstore-cli implement ReadStream on event store 2026-02-26 06:08:53 +01:00
README.md implement ReadStream on event store 2026-02-26 06:08:53 +01:00
store.go implement ReadStream on event store 2026-02-26 06:08:53 +01:00
subscription_manager.go add tests 2026-02-23 06:16:37 +01:00
subscription_manager_test.go add tests 2026-02-23 06:16:37 +01:00

LivenStore

An Event Store implementation in Go with file-based JSON storage, stream linking, and pub/sub capabilities.

Installation

go get forge.evrard.online/livenstore

Usage

As a Library

package main

import (
    "fmt"
    "forge.evrard.online/livenstore"
)

func main() {
    // Create a subscription manager for pub/sub
    manager := livenstore.NewInMemorySubscriptionManager()

    // Initialize the store with the subscription manager
    store, err := livenstore.NewJSONEventStore("./data", manager)
    if err != nil {
        panic(err)
    }

    // Subscribe to events
    store.Subscribe([]livenstore.EventType{"UserCreated"}, func(e livenstore.Event) {
        fmt.Printf("Received event: %s\n", e.ID)
    })

    // Create and publish an event (stores and notifies subscribers)
    event := livenstore.NewEvent("UserCreated", []byte(`{"name": "John"}`))
    store.Publish(event)

    // Or use lower-level operations:
    // store.Append(event)           // Store without notifying
    // store.Read(event.ID)          // Read by ID
    // store.LinkToStream(event.ID, "users")  // Link to a stream
}

Building the CLI

go build ./cmd/livenstore-cli

Project Structure

livenstore/
├── event.go                # Event struct and constructor
├── store.go                # EventStore interface
├── json_store.go           # JSON file-based implementation
├── subscription_manager.go # Pub/sub subscription management
├── aggregate_root.go       # Aggregate root for event sourcing
├── internal/               # Internal packages (not importable from outside this module)
│   └── idgen/              # ULID-based ID generation
└── cmd/                    # Applications
    └── livenstore-cli/

Features

  • Event Storage: Store events as JSON files with ULID-based identifiers
  • Stream Linking: Organize events into streams using filesystem symlinks
  • Pub/Sub: Subscribe to event types and receive notifications when events are published
  • Lexicographic Ordering: ULIDs provide time-ordered, sortable event IDs
  • Domain Events: Transform your business domain structs to/from events automatically
  • Aggregate Root: Apply events to aggregates using convention-based method dispatch

Domain Events

LivenStore allows you to define your own business domain event structs and automatically convert them to and from the internal Event type. This keeps your domain logic clean and decoupled from the event store implementation.

Defining Domain Events

Define your domain events as regular Go structs:

type UserCreated struct {
    UserID   string `json:"user_id"`
    Username string `json:"username"`
    Email    string `json:"email"`
}

type OrderPlaced struct {
    OrderID   string  `json:"order_id"`
    UserID    string  `json:"user_id"`
    Amount    float64 `json:"amount"`
}

Converting Domain Events

Use EventFrom to convert a domain event struct into a storable Event:

// Create a domain event
userCreated := &UserCreated{
    UserID:   "user-123",
    Username: "john_doe",
    Email:    "john@example.com",
}

// Convert to Event (automatically generates ID and serializes payload)
event, err := livenstore.EventFrom(userCreated)
if err != nil {
    panic(err)
}

// Store and publish
store.Publish(*event)

Use FromEvent to convert a stored Event back into your domain struct:

// Read event from store
event, err := store.Read(eventID)
if err != nil {
    panic(err)
}

// Convert back to domain event
var userCreated UserCreated
err = livenstore.FromEvent(*event, &userCreated)
if err != nil {
    panic(err)
}

fmt.Printf("User %s created\n", userCreated.Username)

Type Safety

FromEvent performs type checking to ensure the event type matches the target struct. If you try to deserialize an event into the wrong type, you'll get an error:

event, _ := livenstore.EventFrom(&UserCreated{...})

var order OrderPlaced
err := livenstore.FromEvent(*event, &order)
// Error: Wrong event type. Expected *main.UserCreated, got *main.OrderPlaced

Note: For proper roundtrip conversion, use a pointer when calling EventFrom (e.g., EventFrom(&domainEvent)) since FromEvent requires a pointer to unmarshal into.
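
To make the type check and the pointer note concrete, here is a minimal, self-contained sketch of how such a conversion could work. It is not LivenStore's actual implementation: the Event fields, the lowercase eventFrom and fromEvent helpers, the fixed "demo-id" value, and the use of the Go type name as the event type are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a stand-in for livenstore.Event (field names are assumed).
type Event struct {
	ID      string
	Type    string
	Payload []byte
}

// eventFrom records the Go type name of v and serializes v as JSON.
// The real library generates a ULID; "demo-id" is a placeholder.
func eventFrom(v any) (*Event, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &Event{ID: "demo-id", Type: fmt.Sprintf("%T", v), Payload: payload}, nil
}

// fromEvent refuses to unmarshal when the recorded type name differs
// from the type name of the target.
func fromEvent(e Event, target any) error {
	if got := fmt.Sprintf("%T", target); got != e.Type {
		return fmt.Errorf("wrong event type: expected %s, got %s", e.Type, got)
	}
	return json.Unmarshal(e.Payload, target)
}

type UserCreated struct {
	Username string `json:"username"`
}

type OrderPlaced struct {
	OrderID string `json:"order_id"`
}

func main() {
	event, err := eventFrom(&UserCreated{Username: "john_doe"})
	if err != nil {
		panic(err)
	}

	// Matching target type: the payload is decoded.
	var user UserCreated
	fmt.Println(fromEvent(*event, &user), user.Username)

	// Mismatching target type: an error is returned instead.
	var order OrderPlaced
	fmt.Println(fromEvent(*event, &order))
}
```

Under this assumption, an event created from a value (eventFrom(UserCreated{...})) would record the name without the leading *, and would not match a pointer target, which is one plausible reason for the note above.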

Aggregate Root

LivenStore provides a generic AggregateRoot[T] type for implementing the event sourcing pattern. Aggregates can apply events using convention-based method dispatch: when you call Apply(aggregate, event), it automatically invokes the On<EventType> method on the aggregate.

The Aggregate Interface

All aggregates must implement the Aggregate interface, which requires an ID() method:

type Aggregate interface {
    ID() string
}

This ensures that every aggregate has a unique identifier that can be used for event streams and persistence.

Defining an Aggregate

Use the self-referential generic pattern: embed AggregateRoot[*YourType] in your aggregate struct, implement the ID() method, and define handler methods following the On<EventType>(event) error pattern:

// User aggregate uses the self-referential generic pattern.
// The generic parameter *User allows AggregateRoot to call methods on User.
type User struct {
    livenstore.AggregateRoot[*User]
    id       string  // unexported; read through the ID() method
    Username string
    Active   bool
}

// ID returns the unique identifier for this aggregate.
// Required by the livenstore.Aggregate interface.
func (u *User) ID() string {
    return u.id
}

// Event types
type UserCreated struct {
    UserID   string `json:"user_id"`
    Username string `json:"username"`
}

type UserRenamed struct {
    NewUsername string `json:"new_username"`
}

type UserDeactivated struct {
    Reason string `json:"reason"`
}

// Event handlers - must follow the pattern On<EventType>(event) error
func (u *User) OnUserCreated(e UserCreated) error {
    u.id = e.UserID  // Set the aggregate ID from the event
    u.Username = e.Username
    u.Active = true
    return nil
}

func (u *User) OnUserRenamed(e UserRenamed) error {
    if e.NewUsername == "" {
        return errors.New("username cannot be empty")
    }
    u.Username = e.NewUsername
    return nil
}

func (u *User) OnUserDeactivated(e UserDeactivated) error {
    u.Active = false
    return nil
}

Applying Events

Use the Apply method to apply events to your aggregate. The method dynamically finds and calls the appropriate On<EventType> handler:

user := &User{}

// Apply UserCreated - calls user.OnUserCreated(event)
err := user.Apply(user, UserCreated{UserID: "user-001", Username: "john_doe"})
if err != nil {
    panic(err)
}
// user.ID() = "user-001", user.Username = "john_doe", user.Active = true

// Apply UserRenamed - calls user.OnUserRenamed(event)
err = user.Apply(user, UserRenamed{NewUsername: "john_smith"})
if err != nil {
    panic(err)
}
// user.Username = "john_smith"

// Apply UserDeactivated - calls user.OnUserDeactivated(event)
err = user.Apply(user, UserDeactivated{Reason: "User requested deletion"})
if err != nil {
    panic(err)
}
// user.Active = false

Replaying Events with OnEvent

The OnEvent method applies an event to the aggregate without tracking it in unsavedEvents. This is useful when replaying events from storage to rebuild aggregate state:

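// Load the aggregate's stored events. loadEventsFromStore is a placeholder,
// assumed here to return []livenstore.Event (for example via store.ReadStream).
storedEvents := loadEventsFromStore(userID)

// Replay events to rebuild state (without tracking)
user := &User{}
for _, storedEvent := range storedEvents {
    // Convert the stored event back to a domain event. FromEvent returns an
    // error on a type mismatch, so try each known type in turn.
    var domainEvent any
    var created UserCreated
    var renamed UserRenamed
    var deactivated UserDeactivated
    switch {
    case livenstore.FromEvent(storedEvent, &created) == nil:
        domainEvent = created
    case livenstore.FromEvent(storedEvent, &renamed) == nil:
        domainEvent = renamed
    case livenstore.FromEvent(storedEvent, &deactivated) == nil:
        domainEvent = deactivated
    default:
        panic("unknown event type")
    }

    // Handlers take the event by value
    if err := user.OnEvent(user, domainEvent); err != nil {
        panic(err)
    }
}
// user state is rebuilt, unsavedEvents is empty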

When to use which method:

Method     Tracks Events    Use Case
Apply      Yes              New events that need to be persisted
OnEvent    No               Replaying events from storage

How It Works

  1. Method Resolution: Both Apply and OnEvent extract the event type name (e.g., UserCreated) and look for a method named On<EventType> (e.g., OnUserCreated) on the handler.

  2. Signature Validation: The method must have exactly one parameter matching the event type and return exactly one error value.

  3. Event Tracking: Apply calls OnEvent internally, then tracks the event in unsavedEvents on success. OnEvent does not track events.

  4. Error Propagation: If the handler returns an error, both methods propagate it. Apply does not track events that fail.
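
The resolution and validation steps above can be sketched with the standard reflect package. This is an illustration under stated assumptions, not the library's code: the dispatch function, its error wording, and the trimmed-down User type are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// dispatch finds On<EventType> on handler, validates its signature,
// calls it with the event, and returns the handler's error.
func dispatch(handler any, event any) error {
	if event == nil {
		return errors.New("event is nil")
	}
	eventType := reflect.TypeOf(event)
	name := "On" + eventType.Name()

	method := reflect.ValueOf(handler).MethodByName(name)
	if !method.IsValid() {
		return fmt.Errorf("method %s not found on handler %T", name, handler)
	}

	mt := method.Type() // the receiver is not part of a bound method's type
	if mt.NumIn() != 1 {
		return fmt.Errorf("method %s must have exactly 1 parameter, got %d", name, mt.NumIn())
	}
	if mt.In(0) != eventType {
		return fmt.Errorf("method %s must accept %s, got %s", name, eventType, mt.In(0))
	}
	if mt.NumOut() != 1 || mt.Out(0) != errorType {
		return fmt.Errorf("method %s must return exactly one error value", name)
	}

	out := method.Call([]reflect.Value{reflect.ValueOf(event)})
	if err, ok := out[0].Interface().(error); ok {
		return err
	}
	return nil
}

type UserRenamed struct{ NewUsername string }

type UnknownEvent struct{}

type User struct{ Username string }

func (u *User) OnUserRenamed(e UserRenamed) error {
	if e.NewUsername == "" {
		return errors.New("username cannot be empty")
	}
	u.Username = e.NewUsername
	return nil
}

func main() {
	user := &User{}
	fmt.Println(dispatch(user, UserRenamed{NewUsername: "john_smith"}), user.Username)
	fmt.Println(dispatch(user, UserRenamed{}))  // handler error is propagated
	fmt.Println(dispatch(user, UnknownEvent{})) // no OnUnknownEvent method
}
```

Because MethodByName only sees exported methods, a handler spelled onUserRenamed would be reported as missing in this sketch.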

Handler Method Requirements

Requirement                                    Example
Method name must be On + event type name       OnUserCreated, OnOrderPlaced
Must accept exactly 1 parameter (the event)    func (u *User) OnUserCreated(e UserCreated) error
Parameter must match event type (as value)     Event UserCreated{} requires param UserCreated
Must return exactly 1 value of type error      return nil or return errors.New("...")

Error Handling

The Apply method returns descriptive errors for common issues:

// Missing handler method
err := user.Apply(user, UnknownEvent{})
// Error: method OnUnknownEvent not found on handler *main.User

// Wrong parameter count
// func (u *User) OnBadEvent(a, b int) error
err = user.Apply(user, BadEvent{})
// Error: method OnBadEvent must have exactly 1 parameter, got 2

// Handler returns error
// func (u *User) OnUserRenamed(e UserRenamed) error { return errors.New("invalid") }
err = user.Apply(user, UserRenamed{})
// Error: invalid

Collecting Unsaved Events

After applying events to an aggregate, you can collect them for persistence using CollectUnsavedEvents(). This method returns all successfully applied events and clears the internal tracking slice:

user := &User{}

// Apply multiple events
if err := user.Apply(user, UserCreated{UserID: "user-001", Username: "john_doe"}); err != nil {
    panic(err)
}
if err := user.Apply(user, UserRenamed{NewUsername: "john_smith"}); err != nil {
    panic(err)
}

// Collect events for persistence
events := user.CollectUnsavedEvents()
// events contains [UserCreated{...}, UserRenamed{...}]

// Persist events to store
for _, event := range events {
    storeEvent, err := livenstore.EventFrom(event)
    if err != nil {
        panic(err)
    }
    store.Publish(*storeEvent)
}

// Subsequent calls return empty slice until new events are applied
events = user.CollectUnsavedEvents()
// events is empty []

Key behaviors:

  • Returns all events that were successfully applied (handlers returned nil)
  • Clears the internal slice after collection (events are only returned once)
  • Failed events (where handler returned an error) are never tracked
  • Always returns a non-nil slice (empty slice []any{} if no events, never nil)
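
The behaviors listed above can be reproduced with a few lines of bookkeeping. The sketch below is a hypothetical stand-in for what AggregateRoot might do internally; the tracker type and the accept and reject helpers are invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// tracker is a hypothetical stand-in for the bookkeeping inside AggregateRoot.
type tracker struct {
	unsavedEvents []any
}

// apply runs handle and records the event only when handle succeeds.
func (tr *tracker) apply(event any, handle func(any) error) error {
	if err := handle(event); err != nil {
		return err
	}
	tr.unsavedEvents = append(tr.unsavedEvents, event)
	return nil
}

// collect returns the tracked events and resets the tracker.
// The result is never nil, even when nothing was tracked.
func (tr *tracker) collect() []any {
	events := tr.unsavedEvents
	if events == nil {
		events = []any{}
	}
	tr.unsavedEvents = nil
	return events
}

// accept and reject simulate a succeeding and a failing handler.
func accept(any) error { return nil }
func reject(any) error { return errors.New("rejected") }

func main() {
	tr := &tracker{}
	_ = tr.apply("UserCreated", accept)
	_ = tr.apply("UserRenamed", reject) // fails, so it is not tracked

	fmt.Println(len(tr.collect())) // prints 1
	fmt.Println(len(tr.collect())) // prints 0
}
```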

Event Store Implementations

LivenStore defines an EventStore interface that can be implemented by different storage backends:

type EventStore interface {
    Append(Event) error
    Read(string) (*Event, error)
    LinkToStream(string, string) error
    ReadStream(string) ([]Event, error)
}
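
As an illustration of what another backend could look like, here is a minimal in-memory sketch that satisfies the same four method signatures. It is not part of LivenStore: the memoryStore type, the local Event struct, the choice to reject duplicate IDs, and returning an empty slice for an unknown stream are all assumptions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Event is a local stand-in for livenstore.Event (field names are assumed).
type Event struct {
	ID      string
	Type    string
	Payload []byte
}

// EventStore repeats the interface shown above.
type EventStore interface {
	Append(Event) error
	Read(string) (*Event, error)
	LinkToStream(string, string) error
	ReadStream(string) ([]Event, error)
}

// memoryStore keeps events and stream membership in maps.
type memoryStore struct {
	mu      sync.RWMutex
	events  map[string]Event
	streams map[string]map[string]struct{}
}

func newMemoryStore() *memoryStore {
	return &memoryStore{
		events:  map[string]Event{},
		streams: map[string]map[string]struct{}{},
	}
}

func (s *memoryStore) Append(e Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.events[e.ID]; exists {
		return fmt.Errorf("event %s already exists", e.ID)
	}
	s.events[e.ID] = e
	return nil
}

func (s *memoryStore) Read(id string) (*Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	e, ok := s.events[id]
	if !ok {
		return nil, fmt.Errorf("event %s not found", id)
	}
	return &e, nil
}

func (s *memoryStore) LinkToStream(eventID, stream string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.events[eventID]; !ok {
		return fmt.Errorf("event %s not found", eventID)
	}
	if s.streams[stream] == nil {
		s.streams[stream] = map[string]struct{}{}
	}
	s.streams[stream][eventID] = struct{}{}
	return nil
}

// ReadStream returns the stream's events sorted by ID.
func (s *memoryStore) ReadStream(stream string) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	ids := make([]string, 0, len(s.streams[stream]))
	for id := range s.streams[stream] {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	events := make([]Event, 0, len(ids))
	for _, id := range ids {
		events = append(events, s.events[id])
	}
	return events, nil
}

// Compile-time check that memoryStore satisfies the interface.
var _ EventStore = (*memoryStore)(nil)

func main() {
	store := newMemoryStore()
	_ = store.Append(Event{ID: "02", Type: "UserCreated"})
	_ = store.Append(Event{ID: "01", Type: "UserCreated"})
	_ = store.LinkToStream("02", "users")
	_ = store.LinkToStream("01", "users")
	events, _ := store.ReadStream("users")
	fmt.Println(len(events), events[0].ID)
}
```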

JSONEventStore

The JSONEventStore is a file-based implementation that stores events as individual JSON files on disk.

Storage Structure

data/
├── events/                          # All events stored here
│   ├── 01HX7Y8K9M2N3P4Q5R6S7T8U9V.json
│   ├── 01HX7Y8K9M2N3P4Q5R6S7T8U9W.json
│   └── ...
└── streams/                         # Stream directories with symlinks
    ├── users/
    │   ├── 01HX7Y8K9M2N3P4Q5R6S7T8U9V.json -> ../../events/01HX...9V.json
    │   └── 01HX7Y8K9M2N3P4Q5R6S7T8U9W.json -> ../../events/01HX...9W.json
    └── orders/
        └── 01HX7Y8K9M2N3P4Q5R6S7T8U9V.json -> ../../events/01HX...9V.json

How It Works

  1. Event Storage: Each event is serialized to JSON and written to events/<event-id>.json. The payload is base64-encoded to support binary data.

    {
      "id": "01HX7Y8K9M2N3P4Q5R6S7T8U9V",
      "type": "UserCreated",
      "payload": "eyJuYW1lIjogIkpvaG4ifQ=="
    }
    
  2. Stream Linking: Streams are directories containing symbolic links to event files. This allows:

    • An event to belong to multiple streams
    • Efficient enumeration of events in a stream
    • No data duplication
  3. Reading Streams: Retrieve all events in a stream using ReadStream:

    // Read all events from the "users" stream
    events, err := store.ReadStream("users")
    if err != nil {
        log.Fatal(err)
    }
    
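    for _, event := range events {
        var user UserCreated
        // A stream can hold several event types; FromEvent returns an
        // error on a type mismatch, so skip events that are not UserCreated.
        if err := livenstore.FromEvent(event, &user); err != nil {
            continue
        }
        fmt.Printf("User: %s\n", user.Username)
    }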
    

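    Events are returned in lexicographic order by ID. Because a ULID begins with a millisecond timestamp, this matches creation order; the relative order of events created within the same millisecond depends on how the IDs are generated.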

  4. Event IDs: Events are assigned ULID identifiers, which are:

    • Lexicographically sortable by creation time
    • Globally unique
    • URL-safe and case-insensitive
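
The storage steps above can be sketched with the standard library alone. The code below is an illustration, not the JSONEventStore source: storedEvent, appendEvent, linkToStream, readStream, and demoStream are hypothetical names, and the sketch assumes a filesystem that supports symlinks.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// storedEvent mirrors the JSON layout shown above. encoding/json writes
// []byte fields as base64 strings, which yields the encoded payload.
type storedEvent struct {
	ID      string `json:"id"`
	Type    string `json:"type"`
	Payload []byte `json:"payload"`
}

// appendEvent writes the event to <root>/events/<id>.json.
func appendEvent(root string, e storedEvent) error {
	dir := filepath.Join(root, "events")
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	data, err := json.MarshalIndent(e, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, e.ID+".json"), data, 0o644)
}

// linkToStream creates <root>/streams/<stream>/<id>.json as a relative symlink.
func linkToStream(root, eventID, stream string) error {
	dir := filepath.Join(root, "streams", stream)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	target := filepath.Join("..", "..", "events", eventID+".json")
	return os.Symlink(target, filepath.Join(dir, eventID+".json"))
}

// readStream loads every event linked into the stream. os.ReadDir returns
// entries sorted by filename, so events come back in ID order.
func readStream(root, stream string) ([]storedEvent, error) {
	dir := filepath.Join(root, "streams", stream)
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	events := make([]storedEvent, 0, len(entries))
	for _, entry := range entries {
		data, err := os.ReadFile(filepath.Join(dir, entry.Name())) // follows the symlink
		if err != nil {
			return nil, err
		}
		var e storedEvent
		if err := json.Unmarshal(data, &e); err != nil {
			return nil, err
		}
		events = append(events, e)
	}
	return events, nil
}

// demoStream stores two events out of order in a temporary directory,
// links both into "users", and reads the stream back.
func demoStream() ([]storedEvent, error) {
	root, err := os.MkdirTemp("", "livenstore-sketch")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(root)
	for _, id := range []string{"01HX7Y8K9M2N3P4Q5R6S7T8U9W", "01HX7Y8K9M2N3P4Q5R6S7T8U9V"} {
		e := storedEvent{ID: id, Type: "UserCreated", Payload: []byte(`{"name": "John"}`)}
		if err := appendEvent(root, e); err != nil {
			return nil, err
		}
		if err := linkToStream(root, id, "users"); err != nil {
			return nil, err
		}
	}
	return readStream(root, "users")
}

func main() {
	events, err := demoStream()
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		fmt.Println(e.ID, string(e.Payload))
	}
}
```

The relative link target keeps the data directory relocatable: moving or copying the whole tree leaves the links valid, whereas absolute targets would break.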

Limitations

  • Not suitable for high-concurrency scenarios (no file locking)
  • Performance degrades with very large numbers of events
  • Symlinks are not supported on every filesystem (FAT32, for example) and may require extra privileges on Windows

Subscription Manager

LivenStore includes a pub/sub system for subscribing to and publishing events.

SubscriptionManager Interface

type EventHandler func(Event)

type SubscriptionManager interface {
    Subscribe([]EventType, EventHandler)
    Publish(Event)
}

InMemorySubscriptionManager

The InMemorySubscriptionManager is an in-memory implementation that delivers events to subscribers asynchronously via channels.

// Create a subscription manager
manager := livenstore.NewInMemorySubscriptionManager()

// Subscribe to multiple event types with a single handler
manager.Subscribe([]livenstore.EventType{"UserCreated", "UserUpdated"}, func(e livenstore.Event) {
    fmt.Printf("User event: %s\n", e.Type)
})

// Publish an event (delivered to all matching subscribers)
manager.Publish(event)

Features

  • Multiple Event Types: Subscribe to multiple event types with a single handler
  • Multiple Subscribers: Multiple handlers can subscribe to the same event type
  • Async Delivery: Events are delivered asynchronously via buffered channels
  • Integrated with Store: JSONEventStore.Publish() both persists and notifies subscribers
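
A rough idea of how asynchronous delivery over buffered channels can work is sketched below. It is not the InMemorySubscriptionManager source: the manager and subscriber types, the buffer size of 16, and the one-goroutine-per-subscriber design are assumptions. Note that in this sketch Publish blocks when a subscriber's buffer is full, and the worker goroutines are never stopped.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a local stand-in for livenstore.Event (field names are assumed).
type Event struct {
	ID   string
	Type string
}

// subscriber pairs a set of event types with a buffered queue.
type subscriber struct {
	types map[string]bool
	queue chan Event
}

type manager struct {
	mu   sync.RWMutex
	subs []*subscriber
}

// Subscribe starts one goroutine that drains the subscriber's queue
// and calls handler for each event, in publish order.
func (m *manager) Subscribe(types []string, handler func(Event)) {
	s := &subscriber{types: map[string]bool{}, queue: make(chan Event, 16)}
	for _, t := range types {
		s.types[t] = true
	}
	go func() {
		for e := range s.queue {
			handler(e)
		}
	}()
	m.mu.Lock()
	m.subs = append(m.subs, s)
	m.mu.Unlock()
}

// Publish enqueues the event for every subscriber interested in its type.
func (m *manager) Publish(e Event) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, s := range m.subs {
		if s.types[e.Type] {
			s.queue <- e
		}
	}
}

// demoDelivery publishes three events and waits until the two matching
// ones have reached the handler.
func demoDelivery() []string {
	m := &manager{}
	var wg sync.WaitGroup
	var mu sync.Mutex
	var received []string

	wg.Add(2)
	m.Subscribe([]string{"UserCreated", "UserUpdated"}, func(e Event) {
		mu.Lock()
		received = append(received, e.Type)
		mu.Unlock()
		wg.Done()
	})

	m.Publish(Event{ID: "1", Type: "UserCreated"})
	m.Publish(Event{ID: "2", Type: "OrderPlaced"}) // no matching subscriber
	m.Publish(Event{ID: "3", Type: "UserUpdated"})

	wg.Wait()
	return received
}

func main() {
	fmt.Println(demoDelivery())
}
```

Because delivery happens on another goroutine, a short-lived program has to wait for handlers (here with a sync.WaitGroup) before exiting, otherwise events still sitting in a queue are never processed.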

Limitations

  • In-memory only: subscription state is not persisted and is lost on restart
  • No guaranteed delivery or retry mechanism

License

MIT