
Pulsar

Integration Details

The DataHub Pulsar source plugin extracts topic and schema metadata from an Apache Pulsar instance and ingests it into DataHub. The plugin uses the Pulsar admin REST API to interact with the Pulsar instance: it lists the tenants, the namespaces of each tenant and the topics of each namespace, and fetches the schema of each topic (if available).

Data is extracted per tenant and namespace. Each topic, together with its schema (if available), is ingested into DataHub as a Dataset. Additional values such as the schema description, schema_version, schema_type and partitioned are included as DatasetProperties.
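The walk from tenants down to topic schemas can be sketched in Python as follows. This is an illustrative sketch, not the plugin's implementation: the admin paths (/admin/v2/tenants, /admin/v2/namespaces/{tenant}, /admin/v2/persistent/{tenant}/{namespace} and its /partitioned variant, and /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema) are assumed from the Pulsar admin v2 REST API, and walk_pulsar, http_get_json and the record layout are hypothetical names. Only persistent topics are covered, and the HTTP call is passed in as a function so the walk can be tried without a running cluster.

```python
import json
import urllib.error
import urllib.request
from typing import Any, Callable, Dict, Iterator, List, Optional

# Hypothetical helper type: GET an admin path and return decoded JSON, or None if absent.
GetJson = Callable[[str], Optional[Any]]


def http_get_json(base_url: str, token: Optional[str] = None, timeout: int = 5) -> GetJson:
    """urllib-backed GetJson (sketch: no TLS options, retries or OAuth)."""

    def get(path: str) -> Optional[Any]:
        req = urllib.request.Request(base_url.rstrip("/") + path)
        if token:
            req.add_header("Authorization", f"Bearer {token}")
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                return json.load(resp)
        except urllib.error.HTTPError as err:
            if err.code == 404:  # e.g. a topic without a registered schema
                return None
            raise

    return get


def walk_pulsar(get: GetJson, tenants: List[str]) -> Iterator[Dict[str, Any]]:
    """Yield one record per persistent topic, with DatasetProperties-style values."""
    if not tenants:
        # Listing every tenant requires a superUser role.
        tenants = get("/admin/v2/tenants") or []
    for tenant in tenants:
        for ns_path in get(f"/admin/v2/namespaces/{tenant}") or []:  # "tenant/ns"
            partitioned = set(get(f"/admin/v2/persistent/{ns_path}/partitioned") or [])
            plain = set(get(f"/admin/v2/persistent/{ns_path}") or [])
            for topic_url in sorted(plain | partitioned):  # "persistent://tenant/ns/topic"
                short_name = topic_url.rsplit("/", 1)[-1]
                schema = get(f"/admin/v2/schemas/{ns_path}/{short_name}/schema")
                props = {"partitioned": str(topic_url in partitioned).lower()}
                if schema:
                    props["schema_version"] = str(schema.get("version"))
                    props["schema_type"] = str(schema.get("type"))
                yield {"topic": topic_url, "properties": props}


# Against a live cluster (not executed here):
# for record in walk_pulsar(http_get_json("http://localhost:8080"), ["tenant_1"]):
#     print(record)
```

Injecting get keeps the traversal separate from transport concerns such as TLS, OAuth tokens and the timeout option, which a real client would have to handle.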

Concept Mapping

This ingestion source maps the following Source System Concepts to DataHub Concepts:

Source Concept | DataHub Concept | Notes
pulsar | Data Platform |
Pulsar Topic | Dataset | subType: topic
Pulsar Schema | SchemaField | Maps to the fields defined within the Avro or JSON schema definition.
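As a rough illustration of the Pulsar Schema to SchemaField mapping, the sketch below reads the top-level fields of a schema definition, assumed here to be an Avro-style record JSON document (the form Pulsar typically stores for both AVRO and JSON schemas). The function name schema_fields is hypothetical; DataHub's own conversion also descends into nested records and produces richer SchemaField metadata, so field paths and type names may differ.

```python
import json
from typing import List, Tuple


def schema_fields(schema_data: str) -> List[Tuple[str, str]]:
    """Return (field_name, type) pairs for a top-level Avro record schema."""
    record = json.loads(schema_data)
    fields: List[Tuple[str, str]] = []
    for field in record.get("fields", []):
        ftype = field["type"]
        if isinstance(ftype, list):  # union, e.g. ["null", "string"]
            type_name = "|".join(t if isinstance(t, str) else t.get("type", "?") for t in ftype)
        elif isinstance(ftype, dict):  # complex type such as array, map or nested record
            type_name = ftype.get("type", "?")
        else:  # primitive type name such as "long" or "string"
            type_name = ftype
        fields.append((field["name"], type_name))
    return fields
```

Each returned pair corresponds to one field that would surface as a SchemaField on the topic's Dataset.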

Metadata Ingestion Quickstart

For context on getting started with ingestion, check out our metadata ingestion guide.

Module pulsar

Incubating

Important Capabilities

Capability | Status | Notes
Domains | Supported | Supported via the domain config field
Platform Instance | Supported | Enabled by default

PulsarSource(config: datahub.ingestion.source_config.pulsar.PulsarSourceConfig, ctx: datahub.ingestion.api.common.PipelineContext)

NOTE: Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).
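Variable substitution means the recipe holds ${CLIENT_ID}-style references that are resolved from the environment when the recipe is loaded, so secrets never appear in the file itself. The minimal sketch below uses Python's os.path.expandvars as a stand-in to show the idea; DataHub's own expansion may accept additional syntax, and RECIPE_FRAGMENT and substitute are illustrative names.

```python
import os

# Illustrative recipe fragment with environment-variable references.
RECIPE_FRAGMENT = """\
client_id: ${CLIENT_ID}
client_secret: ${CLIENT_SECRET}
"""


def substitute(text: str) -> str:
    # os.path.expandvars replaces $VAR and ${VAR}; unset variables are left unchanged.
    return os.path.expandvars(text)


# Example: export CLIENT_ID and CLIENT_SECRET in the shell, then call substitute(RECIPE_FRAGMENT).
```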

Prerequisites

In order to ingest metadata from Apache Pulsar, you will need:

  • Access to a Pulsar instance and, if authentication is enabled, a valid access token.
  • Pulsar version >= 2.7.0
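Some of these prerequisites can be prepared or checked in code. The sketch below assumes that token authentication sends the token in an HTTP "Authorization: Bearer" header, maps a verify_ssl-style value (a boolean or a CA bundle path, as described in the configuration options) onto an ssl.SSLContext, and compares a version string against 2.7.0. The helper names auth_headers, tls_context and meets_min_version are hypothetical and not part of the plugin.

```python
import ssl
from typing import Dict, Optional, Tuple, Union


def auth_headers(token: Optional[str]) -> Dict[str, str]:
    """Bearer-token header for token-based authentication (empty if no token)."""
    return {"Authorization": f"Bearer {token}"} if token else {}


def tls_context(verify_ssl: Union[bool, str]) -> ssl.SSLContext:
    """True/False toggles certificate verification; a string is a CA bundle path."""
    if isinstance(verify_ssl, str):
        return ssl.create_default_context(cafile=verify_ssl)
    ctx = ssl.create_default_context()
    if not verify_ssl:
        ctx.check_hostname = False  # must be disabled before setting CERT_NONE
        ctx.verify_mode = ssl.CERT_NONE
    return ctx


def meets_min_version(version: str, minimum: Tuple[int, int, int] = (2, 7, 0)) -> bool:
    """Compare a dotted version such as "2.10.1" or "3.0.0-SNAPSHOT" to a minimum."""
    core = version.split("-")[0]  # drop suffixes such as "-SNAPSHOT"
    parts = [int(p) for p in core.split(".")[:3]]
    parts += [0] * (3 - len(parts))  # treat "2.7" as "2.7.0"
    return tuple(parts) >= minimum
```

Disabling verification with tls_context(False) is only suitable for local testing; production setups should keep verification on or point it at a CA bundle.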

NOTE: A superUser role is required for listing all existing tenants within a Pulsar instance.

CLI-based Ingestion

Install the Plugin

pip install 'acryl-datahub[pulsar]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "pulsar"
  config:
    env: "TEST"
    platform_instance: "local"
    ## Pulsar client connection config ##
    web_service_url: "https://localhost:8443"
    verify_ssl: "/opt/certs/ca.cert.pem"
    # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
    issuer_url: <issuer_url>
    client_id: ${CLIENT_ID}
    client_secret: ${CLIENT_SECRET}
    # Tenant list to scrape
    tenants:
      - tenant_1
      - tenant_2
    # Topic filter pattern
    topic_patterns:
      allow:
        - ".*sales.*"

sink:
  # sink configs
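The topic_patterns filter in this recipe keeps only topics whose names match .*sales.*. The sketch below illustrates allow/deny regex filtering in general: a name is dropped if any deny pattern matches, and kept only if at least one allow pattern matches. It is a stand-in built on assumptions, not DataHub's AllowDenyPattern: it uses re.search, the filter_names helper is hypothetical, and the topic-name format is illustrative, so anchoring behaviour in the real plugin may differ.

```python
import re
from typing import Iterable, List, Sequence


def filter_names(
    names: Iterable[str],
    allow: Sequence[str],
    deny: Sequence[str] = (),
    ignore_case: bool = True,
) -> List[str]:
    """Keep names that match no deny pattern and at least one allow pattern."""
    flags = re.IGNORECASE if ignore_case else 0
    kept: List[str] = []
    for name in names:
        if any(re.search(p, name, flags) for p in deny):
            continue  # deny takes precedence over allow
        if any(re.search(p, name, flags) for p in allow):
            kept.append(name)
    return kept


topics = [
    "persistent://tenant_1/ns/daily_sales",
    "persistent://tenant_1/ns/SALES_eu",
    "persistent://tenant_1/ns/inventory",
    "persistent://tenant_1/ns/__sales_compaction",  # system-style topic name
]
# "/__.*$" mirrors the default topic deny pattern listed in the configuration options.
selected = filter_names(topics, allow=[".*sales.*"], deny=["/__.*$"])
```

With ignore_case=True (mirroring the ignoreCase default), SALES_eu also passes the .*sales.* filter, while the __-prefixed topic is excluded by the deny pattern.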

Config Details

Note that a . is used to denote nested fields in the YAML recipe.
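To see how the YAML nesting relates to the dotted names, the small sketch below flattens a parsed recipe dictionary into dotted keys; flatten is an illustrative helper, not a DataHub function. In rows such as domain.key.allow, key stands for a user-chosen map key rather than a literal field name.

```python
from typing import Any, Dict


def flatten(config: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys, e.g. topic_patterns.allow."""
    flat: Dict[str, Any] = {}
    for key, value in config.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, dotted))  # recurse into nested sections
        else:
            flat[dotted] = value
    return flat


recipe_config = {
    "web_service_url": "https://localhost:8443",
    "topic_patterns": {"allow": [".*sales.*"]},
}
# flatten(recipe_config) has the keys "web_service_url" and "topic_patterns.allow".
```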

Field | Type | Description | Default | Notes
client_id | string | The application's client ID | None |
client_secret | string | The application's client secret | None |
exclude_individual_partitions | boolean | Exclude the individual partitions of a partitioned topic. When turned off, each partition is extracted separately, e.g. a topic with 100 partitions results in 100 Datasets. | True |
issuer_url | string | The complete URL for a Custom Authorization Server. Mandatory for OAuth-based authentication. | None |
oid_config | object | Placeholder for the OpenID discovery document | None |
platform_instance | string | The instance of the platform that all assets produced by this recipe belong to | None |
tenants | array(string) | List of tenants to ingest | None |
timeout | integer | Timeout setting: how long to wait for the Pulsar REST API to send data before giving up | 5 |
token | string | The access token for the application. Mandatory for token-based authentication. | None |
verify_ssl | UnionType (see Notes for variants) | Either a boolean, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. | True | One of boolean, string
web_service_url | string | The web URL for the cluster. | http://localhost:8080 |
env | string | The environment that all assets produced by this connector belong to | PROD |
domain | map(str, AllowDenyPattern) | Map of domain keys to allow/deny regex patterns | None |
domain.key.allow | array(string) | | None |
domain.key.deny | array(string) | | None |
domain.key.ignoreCase | boolean | Whether to ignore case sensitivity during pattern matching. | True |
namespace_patterns | AllowDenyPattern | List of regex patterns for namespaces to include/exclude from ingestion. By default, the public/functions namespace is denied. | {'allow': ['.*'], 'deny': ['public/functions'], 'ignoreCase': True} |
namespace_patterns.allow | array(string) | | None |
namespace_patterns.deny | array(string) | | None |
namespace_patterns.ignoreCase | boolean | Whether to ignore case sensitivity during pattern matching. | True |
tenant_patterns | AllowDenyPattern | List of regex patterns for tenants to include/exclude from ingestion. By default, all tenants except pulsar are allowed. | {'allow': ['.*'], 'deny': ['pulsar'], 'ignoreCase': True} |
tenant_patterns.allow | array(string) | | None |
tenant_patterns.deny | array(string) | | None |
tenant_patterns.ignoreCase | boolean | Whether to ignore case sensitivity during pattern matching. | True |
topic_patterns | AllowDenyPattern | List of regex patterns for topics to include/exclude from ingestion. By default, the Pulsar system topics are denied. | {'allow': ['.*'], 'deny': ['/__.*$'], 'ignoreCase': True} |
topic_patterns.allow | array(string) | | None |
topic_patterns.deny | array(string) | | None |
topic_patterns.ignoreCase | boolean | Whether to ignore case sensitivity during pattern matching. | True |
stateful_ingestion | StatefulStaleMetadataRemovalConfig | See Stateful Ingestion | None |
stateful_ingestion.enabled | boolean | Whether or not to enable stateful ingestion. | None |
stateful_ingestion.ignore_new_state | boolean | If set to True, ignores the current checkpoint state. | None |
stateful_ingestion.ignore_old_state | boolean | If set to True, ignores the previous checkpoint state. | None |
stateful_ingestion.remove_stale_metadata | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True |
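Two of these options can be illustrated with plain set operations. The sketch assumes Pulsar's naming convention for partitions, <topic>-partition-<n>, and shows how excluding individual partitions collapses them onto the parent topic, and how remove_stale_metadata conceptually selects entities present in the last successful run but missing from the current one. dataset_topics and stale_entities are hypothetical helpers; the real plugin tracks state through DataHub checkpoints rather than in-memory sets.

```python
import re
from typing import Iterable, List, Set

# Pulsar names each partition "<topic>-partition-<n>".
_PARTITION_SUFFIX = re.compile(r"-partition-\d+$")


def dataset_topics(topics: Iterable[str], exclude_individual_partitions: bool = True) -> List[str]:
    """Topics that would become Datasets, collapsing partitions when excluding them."""
    if not exclude_individual_partitions:
        return sorted(set(topics))  # every partition becomes its own Dataset
    return sorted({_PARTITION_SUFFIX.sub("", t) for t in topics})


def stale_entities(previous_run: Set[str], current_run: Set[str]) -> Set[str]:
    """Entities seen in the last successful run but not in this one (soft-delete candidates)."""
    return previous_run - current_run
```

For example, two partitions of orders plus an events topic yield two Datasets with the default setting and three when exclude_individual_partitions is turned off.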

Code Coordinates

  • Class Name: datahub.ingestion.source.pulsar.PulsarSource
  • Browse on GitHub

Questions

If you have any questions about configuring ingestion for Pulsar, feel free to ping us on our Slack.