Event-Driven Flows

Overview

The Transactions Service participates in the FloatMe event architecture as both a consumer and a producer. It consumes events from Auth0 (Log Stream via EventBridge) and raw Plaid data (via Kinesis), and it produces account and transaction change events on an outbound Kinesis stream for downstream services.

Event Architecture

Each entry lists the direction, the source or destination, and the Lambda involved, with its purpose below.

  • Inbound | prod-auth0 (EventBridge → SQS) | prod-txn-listener
    Triggers re-mining when a user logs in with stale transaction data

  • Inbound | prod-txn-plaid-transactions (Kinesis) | prod-txn-refiner
    Converts raw Plaid data into FloatMe format and persists it to DynamoDB

  • Internal | prod-txn-transactions (DynamoDB Streams) | prod-txn-feeder
    Publishes account and transaction change events to the outbound Kinesis stream

  • Internal | prod-txn-plaid (DynamoDB Streams) | prod-txn-feeder
    The same feeder also watches the plaid table stream

  • Outbound | prod-txn-floatme-transactions (Kinesis) | prod-txn-feeder
    Delivers refined account and transaction events to downstream FloatMe services

  • Outbound | floatme-events (EventBridge) | prod-txn-refiner, prod-txn-feeder
    Domain events: new_account, user_new_txns_batch_completed


Listener (Auth0 Login Events)

Trigger

Auth0 publishes Log Stream events to a dedicated EventBridge partner event bus (prod-auth0), which forwards matching events to the SQS queue prod-txn-listener-eventbridge.

Events Handled

  • gd_auth_succeed: successful MFA (Guardian) authentication

  • sepft: successful token exchange, password for access token (also triggers a re-mine check)

  • sertft: successful token exchange, refresh token for access token (also triggers a re-mine check)

  • (all others): ignored

Flow

Auth0 Log Stream event arrives (SQS message)
        │
        ▼
Event type in [gd_auth_succeed, sepft, sertft]?
        │ No → skip
        │ Yes
        ▼
Look up most recent Plaid item for user
        │
        ├─ No item found → skip (user has no linked bank)
        │
        └─ Item found
             │
             ▼
        Query webhook history for item:
          - Was there ever a HISTORICAL_UPDATE webhook?
          - Was there a DEFAULT_UPDATE webhook in the last 24 hours?
             │
             ├─ No webhooks at all
             │    └─► Send HISTORICAL_UPDATE to miner SQS
             │         (first-time sync; mine last 365 days)
             │
             ├─ Historical found + recent default found
             │    └─► Skip (data is fresh, no action needed)
             │
             ├─ Historical found + no recent default
             │    └─► Send DEFAULT_UPDATE to miner SQS
             │         (refresh last 14 days)
             │
             └─ No historical found
                  └─► Send HISTORICAL_UPDATE to miner SQS
                       (backfill; mine last 365 days)

The synthetic webhook message is sent with source: "FROM_LISTENER" so the miner can identify it and skip processing if the item has been removed in the meantime.
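The webhook-history check above reduces to two booleans: was a HISTORICAL_UPDATE ever seen, and did a DEFAULT_UPDATE arrive in the last 24 hours. A minimal Go sketch of that decision (type and function names are illustrative, not the actual service code):

```go
package main

import "fmt"

// Action is the synthetic webhook (if any) the listener sends to the miner SQS.
type Action string

const (
	Skip             Action = "SKIP"
	HistoricalUpdate Action = "HISTORICAL_UPDATE" // first sync / backfill: mine last 365 days
	DefaultUpdate    Action = "DEFAULT_UPDATE"    // refresh: mine last 14 days
)

// decideRemine maps the two webhook-history checks to an action,
// following the flow above.
func decideRemine(sawHistorical, recentDefault bool) Action {
	switch {
	case !sawHistorical:
		// Covers both "no webhooks at all" and "no historical found":
		// either way the item needs a full 365-day mine.
		return HistoricalUpdate
	case recentDefault:
		// Historical done and a DEFAULT_UPDATE ran in the last 24h:
		// data is fresh, nothing to do.
		return Skip
	default:
		// Historical done but no recent refresh: mine the last 14 days.
		return DefaultUpdate
	}
}

func main() {
	fmt.Println(decideRemine(false, false)) // HISTORICAL_UPDATE
	fmt.Println(decideRemine(true, true))   // SKIP
	fmt.Println(decideRemine(true, false))  // DEFAULT_UPDATE
}
```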

Error Handling

Records in a batch are processed concurrently. If any record fails, the entire batch fails and is retried. The prod-txn-listener-eventbridge queue has a DLQ for messages that exhaust retries (max receive count: 5).


Refiner (Plaid Kinesis → DynamoDB)

Trigger

Kinesis stream: prod-txn-plaid-transactions (published by the miner and throttle-miner)

Parallelization factor: 2. Starting position: AT_TIMESTAMP (offset -15 minutes from deploy time).

Flow

Kinesis record received (prod-txn-plaid-transactions)
        │
        ▼
Unmarshal Kinesis wrapper
  (user_id, item_id, accounts, transactions, is_last_page)
        │
        ├─ wrapper.Removed is set (TRANSACTIONS_REMOVED path)
        │    │
        │    └─ For each removed txn_id:
        │         Write Remove record to DynamoDB
        │         Update existing Transaction record → mark as removed
        │
        └─ Regular update path (accounts + transactions)
             │
             ▼
        Process accounts:
          For each account in Plaid response:
            Write Account record to DynamoDB (transactions table)
            Write AccountBalance snapshot to DynamoDB (with TTL)
             │
             ▼
        Process transactions:
          For each transaction in Plaid response:
            ├─ Transaction ID already marked removed (out-of-order)?
            │    └─► Write Remove record
            │         Send removal notification to Insight Service SQS
            └─ New or valid transaction
                 └─► Batch write to Transaction table
             │
             ▼
        All transactions processed AND wrapper.LastPage = true
          AND len(transactions) > 0?
          └─► Emit user_new_txns_batch_completed to EventBridge
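The refiner's top-level dispatch (removal path vs. regular update, plus the final batch-completed check) can be sketched as a pure function over the Kinesis wrapper. Field names below mirror the flow diagram but are illustrative, not the service's actual types:

```go
package main

import "fmt"

// Wrapper is a sketch of the Kinesis record envelope the refiner unmarshals.
type Wrapper struct {
	UserID       string
	ItemID       string
	Removed      []string // txn IDs from a TRANSACTIONS_REMOVED webhook
	Transactions []string
	LastPage     bool
}

// refine returns the path taken and whether a
// user_new_txns_batch_completed event should be emitted.
func refine(w Wrapper) (path string, emitBatchCompleted bool) {
	if len(w.Removed) > 0 {
		// Removal path: each txn is marked removed; this path never
		// emits the batch-completed event.
		return "removed", false
	}
	// Regular path: accounts and transactions are persisted, and the
	// domain event fires only on the last non-empty page of the batch.
	return "update", w.LastPage && len(w.Transactions) > 0
}

func main() {
	fmt.Println(refine(Wrapper{Removed: []string{"txn_1"}}))                      // removed false
	fmt.Println(refine(Wrapper{Transactions: []string{"txn_2"}, LastPage: true})) // update true
	fmt.Println(refine(Wrapper{Transactions: nil, LastPage: true}))               // update false
}
```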

Error Handling

Uses BisectBatchOnFunctionError: when a Kinesis batch fails, Lambda splits it in half and retries each half independently, repeating the split on failure until the problematic record is isolated, so a single poison record does not block the rest of the batch.
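To make the retry behaviour concrete, the bisection can be simulated: each failed sub-batch is re-invoked as two halves until the poison record runs alone. This is a standalone illustration of the mechanism, not Lambda's internals:

```go
package main

import "fmt"

// bisectRetry simulates BisectBatchOnFunctionError: if handling a batch
// fails, the batch is split in half and each half is retried
// independently, recursively, until the failing record is isolated.
// Every sub-batch invocation is appended to invocations.
func bisectRetry(batch []int, fails func(int) bool, invocations *[][]int) {
	*invocations = append(*invocations, append([]int(nil), batch...))
	failed := false
	for _, r := range batch {
		if fails(r) {
			failed = true
			break
		}
	}
	if !failed || len(batch) == 1 {
		return
	}
	mid := len(batch) / 2
	bisectRetry(batch[:mid], fails, invocations)
	bisectRetry(batch[mid:], fails, invocations)
}

func main() {
	var calls [][]int
	// Record 3 is the poison record.
	bisectRetry([]int{1, 2, 3, 4}, func(r int) bool { return r == 3 }, &calls)
	fmt.Println(calls) // [[1 2 3 4] [1 2] [3 4] [3] [4]]
}
```

Note that records sharing a sub-batch with the poison record (here 4) are re-processed, so refiner writes must stay idempotent.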


Feeder (DynamoDB Streams → Kinesis)

Trigger

DynamoDB Streams on two tables, both routing to the same feeder Lambda:

  • prod-txn-transactions — filters for ACCOUNT and TRANSACTION type events

  • prod-txn-plaid — does NOT use the ACCOUNT/TRANSACTION filter set; surfaces ITEM and LAYER TTL expirations and other Plaid-specific event types

Behaviour

The feeder reads the type attribute from each DynamoDB stream record’s new image to determine the entity type and route accordingly. TTL expiration events (identified by UserIdentity.PrincipalID == "dynamodb.amazonaws.com") are handled separately for cleanup logic.
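The TTL check is a small predicate on the stream record's UserIdentity; DynamoDB stamps TTL deletes with type "Service" and principal "dynamodb.amazonaws.com", while application writes carry no such identity. A Go sketch (struct is illustrative, mirroring the stream record shape):

```go
package main

import "fmt"

// UserIdentity is the fragment of a DynamoDB stream record that
// identifies service-initiated deletes.
type UserIdentity struct {
	Type        string
	PrincipalID string
}

// isTTLExpiration reports whether a stream record was produced by
// DynamoDB's TTL process rather than by application code.
func isTTLExpiration(u *UserIdentity) bool {
	return u != nil &&
		u.Type == "Service" &&
		u.PrincipalID == "dynamodb.amazonaws.com"
}

func main() {
	ttl := &UserIdentity{Type: "Service", PrincipalID: "dynamodb.amazonaws.com"}
	fmt.Println(isTTLExpiration(ttl)) // true
	fmt.Println(isTTLExpiration(nil)) // false: normal application write
}
```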

Flow

DynamoDB Stream record received
        │
        ▼
Event is a TTL expiration (UserIdentity = dynamodb.amazonaws.com)?
        │ Yes
        │   ├─ type = LAYER → check if real item was created
        │   │    └─ No real item found → call Plaid.RemoveItem()
        │   └─ type = ITEM → mark item REMOVED, call Plaid.RemoveItem()
        │ No
        ▼
Operation = DELETE?
        │ Yes → skip
        │ No
        ▼
Read type from new image
        │
        ├─ type = TRANSACTION
        │    Convert DynamoDB record → Transaction API model
        │    Publish to prod-txn-floatme-transactions Kinesis
        │    Partition key: user_id
        │
        ├─ type = ACCOUNT
        │    Convert DynamoDB record → Account model
        │    Look up active items for user
        │    Set is_main = true if account matches main_account_id
        │    Emit new_account event to floatme-events EventBridge
        │
        └─ type = (other)
             Skip (log debug)
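The routing above is a straightforward dispatch on the operation and the new image's type attribute. A hedged Go sketch of the decision (destination strings are labels for illustration, not real client calls):

```go
package main

import "fmt"

// route mirrors the feeder's dispatch: TTL cleanups first, then skip
// deletes, then fan out by entity type from the record's new image.
func route(op, entityType string, isTTL bool) string {
	switch {
	case isTTL:
		return "ttl-cleanup" // LAYER / ITEM removal logic
	case op == "DELETE":
		return "skip"
	case entityType == "TRANSACTION":
		// Published to Kinesis with partition key user_id.
		return "kinesis:prod-txn-floatme-transactions"
	case entityType == "ACCOUNT":
		// new_account domain event on the shared bus.
		return "eventbridge:new_account"
	default:
		return "skip" // other types: log at debug and move on
	}
}

func main() {
	fmt.Println(route("INSERT", "TRANSACTION", false)) // kinesis:prod-txn-floatme-transactions
	fmt.Println(route("MODIFY", "ACCOUNT", false))     // eventbridge:new_account
	fmt.Println(route("DELETE", "TRANSACTION", false)) // skip
}
```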

Outbound Stream

The prod-txn-floatme-transactions Kinesis stream is consumed by downstream FloatMe services (underwriting, floats, subscriptions, etc.) for account and transaction change events.

Error Handling

The feeder processes all records in a batch sequentially. Errors on individual records are logged but do not stop processing of subsequent records. Failed records are reported as batch item failures in the DynamoDB event response, allowing the Lambda runtime to retry only the failed records.