Using the Rekor Event Stream

The public Rekor instance provides an event stream of new entries added to the transparency log using GCP Pub/Sub. This can be used to monitor the log in real-time for events you are interested in.

Pub/Sub details

Important: Pub/Sub usage is not free. Please familiarize yourself with the pricing before proceeding.

Tip: You can avoid paying egress network costs by placing the workload that processes events from the subscription in the same region as the topic.

The details needed to subscribe to the topic are below. Any authenticated GCP principal can create subscriptions to the topic.

ItemValue
GCP Project IDproject-rekor
Pub/Sub Topic IDnew-entry
Pub/Sub Topic Regionus-central1

Event format

The events are published in both the JSON and Protobuf formats of the CloudEvents specificaion. The data field is always a serialized TransparencyLogEntry proto message. For JSON formatted events, this follows the JSON serialization specification for Protobufs.

All published events have attributes attached to them. Attributes that are custom to Rekor and not a part of the CloudEvents specification are prefixed by rekor_. You can add server-side filters based on these attributes to automatically acknowledge messages you are not interested in to reduce costs.

AttributeValue
datacontenttypeapplication/json or application/protobuf
idThe unique identifier for the entry in the Rekor log.
rekor_entry_kindThe kind of the Rekor entry (hashedrekord, intoto, etc.)
rekor_signing_subjectsA sorted, comma-delimeted list of the principals that signed the entry.
source/createLogEntry
specversion1.0
timeThe timestamp the entry was added to the log.
typedev.sigstore.rekor.events.v1.NewEntry

Example usage

In this example scenario, we want to monitor all log entries added by GitHub Actions CI workers in the Sigstore GitHub org. We do this to verify that all CI runs that upload entries to Rekor are from the main branch only. Uploads from other branches could indicate a GitHub Actions misconfiguration that allowed a malicious actor to submit a pull request which triggered a CI run that allowed them to sign their artifacts using our identity.

Setting up the subscription

Note: To perform the steps below, you will need the gcloud CLI.

First, define some environment variables. You may pick any value you like for SUBSCRIPTION_PROJECT_ID and SUBSCRIPTION_ID. The value for SUBSCRIPTION_PROJECT_ID must be globally unique among all GCP projects.

export SUBSCRIPTION_PROJECT_ID="your-gcp-project-id"
export SUBSCRIPTION_ID="new-entry-subscription"
export SUBSCRIPTION_CONTENT_TYPE="application/protobuf"

If the project does not exist, you can create it with the command below.

gcloud projects create $SUBSCRIPTION_PROJECT_ID

Then, you can create the subscription, filtering on the content type and the signing subjects we want to receive messages about.

export SUBSCRIPTION_FILTER='attributes.datacontenttype = "$SUBSCRIPTION_CONTENT_TYPE" AND hasPrefix(attributes.rekor_signing_subjects, "https://github.com/sigstore/")'
gcloud pubsub subscriptions create \
    $SUBSCRIPTION_ID \
    --project $SUBSCRIPTION_PROJECT_ID \
    --topic new-entry \
    --topic-project project-rekor \
    --message-filter $SUBSCRIPTION_FILTER

The subscription is now set up and we can pull the events

Processing log entries

The first step is to create a program that pulls all messages from the subscription indefinitely. It passes all received messages to a handleMsg function. The code examples below omit imports from Go’s standard library for brevity.

import (
	"cloud.google.com/go/pubsub"
)

func main() {
	ctx, done := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer done()

	// Bootstrap the PubSub client.
	client, err := pubsub.NewClient(ctx, os.Getenv("SUBSCRIPTION_PROJECT_ID"))
	if err != nil {
		log.Fatal(err)
	}
	sub := client.Subscription(os.Getenv("SUBSCRIPTION_ID"))

	// Continuously read messages into a channel.
	msgs := make(chan *pubsub.Message)
	go func() {
		for {
			err := sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
				msgs <- msg
			})
			if err != nil {
				log.Print(err)
			}
		}
	}()

	// Handle each message received until the process is terminated.
	for {
		select {
		case msg := <-msgs:
			if err := handleMsg(msg); err != nil {
				log.Print(err)
			}
			msg.Ack()
		case <-ctx.Done():
			return
		}
	}
}

The handleMsg function deserializes the event and then checks the signing subjects.

import (
	"google.golang.org/protobuf/proto"
	eventspb "github.com/sigstore/protobuf-specs/gen/pb-go/events/v1"
)

func handleMsg(msg *pubsub.Message) error {
	event := new(eventspb.CloudEvent)
	if err := proto.Unmarshal(msg.Data, event); err != nil {
		return fmt.Errorf("unmarshal msg %q: %w", msg.ID, err)
	}

	// In a real implementation, you should verify the entry before proceeding!

	attr, ok := event.Attributes["rekor_signing_subjects"]
	if !ok {
		return fmt.Errorf("entry %q is missing rekor_signing_subjects", event.GetId())
	}
	subjects := strings.Split(attr.GetCeString(), ",")
	for _, subject := range subjects {
		if !strings.HasSuffix(subject, "@refs/heads/main") {
			return fmt.Errorf("entry %q was not run from the main branch: %s", event.GetId(), subject)
		}
	}

	return nil
}