streamsx.endpoint package

Endpoint integration for IBM Streams

Provides endpoints within IBM Cloud Pak for Data enviroment that are exposed through the enpoint-monitor.

This package is for use with Streams in Cloud Pak for Data and provides functions to add Streams operators acting as communication endpoints for your Streams application.

Overview

For details of implementing applications in Python for Streams including Cloud Pak for Data:

Sample

A simple example of a Streams application that provides an endpoint for json injection:

from streamsx.topology.topology import *
from streamsx.topology.context import submit
import streamsx.endpoint as endpoint

topo = Topology()

s1 = endpoint.inject(topo, context='sample', name='jsoninject', monitor='endpoint-sample')
s1.print()

submit ('DISTRIBUTED', topo)
streamsx.endpoint.download_toolkit(url=None, target_dir=None)

Downloads the latest Inetserver toolkit from GitHub.

Example for updating the Inetserver toolkit for your topology with the latest toolkit from GitHub:

import streamsx.endpoint as endpoint
import streamsx.spl.toolkit as tk
# download toolkit from GitHub
toolkit_location = endpoint.download_toolkit()
# add the toolkit to topology
tk.add_toolkit(topology, toolkit_location)

Example for updating the topology with a specific version of the toolkit using a URL:

import streamsx.endpoint as endpoint
import streamsx.spl.toolkit as tk
url430 = 'https://github.com/IBMStreams/streamsx.inetserver/releases/download/v4.3.0/streamsx.inetserver-4.3.0-104fb9b-20191011-1712.tgz'
toolkit_location = endpoint.download_toolkit(url=url430)
tk.add_toolkit(topology, toolkit_location)
Parameters:
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
  • target_dir (str) – the directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None a location relative to the system temporary directory is chosen.
Returns:

the location of the downloaded toolkit

Return type:

str

Note

This function requires an outgoing Internet connection

streamsx.endpoint.inject(topology, context, name, monitor, schema=<CommonSchema.Json: <streamsx.topology.schema.StreamSchema object>>)

Receives HTTP POST requests.

Embeds a Jetty web server to allow HTTP/HTTPS POST requests with the following mime types to be submitted as tuple on the output stream:

schema content-type
CommonSchema.Json application/json
CommonSchema.XML application/xml
CommonSchema.String application/x-www-form-urlencoded
StreamSchema application/x-www-form-urlencoded

Example for JSON injection:

import streamsx.endpoint as endpoint
topo = Topology()
s1 = endpoint.inject(topo, context='sample', name='json', monitor='endpoint-in')
s1.print()

The injection URL (application/json) containing “context/name” for the sample above ends with: /sample/json/inject

URL mapping

The URL contains the following parts:

https://<base-url>/<prefix>/<context>/<name>/<postfix>

For a web-server in a job its URLs are exposed with prefix path:

  • jobname/ - When a job name was explictly set. Job names should be simple mapping to a single path element.
  • streams/jobs/jobid/ - When a job name was not explicitly set.

Example URLs within the cluster for application-name of “em” in project “myproject” are

  • with a web-server in job named “transit” with context “sample” and name “json”:
    https://em.myproject.svc:8443/transit/sample/json/inject
  • with a web-server in job 7:
    https://em.myproject.svc:8443/streams/jobs/7/sample/json/inject
  • retrieve information for job named “transit” with context “sample” and name “json”:
    https://em.myproject.svc:8443/transit/sample/json/ports/info
Parameters:
  • topology – The Streams topology.
  • context (str) – Defines an URL context path. URL contains context/name.
  • name (str) – Source name in the Streams context. This name is part of the URL.
  • monitor (str) – The name of the endpoint-monitor that provides the ssl configuration for this endpoint. If it is None, the connection uses plain HTTP
  • schema – Schema for returned Stream, default is CommonSchema.Json
Returns:

Output Stream with schema defined in schema parameter (default CommonSchema.Json).

streamsx.endpoint.expose(window, context, name, monitor)

REST HTTP/HTTPS API to view tuples from a window on a stream.

Embeds a Jetty web server to provide HTTP REST access to the collection of tuples in window at the time of the last eviction for tumbling windows, or last trigger for sliding windows.

Example with a sliding window:

import streamsx.endpoint as endpoint
s = topo.source([{'a': 'Hello'}, {'a': 'World'}, {'a': '!'}]).as_json()
endpoint.expose(window=s.last(3).trigger(1), context='sample', name='view', monitor='endpoint-out')

The URL containing “context/name” for the sample above ends with: /sample/view/tuples

URL mapping

The URL contains the following parts:

https://<base-url>/<prefix>/<context>/<name>/<postfix>

For a web-server in a job its URLs are exposed with prefix path:

  • jobname/ - When a job name was explictly set. Job names should be simple mapping to a single path element.
  • streams/jobs/jobid/ - When a job name was not explicitly set.

Example URLs within the cluster for application-name of “em” in project “myproject” are

  • with a web-server in job named “transit” with context “sample” and name “view”:
    https://em.myproject.svc:8443/transit/sample/view/tuples
  • with a web-server in job 7:
    https://em.myproject.svc:8443/streams/jobs/7/sample/view/tuples
  • retrieve information for job named “transit” with context “sample” and name “view”:
    https://em.myproject.svc:8443/transit/sample/view/ports/info
Parameters:
  • window (Window) – Windowed stream of tuples that will be viewable using a HTTP GET request.
  • context (str) – Defines an URL context path. URL contains context/name.
  • name (str) – Sink name in the Streams context. This name is part of the URL.
  • monitor (str) – The name of the endpoint-monitor that provides the ssl configuration for this endpoint. If it is None, the connection uses plain HTTP
Returns:

Stream termination.

Return type:

streamsx.topology.topology.Sink

Indices and tables