Zeppelin interpreter architecture
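This post walks through a basic example of implementing your own interpreter, along with the steps to configure and use it.
The goal is to visualize query results from an MPP data platform on a visualization platform, presenting the data to users from multiple angles.
Basic approach: Zeppelin supports custom interpreters, and a custom interpreter handles the interaction between the backend and the front-end application.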
Data visualization with Zeppelin
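Zeppelin is a web-based interactive data platform that supports SQL, Scala, and other languages. An interpreter converts user input into commands for a backend service and returns the backend's results, which are displayed in the web page. Backends currently supported by interpreters include Apache Spark, Python, JDBC, Markdown, and Shell.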
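The flow just described (paragraph text in, backend result out) can be modeled in plain Java. In a Zeppelin paragraph, the first token `%name` selects the interpreter and the rest of the text is handed to it. This is a minimal sketch, not Zeppelin code: the `Dispatcher` class, its in-memory registry, and the lambda standing in for a backend are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class Dispatcher {
    // Hypothetical registry: interpreter name -> function that runs the paragraph body.
    private final Map<String, Function<String, String>> interpreters = new HashMap<>();

    public void register(String name, Function<String, String> interpreter) {
        interpreters.put(name, interpreter);
    }

    // Splits "%name <body>" at the first whitespace into the interpreter name and the code,
    // then hands the code to the selected interpreter and returns its output.
    public String run(String paragraph) {
        if (!paragraph.startsWith("%")) {
            throw new IllegalArgumentException("paragraph must start with %<interpreter>");
        }
        int end = 1;
        while (end < paragraph.length() && !Character.isWhitespace(paragraph.charAt(end))) {
            end++;
        }
        String name = paragraph.substring(1, end);
        String body = paragraph.substring(end).trim();
        Function<String, String> interpreter = interpreters.get(name);
        if (interpreter == null) {
            return "ERROR: interpreter '" + name + "' not found";
        }
        return interpreter.apply(body);
    }

    public static void main(String[] args) {
        Dispatcher d = new Dispatcher();
        // Stand-in backend: a real interpreter would send the SQL to the MPP platform.
        d.register("xdataparser", sql -> "backend received: " + sql);
        System.out.println(d.run("%xdataparser\nselect count(*) from test;"));
        // prints "backend received: select count(*) from test;"
    }
}
```

Zeppelin's real routing also involves interpreter groups and bindings per note; the sketch only shows the name-to-interpreter lookup.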
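A custom interpreter can be implemented with the following steps:
- For your backend, extend the abstract class org.apache.zeppelin.interpreter.Interpreter and implement a custom interpreter class.
- Under Zeppelin's interpreter directory, create a folder named after the custom interpreter and put the custom interpreter's jar in it.
- Edit interpreter-setting.json, usually located at {ZEPPELIN_INTERPRETER_DIR}/{YOUR_OWN_INTERPRETER_DIR}/interpreter-setting.json. For example: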
```json
[
  {
    "group": "your-group",
    "name": "your-name",
    "className": "your.own.interpreter.class",
    "properties": {
      "properties1": {
        "envName": null,
        "propertyName": "property.1.name",
        "defaultValue": "propertyDefaultValue",
        "description": "Property description"
      },
      "properties2": {
        "envName": "PROPERTIES_2",
        "propertyName": null,
        "defaultValue": "property2DefaultValue",
        "description": "Property 2 description"
      }, ...
    }
  },
  {
    ...
  }
]
```
- Optionally, customize syntax highlighting through zeppelin-web/bower.json.
- Place the custom interpreter and its dependency libraries in the corresponding directory:
[ZEPPELIN_HOME]/interpreter/[INTERPRETER_NAME]/
- Configure the interpreter:
  - In conf/zeppelin-site.xml, add the custom interpreter's class name to the zeppelin.interpreters property:
```xml
<property>
  <name>zeppelin.interpreters</name>
  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,com.me.MyNewInterpreter</value>
</property>
```
  - If you use a custom configuration, add the custom interpreter to that configuration as well.
  - On the Interpreter page of the web UI, click +Create to add the custom interpreter.
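- Use the interpreter in a paragraph by starting it with % followed by the interpreter name. For example, for an interpreter named xdataparser:
```
%xdataparser
select count(*) from test;
```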
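The first step (subclassing Interpreter) can be sketched as below. Because the real base class lives in Zeppelin's interpreter artifact, the example declares a simplified stand-in, `MiniInterpreter`, with the same method names (open, close, interpret, cancel, getProgress) and a stand-in `Result` type in place of InterpreterResult; the real interpret() also takes an InterpreterContext. `XDataParserInterpreter` and its echo "backend" are hypothetical. A real implementation would extend org.apache.zeppelin.interpreter.Interpreter and send the statement to the MPP platform, for example over JDBC.

```java
import java.util.Properties;

// Simplified stand-in for org.apache.zeppelin.interpreter.Interpreter (not the real class).
abstract class MiniInterpreter {
    protected final Properties property;

    MiniInterpreter(Properties property) {
        this.property = property;
    }

    public abstract void open();                 // called once, before any interpret()
    public abstract void close();                // called once, when the interpreter shuts down
    public abstract Result interpret(String st); // runs one paragraph synchronously
    public abstract void cancel();               // optional: abort a running interpret()
    public abstract int getProgress();           // 0-100
}

// Stand-in for InterpreterResult: a status code plus the text shown under the paragraph.
final class Result {
    enum Code { SUCCESS, ERROR }

    final Code code;
    final String message;

    Result(Code code, String message) {
        this.code = code;
        this.message = message;
    }
}

// Hypothetical custom interpreter for the xdataparser example.
public class XDataParserInterpreter extends MiniInterpreter {
    private boolean opened;

    public XDataParserInterpreter(Properties property) {
        super(property);
    }

    @Override
    public void open() {
        // A real interpreter would open its backend connection here, using values from `property`.
        opened = true;
    }

    @Override
    public void close() {
        opened = false;
    }

    @Override
    public Result interpret(String st) {
        if (!opened) {
            return new Result(Result.Code.ERROR, "interpreter is not open");
        }
        String sql = st.trim();
        if (sql.isEmpty()) {
            return new Result(Result.Code.ERROR, "empty statement");
        }
        // Placeholder for the backend call: echo the statement as a one-column "%table" result
        // (header row, then tab/newline-separated rows).
        return new Result(Result.Code.SUCCESS, "%table statement\n" + sql);
    }

    @Override
    public void cancel() {
        // Nothing long-running to abort in this sketch.
    }

    @Override
    public int getProgress() {
        return 0;
    }

    public static void main(String[] args) {
        XDataParserInterpreter intp = new XDataParserInterpreter(new Properties());
        intp.open();
        Result r = intp.interpret("select count(*) from test;");
        System.out.println(r.code + ": " + r.message);
        intp.close();
    }
}
```

Keeping connection setup in open() and teardown in close() matches the "called only once" contract of those methods, so interpret() stays cheap per paragraph.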
Main interpreter interface methods:
```java
/**
 * Opens interpreter. You may want to place your initialize routine here.
 * open() is called only once
 */
@ZeppelinApi
public abstract void open();

/**
 * Closes interpreter. You may want to free your resources up here.
 * close() is called only once
 */
@ZeppelinApi
public abstract void close();

/**
 * Run code and return result, in synchronous way.
 *
 * @param st statements to run
 * @param context
 * @return
 */
@ZeppelinApi
public abstract InterpreterResult interpret(String st, InterpreterContext context);

/**
 * Optionally implement the canceling routine to abort interpret() method
 *
 * @param context
 */
@ZeppelinApi
public abstract void cancel(InterpreterContext context);

/**
 * Dynamic form handling
 * see http://zeppelin.apache.org/docs/dynamicform.html
 *
 * @return FormType.SIMPLE enables simple pattern replacement (eg. Hello ${name=world}),
 *         FormType.NATIVE handles form in API
 */
@ZeppelinApi
public abstract FormType getFormType();

/**
 * get interpret() method running process in percentage.
 *
 * @param context
 * @return number between 0-100
 */
@ZeppelinApi
public abstract int getProgress(InterpreterContext context);

/**
 * Get completion list based on cursor position.
 * By implementing this method, it enables auto-completion.
 *
 * @param buf statements
 * @param cursor cursor position in statements
 * @return list of possible completion. Return empty list if there're nothing to return.
 */
@ZeppelinApi
public List<InterpreterCompletion> completion(String buf, int cursor) {
  return null;
}

/**
 * Interpreter can implements it's own scheduler by overriding this method.
 * There're two default scheduler provided, FIFO, Parallel.
 * If your interpret() can handle concurrent request, use Parallel or use FIFO.
 *
 * You can get default scheduler by using
 * SchedulerFactory.singleton().createOrGetFIFOScheduler()
 * SchedulerFactory.singleton().createOrGetParallelScheduler()
 *
 *
 * @return return scheduler instance.
 *         This method can be called multiple times and have to return the same instance.
 *         Can not return null.
 */
@ZeppelinApi
public Scheduler getScheduler() {
  return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
}

/**
 * Called when interpreter is no longer used.
 */
@ZeppelinApi
public void destroy() {
}
```
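The getScheduler() contract above (use FIFO unless interpret() can safely handle concurrent requests) can be illustrated with plain java.util.concurrent instead of Zeppelin's SchedulerFactory. In this sketch a single-thread executor plays the FIFO scheduler, so paragraphs run one at a time in submission order, and a fixed thread pool plays the parallel scheduler. The `SchedulerDemo` class, the sleep durations, and the pool size are illustrative assumptions, not Zeppelin's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SchedulerDemo {
    // Submits n simulated "paragraphs" and records the order in which they finish.
    static List<Integer> runAll(ExecutorService scheduler, int n) {
        List<Integer> finished = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            scheduler.submit(() -> {
                try {
                    // Earlier paragraphs take longer, so concurrent runs tend to finish out of order.
                    Thread.sleep((n - id) * 20L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.add(id);
            });
        }
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished;
    }

    public static void main(String[] args) {
        // FIFO: one worker thread, so paragraphs finish strictly in submission order.
        System.out.println("FIFO:     " + runAll(Executors.newSingleThreadExecutor(), 4));
        // Parallel: paragraphs overlap; completion order depends on how long each one runs.
        System.out.println("PARALLEL: " + runAll(Executors.newFixedThreadPool(4), 4));
    }
}
```

The FIFO line always prints [0, 1, 2, 3]; the parallel order is timing-dependent, which is why a non-thread-safe interpret() should stay on the FIFO scheduler.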
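With FormType.SIMPLE, Zeppelin replaces patterns such as `${name=world}` in the paragraph text with the user's form input (or the default) before the interpreter runs it. The following is a rough, self-contained approximation of that substitution using java.util.regex. It handles only `${name}` and `${name=default}` (not select-style options), and `SimpleForm.substitute` is a hypothetical helper, not Zeppelin's own form-handling code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SimpleForm {
    // Matches ${name} or ${name=default}; select options such as ${name=a,a|b} are not handled.
    private static final Pattern FORM = Pattern.compile("\\$\\{([^=}]+)(?:=([^}]*))?\\}");

    // Replaces each form with the user's value, falling back to its default ("" if none).
    static String substitute(String script, Map<String, String> params) {
        Matcher m = FORM.matcher(script);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1).trim();
            String fallback = m.group(2) == null ? "" : m.group(2);
            String value = params.getOrDefault(name, fallback);
            // quoteReplacement keeps '$' or '\' in user input from being treated as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(substitute("Hello ${name=world}", params)); // Hello world
        params.put("name", "Zeppelin");
        System.out.println(substitute("Hello ${name=world}", params)); // Hello Zeppelin
    }
}
```

An interpreter returning FormType.NATIVE would instead read form values itself through the API, as the getFormType() javadoc notes.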
Other parts of the Interpreter class:
```java
public static Logger logger = LoggerFactory.getLogger(Interpreter.class);
private InterpreterGroup interpreterGroup;
private URL[] classloaderUrls;
protected Properties property;

@ZeppelinApi
public Interpreter(Properties property) {
  logger.debug("Properties: {}", property);
  this.property = property;
}

public void setProperty(Properties property) {
  this.property = property;
}

@ZeppelinApi
public Properties getProperty() {
  Properties p = new Properties();
  p.putAll(property);

  RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
      getClassName());
  if (null != registeredInterpreter) {
    Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
    for (String k : defaultProperties.keySet()) {
      if (!p.containsKey(k)) {
        String value = defaultProperties.get(k).getValue();
        if (value != null) {
          p.put(k, defaultProperties.get(k).getValue());
        }
      }
    }
  }

  return p;
}

@ZeppelinApi
public String getProperty(String key) {
  logger.debug("key: {}, value: {}", key, getProperty().getProperty(key));
  return getProperty().getProperty(key);
}

public String getClassName() {
  return this.getClass().getName();
}

public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
  this.interpreterGroup = interpreterGroup;
}

@ZeppelinApi
public InterpreterGroup getInterpreterGroup() {
  return this.interpreterGroup;
}

public URL[] getClassloaderUrls() {
  return classloaderUrls;
}

public void setClassloaderUrls(URL[] classloaderUrls) {
  this.classloaderUrls = classloaderUrls;
}

@ZeppelinApi
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
  synchronized (interpreterGroup) {
    for (List<Interpreter> interpreters : interpreterGroup.values()) {
      boolean belongsToSameNoteGroup = false;
      Interpreter interpreterFound = null;
      for (Interpreter intp : interpreters) {
        if (intp.getClassName().equals(className)) {
          interpreterFound = intp;
        }

        Interpreter p = intp;
        while (p instanceof WrappedInterpreter) {
          p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        if (this == p) {
          belongsToSameNoteGroup = true;
        }
      }

      if (belongsToSameNoteGroup) {
        return interpreterFound;
      }
    }
  }
  return null;
}

/**
 * Type of interpreter.
 */
public static enum FormType {
  NATIVE, SIMPLE, NONE
}

/**
 * Represent registered interpreter class
 */
public static class RegisteredInterpreter {
  //@SerializedName("interpreterGroup")
  private String group;
  //@SerializedName("interpreterName")
  private String name;
  //@SerializedName("interpreterClassName")
  private String className;
  private boolean defaultInterpreter;
  private Map<String, InterpreterProperty> properties;
  private String path;

  public RegisteredInterpreter(String name, String group, String className,
      Map<String, InterpreterProperty> properties) {
    this(name, group, className, false, properties);
  }

  public RegisteredInterpreter(String name, String group, String className,
      boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
    super();
    this.name = name;
    this.group = group;
    this.className = className;
    this.defaultInterpreter = defaultInterpreter;
    this.properties = properties;
  }

  public String getName() {
    return name;
  }

  public String getGroup() {
    return group;
  }

  public String getClassName() {
    return className;
  }

  public boolean isDefaultInterpreter() {
    return defaultInterpreter;
  }

  public void setDefaultInterpreter(boolean defaultInterpreter) {
    this.defaultInterpreter = defaultInterpreter;
  }

  public Map<String, InterpreterProperty> getProperties() {
    return properties;
  }

  public void setPath(String path) {
    this.path = path;
  }

  public String getPath() {
    return path;
  }

  public String getInterpreterKey() {
    return getGroup() + "." + getName();
  }
}

/**
 * Type of Scheduling.
 */
public static enum SchedulingMode {
  FIFO, PARALLEL
}

public static Map<String, RegisteredInterpreter> registeredInterpreters = Collections
    .synchronizedMap(new HashMap<String, RegisteredInterpreter>());

public static void register(String name, String className) {
  register(name, name, className);
}

public static void register(String name, String group, String className) {
  register(name, group, className, false, new HashMap<String, InterpreterProperty>());
}

public static void register(String name, String group, String className,
    Map<String, InterpreterProperty> properties) {
  register(name, group, className, false, properties);
}

public static void register(String name, String group, String className,
    boolean defaultInterpreter) {
  register(name, group, className, defaultInterpreter,
      new HashMap<String, InterpreterProperty>());
}

@Deprecated
public static void register(String name, String group, String className,
    boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
  logger.error("Static initialization is deprecated. You should change it to use " +
      "interpreter-setting.json in your jar or " +
      "interpreter/{interpreter}/interpreter-setting.json");
  register(new RegisteredInterpreter(name, group, className, defaultInterpreter, properties));
}

public static void register(RegisteredInterpreter registeredInterpreter) {
  // TODO(jongyoul): Error should occur when two same interpreter key with different settings
  String interpreterKey = registeredInterpreter.getInterpreterKey();
  if (!registeredInterpreters.containsKey(interpreterKey)) {
    registeredInterpreters.put(interpreterKey, registeredInterpreter);
  }
}

public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) {
  for (RegisteredInterpreter ri : registeredInterpreters.values()) {
    if (ri.getClassName().equals(className)) {
      return ri;
    }
  }
  return null;
}
```
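getProperty() above starts from the interpreter's own properties and fills every missing key with the default declared for its RegisteredInterpreter, which it finds by class name. As far as we understand Zeppelin's InterpreterProperty.getValue(), each default is resolved from the environment variable named by envName first, then the Java system property named by propertyName, then defaultValue; these are the fields in the interpreter-setting.json example. The sketch below reproduces that merge in plain Java. `PropertyMerge`, `PropertyDefault`, and `mergedProperties` are hypothetical names, and the environment is passed in as a Map so the example is testable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropertyMerge {
    // Hypothetical mirror of one "properties" entry in interpreter-setting.json.
    static final class PropertyDefault {
        final String envName;       // environment variable checked first (may be null)
        final String propertyName;  // Java system property checked second (may be null)
        final String defaultValue;  // used when neither is set (may be null)

        PropertyDefault(String envName, String propertyName, String defaultValue) {
            this.envName = envName;
            this.propertyName = propertyName;
            this.defaultValue = defaultValue;
        }

        // Assumed resolution order: environment variable, system property, declared default.
        String resolve(Map<String, String> env) {
            if (envName != null && !envName.isEmpty() && env.get(envName) != null) {
                return env.get(envName);
            }
            if (propertyName != null && !propertyName.isEmpty()) {
                String sys = System.getProperty(propertyName);
                if (sys != null) {
                    return sys;
                }
            }
            return defaultValue;
        }
    }

    // Mirrors getProperty(): keys the user set win; missing keys get non-null resolved defaults.
    static Properties mergedProperties(Properties userSet, Map<String, PropertyDefault> defaults,
                                       Map<String, String> env) {
        Properties p = new Properties();
        p.putAll(userSet);
        for (Map.Entry<String, PropertyDefault> e : defaults.entrySet()) {
            if (!p.containsKey(e.getKey())) {
                String value = e.getValue().resolve(env);
                if (value != null) {
                    p.put(e.getKey(), value);
                }
            }
        }
        return p;
    }

    public static void main(String[] args) {
        Map<String, PropertyDefault> defaults = new HashMap<>();
        defaults.put("properties1", new PropertyDefault(null, "property.1.name", "propertyDefaultValue"));
        defaults.put("properties2", new PropertyDefault("PROPERTIES_2", null, "property2DefaultValue"));

        Properties userSet = new Properties();
        userSet.setProperty("properties1", "fromUser");

        Map<String, String> env = new HashMap<>();
        env.put("PROPERTIES_2", "fromEnv");

        Properties merged = mergedProperties(userSet, defaults, env);
        System.out.println(merged.getProperty("properties1")); // fromUser
        System.out.println(merged.getProperty("properties2")); // fromEnv
    }
}
```

Because the merge never overwrites a key that is already present, a value configured on the Interpreter page always takes precedence over anything declared in interpreter-setting.json.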
Official documentation:
https://zeppelin.apache.org/docs/0.6.0/development/writingzeppelininterpreter.html#what-is-apache-zeppelin-interpreter