Appending events
When you start working with EventStoreDB, the database is empty. So, the first meaningful operation in this case would be to add one or more events to the database using one of the available client SDKs.
TIP
Check the Getting Started guide to learn how to configure and use the client SDK.
Append your first event
The simplest way to append an event to EventStoreDB is to create an EventData
object and call AppendToStream
method.
event1 = NewEvent(
type="some-event",
data=b'{"important_data": "some value"}',
)
commit_position = client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.NO_STREAM,
events=[event1],
)
const event = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("some-stream", event, {
expectedRevision: NO_STREAM,
});
type SomeEvent = JSONEventType<
"some-event",
{
id: string;
value: string;
}
>;
const event = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("some-stream", event, {
expectedRevision: NO_STREAM,
});
EventData eventData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.noStream());
client.appendToStream("some-stream", options, eventData)
.get();
var eventData = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"1\" \"value\": \"some value\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"some-stream",
StreamState.NoStream,
new List<EventData> {
eventData
}
);
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
options := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
}
result, err := db.AppendToStream(context.Background(), "some-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::NoStream);
let _ = client
.append_to_stream("some-stream", &options, event)
.await?;
As you can see, AppendToStream
takes a collection of EventData
, which makes possible saving more than one event in a single batch.
As well as the example above there is also a number of other options for dealing with different scenarios.
TIP
If you are new to Event Sourcing, please study the Handling concurrency section below.
Working with EventData
When appending events to EventStoreDB they must first all be wrapped in an EventData
object. This allows you to specify the content of the event, the type of event and whether its in Json format. In its simplest form you need to the three following arguments:
eventId
This takes the format of a Uuid
and is used to uniquely identify the event you are trying to append. If two events with the same Uuid
are appended to the same stream in quick succession EventStoreDB will only append one copy of the event to the stream.
For example, the following code will only append a single event:
event = NewEvent(
id=uuid4(),
type="some-event",
data=b'{"important_data": "some value"}',
metadata=b"{}",
content_type="application/json",
)
client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.ANY,
events=event,
)
client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.ANY,
events=event,
)
const event = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("same-event-stream", event);
// attempt to append the same event again
await client.appendToStream("same-event-stream", event);
const event = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("same-event-stream", event);
// attempt to append the same event again
await client.appendToStream("same-event-stream", event);
EventData eventData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.any());
client.appendToStream("same-event-stream", options, eventData)
.get();
// attempt to append the same event again
client.appendToStream("same-event-stream", options, eventData)
.get();
var eventData = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"1\" \"value\": \"some value\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"same-event-stream",
StreamState.Any,
new List<EventData> {
eventData
}
);
// attempt to append the same event again
await client.AppendToStreamAsync(
"same-event-stream",
StreamState.Any,
new List<EventData> {
eventData
}
);
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
id := uuid.New()
event := esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
EventID: id,
Data: bytes,
}
_, err = db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{}, event)
if err != nil {
panic(err)
}
// attempt to append the same event again
_, err = db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{}, event)
if err != nil {
panic(err)
}
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default();
let _ = client
.append_to_stream("same-event-stream", &options, event.clone())
.await?;
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
type
An event type should be supplied for each event. This is a unique string used to identify the type of event you are saving.
It is common to see the explicit event code type name used as the type as it makes serialising and de-serialising of the event easy. However, we recommend against this as it couples the storage to the type and will make it more difficult if you need to version the event at a later date.
data
Representation of your event data. It is recommended that you store your events as JSON objects as this will allow you to make use of all of EventStoreDB's functionality such as projections. Ultimately though, you can save it using whatever format you like as eventually, it will be stored as encoded bytes.
metadata
It is common to need to store additional information along side your event that is part of the event itself. This can be correlation Id's, timestamps, access information etc. EventStoreDB allows you to store a separate byte array containing this information to keep it separate.
isJson
Simple boolean field to tell EventStoreDB if the event is stored as json, true by default.
Handling concurrency
When appending events to a stream you can supply a stream state or stream revision. Your client can use this to tell EventStoreDB what state or version you expect the stream to be in when you append. If the stream isn't in that state then an exception will be thrown.
For example if we try and append the same record twice expecting both times that the stream doesn't exist we will get an exception on the second:
event1 = NewEvent(
type="some-event",
data=b'{"important_data": "some value"}',
)
stream_name = str(uuid4())
client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.NO_STREAM,
events=event1,
)
event2 = NewEvent(
type="some-event",
data=b'{"important_data": "some other value"}',
)
try:
# attempt to append the same event again
client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.NO_STREAM,
events=event2,
)
except exceptions.WrongCurrentVersion:
print("Error appending second event")
const eventOne = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
const eventTwo = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some other value",
},
});
await client.appendToStream("no-stream-stream", eventOne, {
expectedRevision: NO_STREAM,
});
// attempt to append the same event again
await client.appendToStream("no-stream-stream", eventTwo, {
expectedRevision: NO_STREAM,
});
const eventOne = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
const eventTwo = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some other value",
},
});
await client.appendToStream("no-stream-stream", eventOne, {
expectedRevision: NO_STREAM,
});
// attempt to append the same event again
await client.appendToStream("no-stream-stream", eventTwo, {
expectedRevision: NO_STREAM,
});
EventData eventDataOne = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
EventData eventDataTwo = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"2",
"some other value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.noStream());
client.appendToStream("no-stream-stream", options, eventDataOne)
.get();
// attempt to append the same event again
client.appendToStream("no-stream-stream", options, eventDataTwo)
.get();
var eventDataOne = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"1\" \"value\": \"some value\"}"u8.ToArray()
);
var eventDataTwo = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"2\" \"value\": \"some other value\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"no-stream-stream",
StreamState.NoStream,
new List<EventData> {
eventDataOne
}
);
// attempt to append the same event again
await client.AppendToStreamAsync(
"no-stream-stream",
StreamState.NoStream,
new List<EventData> {
eventDataTwo
}
);
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
options := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
}
_, err = db.AppendToStream(context.Background(), "same-event-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
if err != nil {
panic(err)
}
bytes, err = json.Marshal(TestEvent{
Id: "2",
ImportantData: "some other value",
})
if err != nil {
panic(err)
}
// attempt to append the same event again
_, err = db.AppendToStream(context.Background(), "same-event-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::NoStream);
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
let data = TestEvent {
id: "2".to_string(),
important_data: "some other value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
There are three available stream states:
Any
NoStream
StreamExists
This check can be used to implement optimistic concurrency. When you retrieve a stream from EventStoreDB, you take note of the current version number, then when you save it back you can determine if somebody else has modified the record in the meantime.
original_version = StreamState.NO_STREAM
for event in client.read_stream(stream_name):
original_version = event.stream_position
event1 = NewEvent(
type="some-event",
data=b'{"important_data": "some value"}',
)
client.append_to_stream(
stream_name=stream_name,
current_version=original_version,
events=event1,
)
event2 = NewEvent(
type="some-event",
data=b'{"important_data": "some other value"}',
)
try:
client.append_to_stream(
stream_name=stream_name,
current_version=original_version,
events=event2,
)
except exceptions.WrongCurrentVersion:
print("Error appending event2")
const events = client.readStream("concurrency-stream", {
fromRevision: START,
direction: FORWARDS,
});
let revision = NO_STREAM;
for await (const { event } of events) {
revision = event?.revision ?? revision;
}
const clientOneEvent = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientOneEvent, {
expectedRevision: revision,
});
const clientTwoEvent = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientTwoEvent, {
expectedRevision: revision,
});
const events = client.readStream<SomeEvent>("concurrency-stream", {
fromRevision: START,
direction: FORWARDS,
});
let revision: AppendExpectedRevision = NO_STREAM;
for await (const { event } of events) {
revision = event?.revision ?? revision;
}
const clientOneEvent = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientOneEvent, {
expectedRevision: revision,
});
const clientTwoEvent = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientTwoEvent, {
expectedRevision: revision,
});
ReadStreamOptions readStreamOptions = ReadStreamOptions.get()
.forwards()
.fromStart();
ReadResult result = client.readStream("concurrency-stream", readStreamOptions)
.get();
EventData clientOneData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"clientOne"
))
.build();
EventData clientTwoData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"2",
"clientTwo"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(result.getLastStreamPosition());
client.appendToStream("concurrency-stream", options, clientOneData)
.get();
client.appendToStream("concurrency-stream", options, clientTwoData)
.get();
var clientOneRead = client.ReadStreamAsync(
Direction.Forwards,
"concurrency-stream",
StreamPosition.Start
);
var clientOneRevision = (await clientOneRead.LastAsync()).Event.EventNumber.ToUInt64();
var clientTwoRead = client.ReadStreamAsync(Direction.Forwards, "concurrency-stream", StreamPosition.Start);
var clientTwoRevision = (await clientTwoRead.LastAsync()).Event.EventNumber.ToUInt64();
var clientOneData = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"1\" \"value\": \"clientOne\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"no-stream-stream",
clientOneRevision,
new List<EventData> {
clientOneData
}
);
var clientTwoData = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"2\" \"value\": \"clientTwo\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"no-stream-stream",
clientTwoRevision,
new List<EventData> {
clientTwoData
}
);
ropts := esdb.ReadStreamOptions{
Direction: esdb.Backwards,
From: esdb.End{},
}
stream, err := db.ReadStream(context.Background(), "concurrency-stream", ropts, 1)
if err != nil {
panic(err)
}
defer stream.Close()
lastEvent, err := stream.Recv()
if err != nil {
panic(err)
}
data := TestEvent{
Id: "1",
ImportantData: "clientOne",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
aopts := esdb.AppendToStreamOptions{
ExpectedRevision: lastEvent.OriginalStreamRevision(),
}
_, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
data = TestEvent{
Id: "1",
ImportantData: "clientTwo",
}
bytes, err = json.Marshal(data)
if err != nil {
panic(err)
}
_, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
let options = ReadStreamOptions::default().position(StreamPosition::End);
let last_event = client
.read_stream("concurrency-stream", &options)
.await?
.next()
.await?
.expect("the stream to at least exist.");
let data = TestEvent {
id: "1".to_string(),
important_data: "clientOne".to_string(),
};
let event = EventData::json("some-event", data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::Exact(
last_event.get_original_event().revision,
));
let _ = client
.append_to_stream("concurrency-stream", &options, event)
.await?;
let data = TestEvent {
id: "2".to_string(),
important_data: "clientTwo".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let _ = client
.append_to_stream("concurrency-stream", &options, event)
.await?;
User credentials
You can provide user credentials to be used to append the data as follows. This will override the default credentials set on the connection.
credentials = client.construct_call_credentials(
username="admin",
password="changeit",
)
commit_position = client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.ANY,
events=event,
credentials=credentials,
)
const credentials = {
username: "admin",
password: "changeit",
};
await client.appendToStream("some-stream", event, {
credentials,
});
const credentials = {
username: "admin",
password: "changeit",
};
await client.appendToStream("some-stream", event, {
credentials,
});
UserCredentials credentials = new UserCredentials("admin", "changeit");
AppendToStreamOptions options = AppendToStreamOptions.get()
.authenticated(credentials);
client.appendToStream("some-stream", options, eventData)
.get();
await client.AppendToStreamAsync(
"some-stream",
StreamState.Any,
new[] { eventData },
userCredentials: new UserCredentials("admin", "changeit"),
cancellationToken: cancellationToken
);
credentials := &esdb.Credentials{Login: "admin", Password: "changeit"}
result, err := db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{Authenticated: credentials}, event)
let options =
AppendToStreamOptions::default().authenticated(Credentials::new("admin", "changeit"));
let _ = client
.append_to_stream("some-stream", &options, event)
.await?;