Flink:ElasticsearchSinkFunction is not serializable

用Java把Flink结果数据下沉到Elasticsearch,执行时执出ElasticsearchSinkFunction is not serializable异常:

The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

代码如下:

ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(esHttphost,
    new ElasticsearchSinkFunction<String>() {

        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
            
            log.info("data -> " + json);

            return Requests.indexRequest().source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

网上有的说是因为没有设置setRestClientFactory实现导致,也有的把ElasticsearchSinkFunction声明static解决的。

声明为static方法虽然能解决,但这种做法不好。这个问题的原因是,从外部传入两个关于 ES 的参数,导致 ElasticSearchSinkFunction 类无法被序列化。解决方法,就是实现 ElasticsearchSinkFunction 接口,并标记 Serializable,再将外部参数,通过 ElasticSearchSinkFunction 子类的构造函数传入(这里还需要注意避免使用 static 属性)。类似的,还有 PatternFlatSelectFunction 和IterativeCondition,对于外部传入的实例,如果因为序列化,也可能会出现 NullPointerException 异常。这时候,就算通过实现Cloneable 接口,对外部实例进行 clone,也会无法避免。因此,需要在 PatternFlatSelectFunction 或IterativeCondition 内部,重新初始化实例,才能解决该问题。

代码如下:

参考:

https://github.com/asdf2014/gitment/issues/56

https://yuzhouwan.com/posts/20644/


已发布

分类

来自

标签:

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注